wasm: support http(s) fetch of Wasm files (#3005)
Signed-off-by: Edoardo Vacchi <evacchi@users.noreply.github.com> Signed-off-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com> Co-authored-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com>
This commit is contained in:
parent
9ca07d9bca
commit
13dfbdbac6
|
|
@ -63,7 +63,7 @@ func NewWasmOutput(logger logger.Logger) bindings.OutputBinding {
|
|||
}
|
||||
|
||||
func (out *outputBinding) Init(ctx context.Context, metadata bindings.Metadata) (err error) {
|
||||
if out.meta, err = wasm.GetInitMetadata(metadata.Base); err != nil {
|
||||
if out.meta, err = wasm.GetInitMetadata(ctx, metadata.Base); err != nil {
|
||||
return fmt.Errorf("wasm: failed to parse metadata: %w", err)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,64 @@
|
|||
/*
|
||||
Copyright 2023 The Dapr Authors
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implieout.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package wasm
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
)
|
||||
|
||||
// httpClient decorates an http.Client with convenience methods.
|
||||
type httpClient struct {
|
||||
c http.Client
|
||||
}
|
||||
|
||||
// newHTTPFetcher is a constructor for httpFetcher.
|
||||
//
|
||||
// It is possible to plug a custom http.RoundTripper to handle other concerns (e.g. retries)
|
||||
// Compression is handled transparently and automatically by http.Client.
|
||||
func newHTTPCLient(transport http.RoundTripper) *httpClient {
|
||||
return &httpClient{
|
||||
c: http.Client{Transport: transport},
|
||||
}
|
||||
}
|
||||
|
||||
// fetch returns a byte slice of the wasm module found at the given URL, or an error otherwise.
|
||||
func (f *httpClient) get(ctx context.Context, u *url.URL) ([]byte, error) {
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
resp, err := f.c.Do(req.WithContext(ctx))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
io.Copy(io.Discard, resp.Body)
|
||||
resp.Body.Close()
|
||||
return nil, fmt.Errorf("received %v status code from %q", resp.StatusCode, u)
|
||||
}
|
||||
|
||||
bytes, err := io.ReadAll(resp.Body)
|
||||
resp.Body.Close()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return bytes, nil
|
||||
}
|
||||
|
|
@ -0,0 +1,86 @@
|
|||
/*
|
||||
Copyright 2023 The Dapr Authors
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implieout.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package wasm
|
||||
|
||||
import (
|
||||
"compress/gzip"
|
||||
"context"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"net/url"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
var wasmMagicNumber = []byte{0x00, 0x61, 0x73, 0x6d}
|
||||
|
||||
func TestWasmHTTPFetch(t *testing.T) {
|
||||
wasmBinary := wasmMagicNumber
|
||||
wasmBinary = append(wasmBinary, 0x00, 0x00, 0x00, 0x00)
|
||||
cases := []struct {
|
||||
name string
|
||||
handler http.HandlerFunc
|
||||
expectedError string
|
||||
}{
|
||||
{
|
||||
name: "plain wasm binary",
|
||||
handler: func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Write(wasmBinary)
|
||||
},
|
||||
},
|
||||
// Compressed payloads are handled automatically by http.Client.
|
||||
{
|
||||
name: "compressed payload",
|
||||
handler: func(w http.ResponseWriter, r *http.Request) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Header().Set("Content-Encoding", "gzip")
|
||||
|
||||
gw := gzip.NewWriter(w)
|
||||
defer gw.Close()
|
||||
gw.Write(wasmBinary)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "http error",
|
||||
handler: func(w http.ResponseWriter, r *http.Request) {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
},
|
||||
expectedError: "received 500 status code",
|
||||
},
|
||||
}
|
||||
|
||||
for _, proto := range []string{"http", "https"} {
|
||||
t.Run(proto, func(t *testing.T) {
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
ts := httptest.NewServer(tc.handler)
|
||||
defer ts.Close()
|
||||
c := newHTTPCLient(http.DefaultTransport)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
parse, err := url.Parse(ts.URL)
|
||||
require.NoError(t, err)
|
||||
_, err = c.get(ctx, parse)
|
||||
if tc.expectedError != "" {
|
||||
require.ErrorContains(t, err, tc.expectedError)
|
||||
return
|
||||
}
|
||||
require.NoError(t, err, "Wasm download got an unexpected error: %v", err)
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
@ -1,9 +1,26 @@
|
|||
/*
|
||||
Copyright 2023 The Dapr Authors
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implieout.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package wasm
|
||||
|
||||
import (
|
||||
"context"
|
||||
"crypto/rand"
|
||||
"errors"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"os"
|
||||
"path"
|
||||
|
|
@ -50,13 +67,12 @@ type InitMetadata struct {
|
|||
}
|
||||
|
||||
// GetInitMetadata returns InitMetadata from the input metadata.
|
||||
func GetInitMetadata(md metadata.Base) (*InitMetadata, error) {
|
||||
func GetInitMetadata(ctx context.Context, md metadata.Base) (*InitMetadata, error) {
|
||||
// Note: the ctx will be used for other schemes such as HTTP and OCI.
|
||||
|
||||
var m InitMetadata
|
||||
// Decode the metadata
|
||||
err := metadata.DecodeMetadata(md.Properties, &m)
|
||||
if err != nil {
|
||||
if err := metadata.DecodeMetadata(md.Properties, &m); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
|
|
@ -74,17 +90,23 @@ func GetInitMetadata(md metadata.Base) (*InitMetadata, error) {
|
|||
case "oci":
|
||||
return nil, fmt.Errorf("TODO %s", scheme)
|
||||
case "http", "https":
|
||||
_, err = url.Parse(m.URL)
|
||||
u, err := url.Parse(m.URL)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return nil, fmt.Errorf("TODO %s", scheme)
|
||||
c := newHTTPCLient(http.DefaultTransport)
|
||||
m.Guest, err = c.get(ctx, u)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
m.GuestName, _ = strings.CutSuffix(path.Base(u.Path), ".wasm")
|
||||
case "file":
|
||||
guestPath := m.URL[7:]
|
||||
m.Guest, err = os.ReadFile(guestPath)
|
||||
guest, err := os.ReadFile(guestPath)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
m.Guest = guest
|
||||
// Use the name of the wasm binary as the module name.
|
||||
m.GuestName, _ = strings.CutSuffix(path.Base(guestPath), ".wasm")
|
||||
default:
|
||||
|
|
|
|||
|
|
@ -1,3 +1,18 @@
|
|||
/*
|
||||
Copyright 2023 The Dapr Authors
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implieout.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package wasm
|
||||
|
||||
import (
|
||||
|
|
@ -23,6 +38,9 @@ const (
|
|||
var binArgs []byte
|
||||
|
||||
func TestGetInitMetadata(t *testing.T) {
|
||||
testCtx, cancel := context.WithCancel(context.Background())
|
||||
t.Cleanup(cancel)
|
||||
|
||||
type testCase struct {
|
||||
name string
|
||||
metadata metadata.Base
|
||||
|
|
@ -84,16 +102,16 @@ func TestGetInitMetadata(t *testing.T) {
|
|||
{
|
||||
name: "TODO http",
|
||||
metadata: metadata.Base{Properties: map[string]string{
|
||||
"url": "http://foo/bar.wasm",
|
||||
"url": "http://foo.invalid/bar.wasm",
|
||||
}},
|
||||
expectedErr: "TODO http",
|
||||
expectedErr: "no such host",
|
||||
},
|
||||
{
|
||||
name: "TODO https",
|
||||
metadata: metadata.Base{Properties: map[string]string{
|
||||
"url": "https://foo/bar.wasm",
|
||||
"url": "https://foo.invalid/bar.wasm",
|
||||
}},
|
||||
expectedErr: "TODO https",
|
||||
expectedErr: "no such host",
|
||||
},
|
||||
{
|
||||
name: "unsupported scheme",
|
||||
|
|
@ -122,7 +140,7 @@ func TestGetInitMetadata(t *testing.T) {
|
|||
for _, tt := range tests {
|
||||
tc := tt
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
md, err := GetInitMetadata(tc.metadata)
|
||||
md, err := GetInitMetadata(testCtx, tc.metadata)
|
||||
if tc.expectedErr == "" {
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tc.expected, md)
|
||||
|
|
|
|||
|
|
@ -36,7 +36,7 @@ func (m *middleware) GetHandler(ctx context.Context, metadata dapr.Metadata) (fu
|
|||
|
||||
// getHandler is extracted for unit testing.
|
||||
func (m *middleware) getHandler(ctx context.Context, metadata dapr.Metadata) (*requestHandler, error) {
|
||||
meta, err := wasm.GetInitMetadata(metadata.Base)
|
||||
meta, err := wasm.GetInitMetadata(ctx, metadata.Base)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("wasm: failed to parse metadata: %w", err)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,6 +18,9 @@ import (
|
|||
"github.com/dapr/kit/logger"
|
||||
)
|
||||
|
||||
//go:embed internal/testdata/rewrite.wasm
|
||||
var exampleWasmBin []byte
|
||||
|
||||
func Test_NewMiddleWare(t *testing.T) {
|
||||
l := logger.NewLogger(t.Name())
|
||||
require.Equal(t, &middleware{logger: l}, NewMiddleware(l))
|
||||
|
|
@ -37,6 +40,11 @@ func Test_middleware_log(t *testing.T) {
|
|||
|
||||
func Test_middleware_getHandler(t *testing.T) {
|
||||
m := &middleware{logger: logger.NewLogger(t.Name())}
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
if r.URL.Path == "/example.wasm" {
|
||||
w.Write(exampleWasmBin)
|
||||
}
|
||||
}))
|
||||
|
||||
type testCase struct {
|
||||
name string
|
||||
|
|
@ -61,6 +69,12 @@ func Test_middleware_getHandler(t *testing.T) {
|
|||
}},
|
||||
expectedErr: "wasm: error compiling guest: invalid magic number",
|
||||
},
|
||||
{
|
||||
name: "remote wasm url",
|
||||
metadata: metadata.Base{Properties: map[string]string{
|
||||
"url": ts.URL + "/example.wasm",
|
||||
}},
|
||||
},
|
||||
{
|
||||
name: "ok",
|
||||
metadata: metadata.Base{Properties: map[string]string{
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@ package internal_test
|
|||
|
||||
import (
|
||||
"bytes"
|
||||
"compress/gzip"
|
||||
"context"
|
||||
"log"
|
||||
"net/http"
|
||||
|
|
@ -128,19 +129,65 @@ func Test_EndToEnd(t *testing.T) {
|
|||
},
|
||||
}
|
||||
|
||||
for _, tt := range tests {
|
||||
tc := tt
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
defer buf.Reset()
|
||||
t.Run("local", func(t *testing.T) {
|
||||
for _, tt := range tests {
|
||||
tc := tt
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
defer buf.Reset()
|
||||
|
||||
wasmPath := path.Join(t.TempDir(), "guest.wasm")
|
||||
require.NoError(t, os.WriteFile(wasmPath, tc.guest, 0o600))
|
||||
wasmPath := path.Join(t.TempDir(), "guest.wasm")
|
||||
require.NoError(t, os.WriteFile(wasmPath, tc.guest, 0o600))
|
||||
|
||||
meta := metadata.Base{Properties: map[string]string{"url": "file://" + wasmPath}}
|
||||
handlerFn, err := wasm.NewMiddleware(l).GetHandler(context.Background(), middleware.Metadata{Base: meta})
|
||||
require.NoError(t, err)
|
||||
handler := handlerFn(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))
|
||||
tc.test(t, handler, &buf)
|
||||
meta := metadata.Base{Properties: map[string]string{"url": "file://" + wasmPath}}
|
||||
handlerFn, err := wasm.NewMiddleware(l).GetHandler(context.Background(), middleware.Metadata{Base: meta})
|
||||
require.NoError(t, err)
|
||||
handler := handlerFn(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))
|
||||
tc.test(t, handler, &buf)
|
||||
})
|
||||
}
|
||||
})
|
||||
|
||||
httptests := []struct {
|
||||
name string
|
||||
handler func(w http.ResponseWriter, r *http.Request, guest []byte)
|
||||
}{
|
||||
{
|
||||
name: "http",
|
||||
handler: func(w http.ResponseWriter, r *http.Request, guest []byte) {
|
||||
w.Write(guest)
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "http+gzip",
|
||||
handler: func(w http.ResponseWriter, r *http.Request, guest []byte) {
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.Header().Set("Content-Encoding", "gzip")
|
||||
|
||||
gw := gzip.NewWriter(w)
|
||||
defer gw.Close()
|
||||
gw.Write(guest)
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, ht := range httptests {
|
||||
t.Run(ht.name, func(t *testing.T) {
|
||||
for _, tt := range tests {
|
||||
tc := tt
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
defer buf.Reset()
|
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
ht.handler(w, r, tc.guest)
|
||||
}))
|
||||
defer ts.Close()
|
||||
|
||||
meta := metadata.Base{Properties: map[string]string{"url": ts.URL + "/guest.wasm"}}
|
||||
handlerFn, err := wasm.NewMiddleware(l).GetHandler(context.Background(), middleware.Metadata{Base: meta})
|
||||
require.NoError(t, err)
|
||||
handler := handlerFn(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}))
|
||||
tc.test(t, handler, &buf)
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue