/*
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
|
|
|
|
package awss3binding_test
|
|
|
|
import (
|
|
"encoding/base64"
|
|
"encoding/json"
|
|
"fmt"
|
|
"os"
|
|
"strconv"
|
|
"testing"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
|
|
bindings_s3 "github.com/dapr/components-contrib/bindings/aws/s3"
|
|
secretstore_env "github.com/dapr/components-contrib/secretstores/local/env"
|
|
bindings_loader "github.com/dapr/dapr/pkg/components/bindings"
|
|
secretstores_loader "github.com/dapr/dapr/pkg/components/secretstores"
|
|
|
|
dapr_testing "github.com/dapr/dapr/pkg/testing"
|
|
daprsdk "github.com/dapr/go-sdk/client"
|
|
"github.com/dapr/kit/logger"
|
|
|
|
"github.com/dapr/components-contrib/tests/certification/embedded"
|
|
"github.com/dapr/components-contrib/tests/certification/flow"
|
|
"github.com/dapr/components-contrib/tests/certification/flow/sidecar"
|
|
|
|
"github.com/aws/aws-sdk-go/service/s3"
|
|
)
|
|
|
|
// Identifiers shared by every scenario in this certification suite.
const (
	// sidecarName is the name handed to sidecar.Run for each flow.
	sidecarName = "bindings-s3-sidecar"

	// bindingsMetadataName is the binding name placed in every
	// InvokeBindingRequest issued by the helpers in this file.
	bindingsMetadataName = "s3-cert-tests"

	// objNotFound is the substring the tests look for in the error returned
	// when getting an object after it has been deleted.
	objNotFound = "object not found"
)
|
|
|
|
// bucketName is the bucket used to build the expected object locations in
// S3SForcePathStyle. It starts as a placeholder and may be overridden by init.
var bucketName = "bucketName"

// init overrides bucketName with the value of the BINDINGS_AWS_S3_BUCKET
// environment variable when that variable is set to a non-empty string.
func init() {
	fromEnv := os.Getenv("BINDINGS_AWS_S3_BUCKET")
	if fromEnv == "" {
		// Unset or empty: keep the placeholder.
		return
	}
	bucketName = fromEnv
}
|
|
|
|
func TestAWSS3CertificationTests(t *testing.T) {
|
|
defer teardown(t)
|
|
|
|
t.Run("S3SBasic", func(t *testing.T) {
|
|
S3SBasic(t)
|
|
})
|
|
|
|
t.Run("S3SForcePathStyle", func(t *testing.T) {
|
|
S3SForcePathStyle(t)
|
|
})
|
|
|
|
t.Run("S3SBase64", func(t *testing.T) {
|
|
S3SBase64(t)
|
|
})
|
|
}
|
|
|
|
// createObjectRequest is used to make a common binding request for create operation.
|
|
func createObjectRequest(ctx flow.Context, client daprsdk.Client, dataBytes []byte, invokeCreateMetadata map[string]string) (*daprsdk.BindingEvent, error) {
|
|
invokeCreateRequest := &daprsdk.InvokeBindingRequest{
|
|
Name: bindingsMetadataName,
|
|
Operation: "create",
|
|
Data: dataBytes,
|
|
Metadata: invokeCreateMetadata,
|
|
}
|
|
|
|
return client.InvokeBinding(ctx, invokeCreateRequest)
|
|
}
|
|
|
|
// listObjectRequest is used to make a common binding request for the list operation.
|
|
func listObjectRequest(ctx flow.Context, client daprsdk.Client) (out *daprsdk.BindingEvent, err error) {
|
|
invokeRequest := &daprsdk.InvokeBindingRequest{
|
|
Name: bindingsMetadataName,
|
|
Operation: "list",
|
|
Data: nil,
|
|
Metadata: nil,
|
|
}
|
|
|
|
out, invokeErr := client.InvokeBinding(ctx, invokeRequest)
|
|
if invokeErr != nil {
|
|
return nil, fmt.Errorf("%w", invokeErr)
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
// getObjectRequest is used to make a common binding request for the get operation.
|
|
func getObjectRequest(ctx flow.Context, client daprsdk.Client, name string, isBase64 bool) (out *daprsdk.BindingEvent, err error) {
|
|
invokeGetMetadata := map[string]string{
|
|
"key": name,
|
|
"encodeBase64": fmt.Sprintf("%t", isBase64),
|
|
}
|
|
|
|
return getObjectRequestWithMetadata(ctx, client, invokeGetMetadata)
|
|
}
|
|
|
|
// getObjectRequest is used to make a common binding request for the get operation passing metadata.
|
|
func getObjectRequestWithMetadata(ctx flow.Context, client daprsdk.Client, invokeGetMetadata map[string]string) (out *daprsdk.BindingEvent, err error) {
|
|
invokeGetRequest := &daprsdk.InvokeBindingRequest{
|
|
Name: bindingsMetadataName,
|
|
Operation: "get",
|
|
Data: nil,
|
|
Metadata: invokeGetMetadata,
|
|
}
|
|
|
|
out, invokeGetErr := client.InvokeBinding(ctx, invokeGetRequest)
|
|
if invokeGetErr != nil {
|
|
return nil, fmt.Errorf("%w", invokeGetErr)
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
// deleteObjectRequest is used to make a common binding request for the delete operation.
|
|
func deleteObjectRequest(ctx flow.Context, client daprsdk.Client, name string) (out *daprsdk.BindingEvent, err error) {
|
|
invokeDeleteMetadata := map[string]string{
|
|
"key": name,
|
|
}
|
|
|
|
invokeGetRequest := &daprsdk.InvokeBindingRequest{
|
|
Name: bindingsMetadataName,
|
|
Operation: "delete",
|
|
Data: nil,
|
|
Metadata: invokeDeleteMetadata,
|
|
}
|
|
|
|
out, invokeDeleteErr := client.InvokeBinding(ctx, invokeGetRequest)
|
|
if invokeDeleteErr != nil {
|
|
return nil, fmt.Errorf("%w", invokeDeleteErr)
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
// Verify S3 Basic Binding Support (Create, Get, List, Delete)
|
|
func S3SBasic(t *testing.T) {
|
|
ports, err := dapr_testing.GetFreePorts(2)
|
|
require.NoError(t, err)
|
|
|
|
currentGRPCPort := ports[0]
|
|
currentHTTPPort := ports[1]
|
|
objectName := "filename.txt"
|
|
|
|
testCreateGetListDelete := func(ctx flow.Context) error {
|
|
client, clientErr := daprsdk.NewClientWithPort(fmt.Sprint(currentGRPCPort))
|
|
if clientErr != nil {
|
|
panic(clientErr)
|
|
}
|
|
defer client.Close()
|
|
|
|
input := "some example content"
|
|
dataBytes := []byte(input)
|
|
|
|
invokeCreateMetadata := map[string]string{
|
|
"key": objectName,
|
|
}
|
|
|
|
_, invokeCreateErr := createObjectRequest(ctx, client, dataBytes, invokeCreateMetadata)
|
|
require.NoError(t, invokeCreateErr)
|
|
|
|
invokeGetMetadata := map[string]string{
|
|
"key": objectName,
|
|
}
|
|
|
|
invokeGetRequest := &daprsdk.InvokeBindingRequest{
|
|
Name: bindingsMetadataName,
|
|
Operation: "get",
|
|
Data: nil,
|
|
Metadata: invokeGetMetadata,
|
|
}
|
|
|
|
out, invokeGetErr := client.InvokeBinding(ctx, invokeGetRequest)
|
|
require.NoError(t, invokeGetErr)
|
|
assert.Equal(t, input, string(out.Data))
|
|
|
|
out, invokeErr := listObjectRequest(ctx, client)
|
|
require.NoError(t, invokeErr)
|
|
var output s3.ListObjectsOutput
|
|
unmarshalErr := json.Unmarshal(out.Data, &output)
|
|
require.NoError(t, unmarshalErr)
|
|
|
|
found := false
|
|
for _, item := range output.Contents {
|
|
if *item.Key == objectName {
|
|
found = true
|
|
break
|
|
}
|
|
}
|
|
assert.True(t, found)
|
|
|
|
out, invokeDeleteErr := deleteObjectRequest(ctx, client, objectName)
|
|
require.NoError(t, invokeDeleteErr)
|
|
assert.Empty(t, out.Data)
|
|
|
|
// confirm the deletion.
|
|
_, invokeSecondGetErr := getObjectRequest(ctx, client, objectName, false)
|
|
assert.Error(t, invokeSecondGetErr)
|
|
assert.Contains(t, invokeSecondGetErr.Error(), objNotFound)
|
|
|
|
return nil
|
|
}
|
|
|
|
flow.New(t, "AWS S3 binding basic").
|
|
Step(sidecar.Run(sidecarName,
|
|
append(componentRuntimeOptions(),
|
|
embedded.WithoutApp(),
|
|
embedded.WithComponentsPath("./components/basic"),
|
|
embedded.WithDaprGRPCPort(strconv.Itoa(currentGRPCPort)),
|
|
embedded.WithDaprHTTPPort(strconv.Itoa(currentHTTPPort)),
|
|
)...,
|
|
)).
|
|
Step("Create/Get/List/Delete S3 Object", testCreateGetListDelete).
|
|
Run()
|
|
}
|
|
|
|
// Verify forcePathStyle
|
|
func S3SForcePathStyle(t *testing.T) {
|
|
ports, err := dapr_testing.GetFreePorts(2)
|
|
require.NoError(t, err)
|
|
|
|
currentGRPCPort := ports[0]
|
|
currentHTTPPort := ports[1]
|
|
objectName := "filename.txt"
|
|
locationForcePathStyleFalse := fmt.Sprintf("https://%s.s3.amazonaws.com/%s", bucketName, objectName)
|
|
locationForcePathStyleTrue := fmt.Sprintf("https://s3.amazonaws.com/%s/%s", bucketName, objectName)
|
|
|
|
testForcePathStyle := func(forcePathStyle string) func(ctx flow.Context) error {
|
|
return func(ctx flow.Context) error {
|
|
client, clientErr := daprsdk.NewClientWithPort(fmt.Sprint(currentGRPCPort))
|
|
if clientErr != nil {
|
|
panic(clientErr)
|
|
}
|
|
defer client.Close()
|
|
|
|
input := "some example content"
|
|
dataBytes := []byte(input)
|
|
|
|
invokeCreateMetadata := map[string]string{
|
|
"key": objectName,
|
|
}
|
|
|
|
cout, invokeCreateErr := createObjectRequest(ctx, client, dataBytes, invokeCreateMetadata)
|
|
require.NoError(t, invokeCreateErr)
|
|
var createResponse struct {
|
|
Location string `json:"location"`
|
|
VersionID *string `json:"versionID"`
|
|
PresignURL string `json:"presignURL,omitempty"`
|
|
}
|
|
unmarshalErr := json.Unmarshal(cout.Data, &createResponse)
|
|
require.NoError(t, unmarshalErr)
|
|
assert.Equal(t, createResponse.Location, forcePathStyle)
|
|
|
|
out, invokeDeleteErr := deleteObjectRequest(ctx, client, objectName)
|
|
require.NoError(t, invokeDeleteErr)
|
|
assert.Empty(t, out.Data)
|
|
|
|
// confirm the deletion.
|
|
_, invokeSecondGetErr := getObjectRequest(ctx, client, objectName, false)
|
|
assert.Error(t, invokeSecondGetErr)
|
|
assert.Contains(t, invokeSecondGetErr.Error(), objNotFound)
|
|
|
|
return nil
|
|
}
|
|
}
|
|
|
|
flow.New(t, "AWS S3 binding with forcePathStyle True").
|
|
Step(sidecar.Run(sidecarName,
|
|
append(componentRuntimeOptions(),
|
|
embedded.WithoutApp(),
|
|
embedded.WithComponentsPath("./components/forcePathStyleTrue"),
|
|
embedded.WithDaprGRPCPort(strconv.Itoa(currentGRPCPort)),
|
|
embedded.WithDaprHTTPPort(strconv.Itoa(currentHTTPPort)),
|
|
)...,
|
|
)).
|
|
Step("Create/Delete S3 Object forcePathStyle True", testForcePathStyle(locationForcePathStyleTrue)).
|
|
Run()
|
|
|
|
flow.New(t, "AWS S3 binding with forcePathStyleFalse").
|
|
Step(sidecar.Run(sidecarName,
|
|
append(componentRuntimeOptions(),
|
|
embedded.WithoutApp(),
|
|
embedded.WithComponentsPath("./components/forcePathStyleFalse"),
|
|
embedded.WithDaprGRPCPort(strconv.Itoa(currentGRPCPort)),
|
|
embedded.WithDaprHTTPPort(strconv.Itoa(currentHTTPPort)),
|
|
)...,
|
|
)).
|
|
Step("Create/Delete S3 Object forcePathStyle False", testForcePathStyle(locationForcePathStyleFalse)).
|
|
Run()
|
|
}
|
|
|
|
// Verify Base64 (Encode/Decode)
|
|
func S3SBase64(t *testing.T) {
|
|
ports, err := dapr_testing.GetFreePorts(2)
|
|
require.NoError(t, err)
|
|
|
|
currentGRPCPort := ports[0]
|
|
currentHTTPPort := ports[1]
|
|
|
|
testCreateBase64FromFile := func() func(ctx flow.Context) error {
|
|
return func(ctx flow.Context) error {
|
|
client, clientErr := daprsdk.NewClientWithPort(fmt.Sprint(currentGRPCPort))
|
|
if clientErr != nil {
|
|
panic(clientErr)
|
|
}
|
|
defer client.Close()
|
|
|
|
dataBytes := []byte(base64.StdEncoding.EncodeToString([]byte("somecontent")))
|
|
invokeCreateMetadata := map[string]string{
|
|
"decodeBase64": "true",
|
|
}
|
|
|
|
out, invokeCreateErr := createObjectRequest(ctx, client, dataBytes, invokeCreateMetadata)
|
|
require.NoError(t, invokeCreateErr)
|
|
|
|
genKey := out.Metadata["key"]
|
|
isBase64 := true
|
|
out, invokeGetErr := getObjectRequest(ctx, client, genKey, isBase64)
|
|
require.NoError(t, invokeGetErr)
|
|
assert.Equal(t, out.Data, dataBytes)
|
|
assert.Empty(t, out.Metadata)
|
|
|
|
out, invokeDeleteErr := deleteObjectRequest(ctx, client, genKey)
|
|
require.NoError(t, invokeDeleteErr)
|
|
assert.Empty(t, out.Data)
|
|
|
|
// confirm the deletion.
|
|
_, invokeSecondGetErr := getObjectRequest(ctx, client, genKey, false)
|
|
assert.Error(t, invokeSecondGetErr)
|
|
|
|
return nil
|
|
}
|
|
}
|
|
|
|
testCreateFromFileGetEncodeBase64 := func() func(ctx flow.Context) error {
|
|
return func(ctx flow.Context) error {
|
|
client, clientErr := daprsdk.NewClientWithPort(fmt.Sprint(currentGRPCPort))
|
|
if clientErr != nil {
|
|
panic(clientErr)
|
|
}
|
|
defer client.Close()
|
|
|
|
dataBytes := []byte("somecontent not base64 encoded")
|
|
b64EncodedDataBytes := []byte(base64.StdEncoding.EncodeToString(dataBytes))
|
|
invokeCreateMetadata := map[string]string{}
|
|
|
|
out, invokeCreateErr := createObjectRequest(ctx, client, dataBytes, invokeCreateMetadata)
|
|
require.NoError(t, invokeCreateErr)
|
|
|
|
genKey := out.Metadata["key"]
|
|
invokeGetMetadata := map[string]string{
|
|
"key": genKey,
|
|
}
|
|
out, invokeGetErr := getObjectRequestWithMetadata(ctx, client, invokeGetMetadata)
|
|
require.NoError(t, invokeGetErr)
|
|
assert.Equal(t, out.Data, b64EncodedDataBytes)
|
|
assert.Empty(t, out.Metadata)
|
|
|
|
out, invokeDeleteErr := deleteObjectRequest(ctx, client, genKey)
|
|
require.NoError(t, invokeDeleteErr)
|
|
assert.Empty(t, out.Data)
|
|
|
|
// confirm the deletion.
|
|
_, invokeSecondGetErr := getObjectRequest(ctx, client, genKey, false)
|
|
assert.Error(t, invokeSecondGetErr)
|
|
|
|
return nil
|
|
}
|
|
}
|
|
|
|
flow.New(t, "decode base64 option for binary").
|
|
Step(sidecar.Run(sidecarName,
|
|
append(componentRuntimeOptions(),
|
|
embedded.WithoutApp(),
|
|
embedded.WithComponentsPath("./components/decodeBase64"),
|
|
embedded.WithDaprGRPCPort(strconv.Itoa(currentGRPCPort)),
|
|
embedded.WithDaprHTTPPort(strconv.Itoa(currentHTTPPort)),
|
|
)...,
|
|
)).
|
|
Step("Create blob from file", testCreateBase64FromFile()).
|
|
Run()
|
|
|
|
flow.New(t, "upload regular file get as encode base64").
|
|
Step(sidecar.Run(sidecarName,
|
|
append(componentRuntimeOptions(),
|
|
embedded.WithoutApp(),
|
|
embedded.WithComponentsPath("./components/encodeBase64"),
|
|
embedded.WithDaprGRPCPort(strconv.Itoa(currentGRPCPort)),
|
|
embedded.WithDaprHTTPPort(strconv.Itoa(currentHTTPPort)),
|
|
)...,
|
|
)).
|
|
Step("Create blob from file get encode base64", testCreateFromFileGetEncodeBase64()).
|
|
Run()
|
|
}
|
|
|
|
func componentRuntimeOptions() []embedded.Option {
|
|
log := logger.NewLogger("dapr.components")
|
|
|
|
bindingsRegistry := bindings_loader.NewRegistry()
|
|
bindingsRegistry.Logger = log
|
|
bindingsRegistry.RegisterOutputBinding(bindings_s3.NewAWSS3, "aws.s3")
|
|
|
|
secretstoreRegistry := secretstores_loader.NewRegistry()
|
|
secretstoreRegistry.Logger = log
|
|
secretstoreRegistry.RegisterComponent(secretstore_env.NewEnvSecretStore, "local.env")
|
|
|
|
return []embedded.Option{
|
|
embedded.WithBindings(bindingsRegistry),
|
|
embedded.WithSecretStores(secretstoreRegistry),
|
|
}
|
|
}
|
|
|
|
// teardown logs the start and the end of the suite teardown. It does not
// remove any resources itself.
func teardown(t *testing.T) {
	t.Log("AWS S3 Binding CertificationTests teardown...")

	// Nothing is cleaned up between the two log lines.

	t.Log("AWS S3 Binding CertificationTests teardown...done!")
}
|