Implement State Store for OCI ObjectStorage service (#1401)
* Implement State Store for OCI ObjectStorage service Signed-off-by: lucasjellema <lucasjellema@gmail.com> * add go.mod and go.sum with dependencies on OCI SDK Signed-off-by: lucasjellema <lucasjellema@gmail.com> * Removed dependency in unit test on OCI account Signed-off-by: lucasjellema <lucasjellema@gmail.com> * Adding extensive unit test (coverage) using mock-OCI client Signed-off-by: lucasjellema <lucasjellema@gmail.com> * Correcting linting issues and review requests Signed-off-by: lucasjellema <lucasjellema@gmail.com> Co-authored-by: Looong Dai <long.dai@intel.com>
This commit is contained in:
parent
c7adb917f3
commit
a013b58d6c
3
go.mod
3
go.mod
|
@ -243,6 +243,7 @@ require (
|
|||
github.com/nats-io/nkeys v0.3.0 // indirect
|
||||
github.com/nats-io/nuid v1.0.1 // indirect
|
||||
github.com/opentracing/opentracing-go v1.2.0 // indirect
|
||||
github.com/oracle/oci-go-sdk/v54 v54.0.0
|
||||
github.com/pierrec/lz4 v2.0.5+incompatible // indirect
|
||||
github.com/pkg/browser v0.0.0-20180916011732-0a3d74bf9ce4 // indirect
|
||||
github.com/pmezard/go-difflib v1.0.0 // indirect
|
||||
|
@ -301,4 +302,6 @@ require (
|
|||
stathat.com/c/consistent v1.0.0 // indirect
|
||||
)
|
||||
|
||||
require github.com/sony/gobreaker v0.4.2-0.20210216022020-dd874f9dd33b // indirect
|
||||
|
||||
replace k8s.io/client => github.com/kubernetes-client/go v0.0.0-20190928040339-c757968c4c36
|
||||
|
|
4
go.sum
4
go.sum
|
@ -985,6 +985,8 @@ github.com/openzipkin-contrib/zipkin-go-opentracing v0.4.5/go.mod h1:/wsWhb9smxS
|
|||
github.com/openzipkin/zipkin-go v0.1.6/go.mod h1:QgAqvLzwWbR/WpD4A3cGpPtJrZXNIiJc5AZX7/PBEpw=
|
||||
github.com/openzipkin/zipkin-go v0.2.1/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4=
|
||||
github.com/openzipkin/zipkin-go v0.2.2/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4=
|
||||
github.com/oracle/oci-go-sdk/v54 v54.0.0 h1:CDLjeSejv2aDpElAJrhKpi6zvT/zhZCZuXchUUZ+LS4=
|
||||
github.com/oracle/oci-go-sdk/v54 v54.0.0/go.mod h1:+t+yvcFGVp+3ZnztnyxqXfQDsMlq8U25faBLa+mqCMc=
|
||||
github.com/pact-foundation/pact-go v1.0.4/go.mod h1:uExwJY4kCzNPcHRj+hCR/HBbOOIwwtUjcrb0b5/5kLM=
|
||||
github.com/pascaldekloe/goe v0.0.0-20180627143212-57f6aae5913c/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
|
||||
github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY=
|
||||
|
@ -1113,6 +1115,8 @@ github.com/smartystreets/goconvey v1.6.4 h1:fv0U8FUIMPNf1L9lnHLvLhgicrIVChEkdzIK
|
|||
github.com/smartystreets/goconvey v1.6.4/go.mod h1:syvi0/a8iFYH4r/RixwvyeAJjdLS9QV7WQ/tjFTllLA=
|
||||
github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM=
|
||||
github.com/sony/gobreaker v0.4.1/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY=
|
||||
github.com/sony/gobreaker v0.4.2-0.20210216022020-dd874f9dd33b h1:br+bPNZsJWKicw/5rALEo67QHs5weyD5tf8WST+4sJ0=
|
||||
github.com/sony/gobreaker v0.4.2-0.20210216022020-dd874f9dd33b/go.mod h1:ZKptC7FHNvhBz7dN2LGjPVBz2sZJmc0/PkyDJOjmxWY=
|
||||
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
|
||||
github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI=
|
||||
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
|
||||
|
|
|
@ -0,0 +1,417 @@
|
|||
// ------------------------------------------------------------
|
||||
// Copyright (c) Microsoft Corporation and Dapr Contributors.
|
||||
// Licensed under the MIT License.
|
||||
// ------------------------------------------------------------
|
||||
|
||||
package objectstorage
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"strings"
|
||||
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
|
||||
"github.com/oracle/oci-go-sdk/v54/common"
|
||||
"github.com/oracle/oci-go-sdk/v54/objectstorage"
|
||||
|
||||
"github.com/dapr/components-contrib/state"
|
||||
"github.com/dapr/kit/logger"
|
||||
)
|
||||
|
||||
const (
|
||||
keyDelimiter = "||"
|
||||
tenancyKey = "tenancyOCID"
|
||||
compartmentKey = "compartmentOCID"
|
||||
regionKey = "region"
|
||||
fingerPrintKey = "fingerPrint"
|
||||
privateKeyKey = "privateKey"
|
||||
userKey = "userOCID"
|
||||
bucketNameKey = "bucketName"
|
||||
)
|
||||
|
||||
type StateStore struct {
|
||||
state.DefaultBulkStore
|
||||
|
||||
json jsoniter.API
|
||||
features []state.Feature
|
||||
logger logger.Logger
|
||||
client objectStoreClient
|
||||
}
|
||||
|
||||
type Metadata struct {
|
||||
userOCID string
|
||||
bucketName string
|
||||
region string
|
||||
tenancyOCID string
|
||||
fingerPrint string
|
||||
privateKey string
|
||||
compartmentOCID string
|
||||
namespace string
|
||||
OCIObjectStorageClient *objectstorage.ObjectStorageClient
|
||||
}
|
||||
|
||||
type objectStoreClient interface {
|
||||
getObject(ctx context.Context, objectname string, logger logger.Logger) ([]byte, *string, error)
|
||||
deleteObject(ctx context.Context, objectname string, etag *string) (err error)
|
||||
putObject(ctx context.Context, objectname string, contentLen int64, content io.ReadCloser, metadata map[string]string, etag *string, logger logger.Logger) error
|
||||
initStorageBucket(logger logger.Logger) error
|
||||
initOCIObjectStorageClient(logger logger.Logger) (*objectstorage.ObjectStorageClient, error)
|
||||
pingBucket(logger logger.Logger) error
|
||||
}
|
||||
|
||||
type objectStorageClient struct {
|
||||
objectStorageMetadata *Metadata
|
||||
}
|
||||
|
||||
type ociObjectStorageClient struct {
|
||||
objectStorageClient
|
||||
}
|
||||
|
||||
/********* Interface Implementations Init, Features, Get, Set, Delete and the instantiation function NewOCIObjectStorageStore. */
|
||||
|
||||
func (r *StateStore) Init(metadata state.Metadata) error {
|
||||
r.logger.Debugf("Init OCI Object Storage State Store")
|
||||
meta, err := getObjectStorageMetadata(metadata.Properties)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
r.client = &ociObjectStorageClient{objectStorageClient{objectStorageMetadata: meta}}
|
||||
|
||||
objectStorageClient, cerr := r.client.initOCIObjectStorageClient(r.logger)
|
||||
if cerr != nil {
|
||||
return fmt.Errorf("failed to initialize client or create bucket : %w", cerr)
|
||||
}
|
||||
meta.OCIObjectStorageClient = objectStorageClient
|
||||
|
||||
cerr = r.client.initStorageBucket(r.logger)
|
||||
if cerr != nil {
|
||||
return fmt.Errorf("failed to create bucket : %w", cerr)
|
||||
}
|
||||
r.logger.Debugf("OCI Object Storage State Store initialized using bucket '%s'", meta.bucketName)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *StateStore) Features() []state.Feature {
|
||||
return r.features
|
||||
}
|
||||
|
||||
func (r *StateStore) Delete(req *state.DeleteRequest) error {
|
||||
r.logger.Debugf("Delete entry from OCI Object Storage State Store with key ", req.Key)
|
||||
err := r.deleteDocument(req)
|
||||
return err
|
||||
}
|
||||
|
||||
func (r *StateStore) Get(req *state.GetRequest) (*state.GetResponse, error) {
|
||||
r.logger.Debugf("Get from OCI Object Storage State Store with key ", req.Key)
|
||||
content, etag, err := r.readDocument((req))
|
||||
if err != nil {
|
||||
r.logger.Debugf("error %s", err)
|
||||
if err.Error() == "ObjectNotFound" {
|
||||
return &state.GetResponse{Data: nil, ETag: nil, Metadata: nil}, nil
|
||||
}
|
||||
|
||||
return &state.GetResponse{Data: nil, ETag: nil, Metadata: nil}, err
|
||||
}
|
||||
return &state.GetResponse{
|
||||
Data: content,
|
||||
ETag: etag,
|
||||
Metadata: nil,
|
||||
}, err
|
||||
}
|
||||
|
||||
func (r *StateStore) Set(req *state.SetRequest) error {
|
||||
r.logger.Debugf("saving %s to OCI Object Storage State Store", req.Key)
|
||||
return r.writeDocument(req)
|
||||
}
|
||||
|
||||
func (r *StateStore) Ping() error {
|
||||
return r.pingBucket()
|
||||
}
|
||||
|
||||
func NewOCIObjectStorageStore(logger logger.Logger) *StateStore {
|
||||
s := &StateStore{
|
||||
json: jsoniter.ConfigFastest,
|
||||
features: []state.Feature{state.FeatureETag},
|
||||
logger: logger,
|
||||
client: nil,
|
||||
}
|
||||
s.DefaultBulkStore = state.NewDefaultBulkStore(s)
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
/************** private helper functions. */
|
||||
|
||||
func getObjectStorageMetadata(metadata map[string]string) (*Metadata, error) {
|
||||
meta := Metadata{}
|
||||
var err error
|
||||
if meta.bucketName, err = getValue(metadata, bucketNameKey); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if meta.region, err = getValue(metadata, regionKey); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if meta.compartmentOCID, err = getValue(metadata, compartmentKey); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if meta.userOCID, err = getValue(metadata, userKey); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if meta.fingerPrint, err = getValue(metadata, fingerPrintKey); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if meta.privateKey, err = getValue(metadata, privateKeyKey); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if meta.tenancyOCID, err = getValue(metadata, tenancyKey); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &meta, nil
|
||||
}
|
||||
|
||||
func getValue(metadata map[string]string, key string) (string, error) {
|
||||
if val, ok := metadata[key]; ok && val != "" {
|
||||
return val, nil
|
||||
}
|
||||
return "", fmt.Errorf("missing or empty %s field from metadata", key)
|
||||
}
|
||||
|
||||
// functions that bridge from the Dapr State API to the OCI ObjectStorage Client.
|
||||
func (r *StateStore) writeDocument(req *state.SetRequest) error {
|
||||
if len(req.Key) == 0 || req.Key == "" {
|
||||
return fmt.Errorf("key for value to set was missing from request")
|
||||
}
|
||||
if req.Options.Concurrency == state.FirstWrite && (req.ETag == nil || len(*req.ETag) == 0) {
|
||||
r.logger.Debugf("when FirstWrite is to be enforced, a value must be provided for the ETag")
|
||||
return fmt.Errorf("when FirstWrite is to be enforced, a value must be provided for the ETag")
|
||||
}
|
||||
|
||||
r.logger.Debugf("Save state in OCI Object Storage Bucket under key %s ", req.Key)
|
||||
objectName := getFileName(req.Key)
|
||||
content := r.marshal(req)
|
||||
objectLength := int64(len(content))
|
||||
ctx := context.Background()
|
||||
etag := req.ETag
|
||||
if req.Options.Concurrency != state.FirstWrite {
|
||||
etag = nil
|
||||
}
|
||||
err := r.client.putObject(ctx, objectName, objectLength, ioutil.NopCloser(bytes.NewReader(content)), nil, etag, r.logger)
|
||||
if err != nil {
|
||||
r.logger.Debugf("error in writing object to OCI object storage %s, err %s", req.Key, err)
|
||||
return fmt.Errorf("failed to write object to OCI Object storage : %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *StateStore) readDocument(req *state.GetRequest) ([]byte, *string, error) {
|
||||
if len(req.Key) == 0 || req.Key == "" {
|
||||
return nil, nil, fmt.Errorf("key for value to get was missing from request")
|
||||
}
|
||||
objectName := getFileName(req.Key)
|
||||
ctx := context.Background()
|
||||
content, etag, err := r.client.getObject(ctx, objectName, r.logger)
|
||||
if err != nil {
|
||||
r.logger.Debugf("download file %s, err %s", req.Key, err)
|
||||
return nil, nil, fmt.Errorf("failed to read object from OCI Object storage : %w", err)
|
||||
}
|
||||
return content, etag, nil
|
||||
}
|
||||
|
||||
func (r *StateStore) pingBucket() error {
|
||||
err := r.client.pingBucket(r.logger)
|
||||
if err != nil {
|
||||
r.logger.Debugf("ping bucket failed err %s", err)
|
||||
return fmt.Errorf("failed to ping bucket on OCI Object storage : %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *StateStore) deleteDocument(req *state.DeleteRequest) error {
|
||||
if len(req.Key) == 0 || req.Key == "" {
|
||||
return fmt.Errorf("key for value to delete was missing from request")
|
||||
}
|
||||
|
||||
objectName := getFileName(req.Key)
|
||||
ctx := context.Background()
|
||||
etag := req.ETag
|
||||
if req.Options.Concurrency != state.FirstWrite {
|
||||
etag = nil
|
||||
}
|
||||
if req.Options.Concurrency == state.FirstWrite && (etag == nil || len(*etag) == 0) {
|
||||
r.logger.Debugf("when FirstWrite is to be enforced, a value must be provided for the ETag")
|
||||
return fmt.Errorf("when FirstWrite is to be enforced, a value must be provided for the ETag")
|
||||
}
|
||||
err := r.client.deleteObject(ctx, objectName, etag)
|
||||
if err != nil {
|
||||
r.logger.Debugf("error in deleting object from OCI object storage %s, err %s", req.Key, err)
|
||||
return fmt.Errorf("failed to delete object from OCI Object storage : %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *StateStore) marshal(req *state.SetRequest) []byte {
|
||||
var v string
|
||||
b, ok := req.Value.([]byte)
|
||||
if ok {
|
||||
v = string(b)
|
||||
} else {
|
||||
v, _ = jsoniter.MarshalToString(req.Value)
|
||||
}
|
||||
return []byte(v)
|
||||
}
|
||||
|
||||
func getFileName(key string) string {
|
||||
pr := strings.Split(key, keyDelimiter)
|
||||
if len(pr) != 2 {
|
||||
return pr[0]
|
||||
}
|
||||
|
||||
return pr[1]
|
||||
}
|
||||
|
||||
/**************** functions with OCI ObjectStorage Service interaction. */
|
||||
|
||||
func getNamespace(ctx context.Context, client objectstorage.ObjectStorageClient) (string, error) {
|
||||
request := objectstorage.GetNamespaceRequest{}
|
||||
response, err := client.GetNamespace(ctx, request)
|
||||
if err != nil {
|
||||
return *response.Value, fmt.Errorf("failed to retrieve tenancy namespace : %w", err)
|
||||
}
|
||||
|
||||
return *response.Value, nil
|
||||
}
|
||||
|
||||
// bucketname needs to be unique within compartment. there is no concept of "child" buckets.
|
||||
// the value returned is the bucket's OCID.
|
||||
func ensureBucketExists(ctx context.Context, client objectstorage.ObjectStorageClient, namespace string, name string, compartmentOCID string, logger logger.Logger) error {
|
||||
req := objectstorage.GetBucketRequest{
|
||||
NamespaceName: &namespace,
|
||||
BucketName: &name,
|
||||
}
|
||||
// verify if bucket exists.
|
||||
response, err := client.GetBucket(ctx, req)
|
||||
if err != nil {
|
||||
if response.RawResponse.StatusCode == 404 {
|
||||
err = createBucket(ctx, client, namespace, name, compartmentOCID)
|
||||
if err == nil {
|
||||
logger.Debugf("Created OCI Object Storage Bucket %s as State Store", name)
|
||||
}
|
||||
return err
|
||||
}
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// bucketname needs to be unique within compartment. there is no concept of "child" buckets.
|
||||
func createBucket(ctx context.Context, client objectstorage.ObjectStorageClient, namespace string, name string, compartmentOCID string) error {
|
||||
request := objectstorage.CreateBucketRequest{
|
||||
NamespaceName: &namespace,
|
||||
}
|
||||
request.CompartmentId = &compartmentOCID
|
||||
request.Name = &name
|
||||
request.Metadata = make(map[string]string)
|
||||
request.PublicAccessType = objectstorage.CreateBucketDetailsPublicAccessTypeNopublicaccess
|
||||
_, err := client.CreateBucket(ctx, request)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create bucket on OCI : %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ***** the functions that interact with OCI Object Storage AND constitute the objectStoreClient interface.
|
||||
|
||||
func (c *ociObjectStorageClient) getObject(ctx context.Context, objectname string, logger logger.Logger) ([]byte, *string, error) {
|
||||
logger.Debugf("read file %s from OCI ObjectStorage StateStore %s ", objectname, &c.objectStorageMetadata.bucketName)
|
||||
request := objectstorage.GetObjectRequest{
|
||||
NamespaceName: &c.objectStorageMetadata.namespace,
|
||||
BucketName: &c.objectStorageMetadata.bucketName,
|
||||
ObjectName: &objectname,
|
||||
}
|
||||
response, err := c.objectStorageMetadata.OCIObjectStorageClient.GetObject(ctx, request)
|
||||
if err != nil {
|
||||
logger.Debugf("Issue in OCI ObjectStorage with retrieving object %s, error: %s", objectname, err)
|
||||
if response.RawResponse.StatusCode == 404 {
|
||||
return nil, nil, nil
|
||||
}
|
||||
return nil, nil, fmt.Errorf("failed to retrieve object : %w", err)
|
||||
}
|
||||
if response.ETag != nil {
|
||||
logger.Debugf("OCI ObjectStorage StateStore metadata: ETag %s", *response.ETag)
|
||||
}
|
||||
buf := new(bytes.Buffer)
|
||||
buf.ReadFrom(response.Content)
|
||||
return buf.Bytes(), response.ETag, nil
|
||||
}
|
||||
|
||||
func (c *ociObjectStorageClient) deleteObject(ctx context.Context, objectname string, etag *string) (err error) {
|
||||
request := objectstorage.DeleteObjectRequest{
|
||||
NamespaceName: &c.objectStorageMetadata.namespace,
|
||||
BucketName: &c.objectStorageMetadata.bucketName,
|
||||
ObjectName: &objectname,
|
||||
IfMatch: etag,
|
||||
}
|
||||
_, err = c.objectStorageMetadata.OCIObjectStorageClient.DeleteObject(ctx, request)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to delete object from OCI : %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *ociObjectStorageClient) putObject(ctx context.Context, objectname string, contentLen int64, content io.ReadCloser, metadata map[string]string, etag *string, logger logger.Logger) error {
|
||||
request := objectstorage.PutObjectRequest{
|
||||
NamespaceName: &c.objectStorageMetadata.namespace,
|
||||
BucketName: &c.objectStorageMetadata.bucketName,
|
||||
ObjectName: &objectname,
|
||||
ContentLength: &contentLen,
|
||||
PutObjectBody: content,
|
||||
OpcMeta: metadata,
|
||||
IfMatch: etag,
|
||||
}
|
||||
_, err := c.objectStorageMetadata.OCIObjectStorageClient.PutObject(ctx, request)
|
||||
logger.Debugf("Put object ", objectname, " in bucket ", &c.objectStorageMetadata.bucketName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to put object on OCI : %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *ociObjectStorageClient) initStorageBucket(logger logger.Logger) error {
|
||||
ctx := context.Background()
|
||||
err := ensureBucketExists(ctx, *c.objectStorageMetadata.OCIObjectStorageClient, c.objectStorageMetadata.namespace, c.objectStorageMetadata.bucketName, c.objectStorageMetadata.compartmentOCID, logger)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read or create bucket : %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *ociObjectStorageClient) initOCIObjectStorageClient(logger logger.Logger) (*objectstorage.ObjectStorageClient, error) {
|
||||
configurationProvider := common.NewRawConfigurationProvider(c.objectStorageMetadata.tenancyOCID, c.objectStorageMetadata.userOCID, c.objectStorageMetadata.region, c.objectStorageMetadata.fingerPrint, c.objectStorageMetadata.privateKey, nil)
|
||||
objectStorageClient, cerr := objectstorage.NewObjectStorageClientWithConfigurationProvider(configurationProvider)
|
||||
if cerr != nil {
|
||||
return nil, fmt.Errorf("failed to create ObjectStorageClient : %w", cerr)
|
||||
}
|
||||
ctx := context.Background()
|
||||
c.objectStorageMetadata.namespace, cerr = getNamespace(ctx, objectStorageClient)
|
||||
if cerr != nil {
|
||||
return nil, fmt.Errorf("failed to get namespace : %w", cerr)
|
||||
}
|
||||
return &objectStorageClient, nil
|
||||
}
|
||||
|
||||
func (c *ociObjectStorageClient) pingBucket(logger logger.Logger) error {
|
||||
req := objectstorage.GetBucketRequest{
|
||||
NamespaceName: &c.objectStorageMetadata.namespace,
|
||||
BucketName: &c.objectStorageMetadata.bucketName,
|
||||
}
|
||||
_, err := c.objectStorageMetadata.OCIObjectStorageClient.GetBucket(context.Background(), req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to retrieve bucket details : %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
|
@ -0,0 +1,236 @@
|
|||
// ------------------------------------------------------------
|
||||
// Copyright (c) Microsoft Corporation and Dapr Contributors.
|
||||
// Licensed under the MIT License.
|
||||
// ------------------------------------------------------------
|
||||
package objectstorage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/dapr/components-contrib/state"
|
||||
"github.com/dapr/kit/logger"
|
||||
)
|
||||
|
||||
func getDummyOCIObjectStorageConfiguration() map[string]string {
|
||||
return map[string]string{
|
||||
"bucketName": "myBuck",
|
||||
"tenancyOCID": "ocid1.tenancy.oc1..aaaaaaaag7c7sq",
|
||||
"userOCID": "ocid1.user.oc1..aaaaaaaaby",
|
||||
"compartmentOCID": "ocid1.compartment.oc1..aaaaaaaq",
|
||||
"fingerPrint": "02:91:6c",
|
||||
"privateKey": "-----BEGIN RSA PRIVATE KEY-----\nMIIEogI=\n-----END RSA PRIVATE KEY-----",
|
||||
"region": "us-ashburn-1",
|
||||
}
|
||||
}
|
||||
|
||||
func TestInit(t *testing.T) {
|
||||
meta := state.Metadata{}
|
||||
statestore := NewOCIObjectStorageStore(logger.NewLogger("logger"))
|
||||
t.Run("Init with complete yet incorrect metadata", func(t *testing.T) {
|
||||
meta.Properties = getDummyOCIObjectStorageConfiguration()
|
||||
err := statestore.Init(meta)
|
||||
assert.NotNil(t, err)
|
||||
assert.Error(t, err, "Incorrect configuration data should result in failure to create client")
|
||||
assert.Contains(t, err.Error(), "failed to initialize client", "Incorrect configuration data should result in failure to create client")
|
||||
})
|
||||
t.Run("Init with missing region", func(t *testing.T) {
|
||||
meta.Properties = getDummyOCIObjectStorageConfiguration()
|
||||
meta.Properties[regionKey] = ""
|
||||
err := statestore.Init(meta)
|
||||
assert.NotNil(t, err)
|
||||
assert.Equal(t, fmt.Errorf("missing or empty region field from metadata"), err, "Lacking configuration property should be spotted")
|
||||
})
|
||||
t.Run("Init with missing tenancyOCID", func(t *testing.T) {
|
||||
meta.Properties = getDummyOCIObjectStorageConfiguration()
|
||||
meta.Properties["tenancyOCID"] = ""
|
||||
err := statestore.Init(meta)
|
||||
assert.NotNil(t, err)
|
||||
assert.Equal(t, fmt.Errorf("missing or empty tenancyOCID field from metadata"), err, "Lacking configuration property should be spotted")
|
||||
})
|
||||
t.Run("Init with missing userOCID", func(t *testing.T) {
|
||||
meta.Properties = getDummyOCIObjectStorageConfiguration()
|
||||
meta.Properties[userKey] = ""
|
||||
err := statestore.Init(meta)
|
||||
assert.NotNil(t, err)
|
||||
assert.Equal(t, fmt.Errorf("missing or empty userOCID field from metadata"), err, "Lacking configuration property should be spotted")
|
||||
})
|
||||
t.Run("Init with missing compartmentOCID", func(t *testing.T) {
|
||||
meta.Properties = getDummyOCIObjectStorageConfiguration()
|
||||
meta.Properties[compartmentKey] = ""
|
||||
err := statestore.Init(meta)
|
||||
assert.NotNil(t, err)
|
||||
assert.Equal(t, fmt.Errorf("missing or empty compartmentOCID field from metadata"), err, "Lacking configuration property should be spotted")
|
||||
})
|
||||
t.Run("Init with missing fingerprint", func(t *testing.T) {
|
||||
meta.Properties = getDummyOCIObjectStorageConfiguration()
|
||||
meta.Properties[fingerPrintKey] = ""
|
||||
err := statestore.Init(meta)
|
||||
assert.NotNil(t, err)
|
||||
assert.Equal(t, fmt.Errorf("missing or empty fingerPrint field from metadata"), err, "Lacking configuration property should be spotted")
|
||||
})
|
||||
t.Run("Init with missing private key", func(t *testing.T) {
|
||||
meta.Properties = getDummyOCIObjectStorageConfiguration()
|
||||
meta.Properties[privateKeyKey] = ""
|
||||
err := statestore.Init(meta)
|
||||
assert.NotNil(t, err)
|
||||
assert.Equal(t, fmt.Errorf("missing or empty privateKey field from metadata"), err, "Lacking configuration property should be spotted")
|
||||
})
|
||||
}
|
||||
|
||||
func TestFeatures(t *testing.T) {
|
||||
s := NewOCIObjectStorageStore(logger.NewLogger("logger"))
|
||||
t.Run("Test contents of Features", func(t *testing.T) {
|
||||
features := s.Features()
|
||||
assert.Contains(t, features, state.FeatureETag)
|
||||
})
|
||||
}
|
||||
|
||||
type mockedObjectStoreClient struct {
|
||||
objectStoreClient
|
||||
}
|
||||
|
||||
func (c *mockedObjectStoreClient) getObject(ctx context.Context, objectname string, logger logger.Logger) ([]byte, *string, error) {
|
||||
etag := "etag"
|
||||
return []byte("Hello World"), &etag, nil
|
||||
}
|
||||
|
||||
func (c *mockedObjectStoreClient) deleteObject(ctx context.Context, objectname string, etag *string) (err error) {
|
||||
if objectname == "unknownKey" {
|
||||
return fmt.Errorf("failed to delete object that does not exist - HTTP status code 404")
|
||||
}
|
||||
if etag != nil && *etag == "notTheCorrectETag" {
|
||||
return fmt.Errorf("failed to delete object because of incorrect etag-value ")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *mockedObjectStoreClient) putObject(ctx context.Context, objectname string, contentLen int64, content io.ReadCloser, metadata map[string]string, etag *string, logger logger.Logger) error {
|
||||
if etag != nil && *etag == "notTheCorrectETag" {
|
||||
return fmt.Errorf("failed to delete object because of incorrect etag-value ")
|
||||
}
|
||||
if etag != nil && *etag == "correctETag" {
|
||||
return nil
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *mockedObjectStoreClient) initStorageBucket(logger logger.Logger) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *mockedObjectStoreClient) pingBucket(logger logger.Logger) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func TestGetWithMockClient(t *testing.T) {
|
||||
s := NewOCIObjectStorageStore(logger.NewLogger("logger"))
|
||||
s.client = &mockedObjectStoreClient{}
|
||||
|
||||
t.Run("Test contents of Features", func(t *testing.T) {
|
||||
getResponse, err := s.Get(&state.GetRequest{Key: "test-key"})
|
||||
assert.Equal(t, "Hello World", string(getResponse.Data), "Value retrieved should be equal to value set")
|
||||
assert.NotNil(t, *getResponse.ETag, "ETag should be set")
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestInitWithMockClient(t *testing.T) {
|
||||
s := NewOCIObjectStorageStore(logger.NewLogger("logger"))
|
||||
s.client = &mockedObjectStoreClient{}
|
||||
meta := state.Metadata{}
|
||||
t.Run("Test Init with incomplete configuration", func(t *testing.T) {
|
||||
err := s.Init(meta)
|
||||
assert.NotNil(t, err, "Init should complain about lacking configuration settings")
|
||||
})
|
||||
}
|
||||
|
||||
func TestPingWithMockClient(t *testing.T) {
|
||||
s := NewOCIObjectStorageStore(logger.NewLogger("logger"))
|
||||
s.client = &mockedObjectStoreClient{}
|
||||
|
||||
t.Run("Test Ping", func(t *testing.T) {
|
||||
err := s.Ping()
|
||||
assert.Nil(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestSetWithMockClient(t *testing.T) {
|
||||
statestore := NewOCIObjectStorageStore(logger.NewLogger("logger"))
|
||||
statestore.client = &mockedObjectStoreClient{}
|
||||
t.Run("Set without a key", func(t *testing.T) {
|
||||
err := statestore.Set(&state.SetRequest{Value: []byte("test-value")})
|
||||
assert.Equal(t, err, fmt.Errorf("key for value to set was missing from request"), "Lacking Key results in error")
|
||||
})
|
||||
t.Run("Regular Set Operation", func(t *testing.T) {
|
||||
testKey := "test-key"
|
||||
err := statestore.Set(&state.SetRequest{Key: testKey, Value: []byte("test-value")})
|
||||
assert.Nil(t, err, "Setting a value with a proper key should be errorfree")
|
||||
})
|
||||
t.Run("Testing Set & Concurrency (ETags)", func(t *testing.T) {
|
||||
testKey := "etag-test-key"
|
||||
incorrectETag := "notTheCorrectETag"
|
||||
etag := "correctETag"
|
||||
|
||||
err := statestore.Set(&state.SetRequest{Key: testKey, Value: []byte("overwritten-value"), ETag: &incorrectETag, Options: state.SetStateOption{
|
||||
Concurrency: state.FirstWrite,
|
||||
}})
|
||||
assert.NotNil(t, err, "Updating value with wrong etag should fail")
|
||||
|
||||
err = statestore.Set(&state.SetRequest{Key: testKey, Value: []byte("overwritten-value"), ETag: nil, Options: state.SetStateOption{
|
||||
Concurrency: state.FirstWrite,
|
||||
}})
|
||||
assert.NotNil(t, err, "Asking for FirstWrite concurrency policy without ETag should fail")
|
||||
|
||||
err = statestore.Set(&state.SetRequest{Key: testKey, Value: []byte("overwritten-value"), ETag: &etag, Options: state.SetStateOption{
|
||||
Concurrency: state.FirstWrite,
|
||||
}})
|
||||
assert.Nil(t, err, "Updating value with proper etag should go fine")
|
||||
|
||||
err = statestore.Set(&state.SetRequest{Key: testKey, Value: []byte("overwritten-value"), ETag: nil, Options: state.SetStateOption{
|
||||
Concurrency: state.FirstWrite,
|
||||
}})
|
||||
assert.NotNil(t, err, "Updating value with concurrency policy at FirstWrite should fail when ETag is missing")
|
||||
})
|
||||
}
|
||||
|
||||
func TestDeleteWithMockClient(t *testing.T) {
|
||||
s := NewOCIObjectStorageStore(logger.NewLogger("logger"))
|
||||
s.client = &mockedObjectStoreClient{}
|
||||
t.Run("Delete without a key", func(t *testing.T) {
|
||||
err := s.Delete(&state.DeleteRequest{})
|
||||
assert.Equal(t, err, fmt.Errorf("key for value to delete was missing from request"), "Lacking Key results in error")
|
||||
})
|
||||
t.Run("Delete with an unknown key", func(t *testing.T) {
|
||||
err := s.Delete(&state.DeleteRequest{Key: "unknownKey"})
|
||||
assert.Contains(t, err.Error(), "404", "Unknown Key results in error: http status code 404, object not found")
|
||||
})
|
||||
t.Run("Regular Delete Operation", func(t *testing.T) {
|
||||
testKey := "test-key"
|
||||
err := s.Delete(&state.DeleteRequest{Key: testKey})
|
||||
assert.Nil(t, err, "Deleting an existing value with a proper key should be errorfree")
|
||||
})
|
||||
t.Run("Testing Delete & Concurrency (ETags)", func(t *testing.T) {
|
||||
testKey := "etag-test-delete-key"
|
||||
incorrectETag := "notTheCorrectETag"
|
||||
err := s.Delete(&state.DeleteRequest{Key: testKey, ETag: &incorrectETag, Options: state.DeleteStateOption{
|
||||
Concurrency: state.FirstWrite,
|
||||
}})
|
||||
assert.NotNil(t, err, "Deleting value with an incorrect etag should be prevented")
|
||||
|
||||
etag := "correctETag"
|
||||
err = s.Delete(&state.DeleteRequest{Key: testKey, ETag: &etag, Options: state.DeleteStateOption{
|
||||
Concurrency: state.FirstWrite,
|
||||
}})
|
||||
assert.Nil(t, err, "Deleting value with proper etag should go fine")
|
||||
|
||||
err = s.Delete(&state.DeleteRequest{Key: testKey, ETag: nil, Options: state.DeleteStateOption{
|
||||
Concurrency: state.FirstWrite,
|
||||
}})
|
||||
assert.NotNil(t, err, "Asking for FirstWrite concurrency policy without ETag should fail")
|
||||
})
|
||||
}
|
Loading…
Reference in New Issue