diff --git a/bindings/azure/cosmosdb/cosmosdb.go b/bindings/azure/cosmosdb/cosmosdb.go index 22fa89a71..3b3abad0d 100644 --- a/bindings/azure/cosmosdb/cosmosdb.go +++ b/bindings/azure/cosmosdb/cosmosdb.go @@ -20,8 +20,7 @@ import ( "strings" "time" - "github.com/a8m/documentdb" - backoff "github.com/cenkalti/backoff/v4" + "github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos" "github.com/dapr/components-contrib/bindings" "github.com/dapr/components-contrib/internal/authentication/azure" @@ -30,8 +29,7 @@ import ( // CosmosDB allows performing state operations on collections. type CosmosDB struct { - client *documentdb.DocumentDB - collection string + client *azcosmos.ContainerClient partitionKey string logger logger.Logger @@ -45,7 +43,8 @@ type cosmosDBCredentials struct { PartitionKey string `json:"partitionKey"` } -const statusTooManyRequests = "429" // RFC 6585, 4 +// Value used for timeout durations +const timeoutValue = 30 // NewCosmosDB returns a new CosmosDB instance. func NewCosmosDB(logger logger.Logger) *CosmosDB { @@ -62,57 +61,43 @@ func (c *CosmosDB) Init(metadata bindings.Metadata) error { c.partitionKey = m.PartitionKey // Create the client; first, try authenticating with a master key, if present - var config *documentdb.Config + var client *azcosmos.Client if m.MasterKey != "" { - config = documentdb.NewConfig(&documentdb.Key{ - Key: m.MasterKey, - }) + cred, keyErr := azcosmos.NewKeyCredential(m.MasterKey) + if keyErr != nil { + return keyErr + } + client, err = azcosmos.NewClientWithKey(m.URL, cred, nil) + if err != nil { + return err + } } else { // Fallback to using Azure AD - env, errB := azure.NewEnvironmentSettings("cosmosdb", metadata.Properties) - if errB != nil { - return errB + env, errEnv := azure.NewEnvironmentSettings("cosmosdb", metadata.Properties) + if errEnv != nil { + return errEnv } - spt, errB := env.GetServicePrincipalToken() - if errB != nil { - return errB + token, errToken := env.GetTokenCredential() + if errToken != nil { + return errToken } - config = documentdb.NewConfigWithServicePrincipal(spt) - } - // disable the identification hydrator (which autogenerates IDs if missing from the request) - // so we aren't forced to use a struct by the upstream SDK - // this allows us to provide the most flexibility in the request document sent to this binding - config.IdentificationHydrator = nil - config.WithAppIdentifier("dapr-" + logger.DaprVersion) - - c.client = documentdb.New(m.URL, config) - - // Retries initializing the client if a TooManyRequests error is encountered - err = retryOperation(func() (err error) { - collLink := fmt.Sprintf("dbs/%s/colls/%s/", m.Database, m.Collection) - coll, err := c.client.ReadCollection(collLink) + client, err = azcosmos.NewClient(m.URL, token, nil) if err != nil { - if isTooManyRequestsError(err) { - return err - } - return backoff.Permanent(err) - } else if coll == nil || coll.Self == "" { - return backoff.Permanent( - fmt.Errorf("collection %s in database %s for CosmosDB state store not found. This must be created before Dapr uses it", m.Collection, m.Database), - ) + return err } + } - c.collection = coll.Self - - return nil - }, func(err error, d time.Duration) { - c.logger.Warnf("CosmosDB binding initialization failed: %v; retrying in %s", err, d) - }, 5*time.Minute) + // Create a container client + dbContainer, err := client.NewContainer(m.Database, m.Collection) if err != nil { return err } - return nil + c.client = dbContainer + ctx, cancel := context.WithTimeout(context.Background(), timeoutValue*time.Second) + _, err = c.client.Read(ctx, nil) + cancel() + return err } func (c *CosmosDB) parseMetadata(metadata bindings.Metadata) (*cosmosDBCredentials, error) { @@ -135,7 +120,7 @@ func (c *CosmosDB) Operations() []bindings.OperationKind { return []bindings.OperationKind{bindings.CreateOperation} } -func (c *CosmosDB) Invoke(_ context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { +func (c *CosmosDB) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { switch req.Operation { case bindings.CreateOperation: var obj interface{} @@ -144,41 +129,34 @@ func (c *CosmosDB) Invoke(_ context.Context, req *bindings.InvokeRequest) (*bind return nil, err } - val, err := c.getPartitionKeyValue(c.partitionKey, obj) + pkString, err := c.getPartitionKeyValue(c.partitionKey, obj) if err != nil { return nil, err } + pk := azcosmos.NewPartitionKeyString(pkString) - err = retryOperation(func() error { - _, innerErr := c.client.CreateDocument(c.collection, obj, documentdb.PartitionKey(val)) - if innerErr != nil { - if isTooManyRequestsError(innerErr) { - return innerErr - } - return backoff.Permanent(innerErr) - } - return nil - }, func(err error, d time.Duration) { - c.logger.Warnf("CosmosDB binding Invoke request failed: %v; retrying in %s", err, d) - }, 20*time.Second) + _, err = c.client.CreateItem(ctx, pk, req.Data, nil) if err != nil { return nil, err } - return nil, nil default: return nil, fmt.Errorf("operation kind %s not supported", req.Operation) } } -func (c *CosmosDB) getPartitionKeyValue(key string, obj interface{}) (interface{}, error) { - val, err := c.lookup(obj.(map[string]interface{}), strings.Split(key, ".")) +func (c *CosmosDB) getPartitionKeyValue(key string, obj interface{}) (string, error) { + valI, err := c.lookup(obj.(map[string]interface{}), strings.Split(key, ".")) if err != nil { - return nil, fmt.Errorf("missing partitionKey field %s from request body - %w", c.partitionKey, err) + return "", fmt.Errorf("missing partitionKey field %s from request body - %w", c.partitionKey, err) + } + val, ok := valI.(string) + if !ok { + return "", fmt.Errorf("partition key is not a string") } if val == "" { - return nil, fmt.Errorf("partitionKey field %s from request body is empty", c.partitionKey) + return "", fmt.Errorf("partitionKey field %s from request body is empty", c.partitionKey) } return val, nil @@ -209,24 +187,3 @@ func (c *CosmosDB) lookup(m map[string]interface{}, ks []string) (val interface{ return c.lookup(m, ks[1:]) } - -func retryOperation(operation backoff.Operation, notify backoff.Notify, maxElapsedTime time.Duration) error { - bo := backoff.NewExponentialBackOff() - bo.InitialInterval = 2 * time.Second - bo.MaxElapsedTime = maxElapsedTime - return backoff.RetryNotify(operation, bo, notify) -} - -func isTooManyRequestsError(err error) bool { - if err == nil { - return false - } - - if requestError, ok := err.(*documentdb.RequestError); ok { - if requestError.Code == statusTooManyRequests { - return true - } - } - - return false -} diff --git a/go.mod b/go.mod index 8693956ac..c0a931679 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/Azure/azure-sdk-for-go v65.0.0+incompatible github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 + github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.1 github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.0.1 github.com/Azure/azure-sdk-for-go/sdk/keyvault/azsecrets v0.7.1 github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.0.1 diff --git a/go.sum b/go.sum index 18adbeee2..236773348 100644 --- a/go.sum +++ b/go.sum @@ -112,6 +112,8 @@ github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 h1:QkAcEIAKbNL4KoFr4Sath github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0/go.mod h1:bhXu1AjYL+wutSL/kpSq6s7733q2Rb0yuot9Zgfqa/0= github.com/Azure/azure-sdk-for-go/sdk/data/azappconfig v0.4.0 h1:h/72OERa/5hgnKEOyQJ8gtJoTVX3uwHCavsraGadTZM= github.com/Azure/azure-sdk-for-go/sdk/data/azappconfig v0.4.0/go.mod h1:p74+tP95m8830ypJk53L93+BEsjTKY4SKQ75J2NmS5U= +github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.1 h1:Sd7LtAlpRJ50lAj49S+pT6K0OUt+4KsNzB2uUArrWKg= +github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.1/go.mod h1:Fy3bbChFm4cZn6oIxYYqKB2FG3rBDxk3NZDLDJCHl+Q= github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.0.1 h1:bFa9IcjvrCber6gGgDAUZ+I2bO8J7s8JxXmu9fhi2ss= github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.0.1/go.mod h1:l3wvZkG9oW07GLBW5Cd0WwG5asOfJ8aqE8raUvNzLpk= github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 h1:jp0dGvZ7ZK0mgqnTSClMxa5xuRL7NZgHameVYF6BurY= diff --git a/tests/certification/bindings/azure/cosmosdb/go.mod b/tests/certification/bindings/azure/cosmosdb/go.mod index b362ab2ed..6d6d59bef 100644 --- a/tests/certification/bindings/azure/cosmosdb/go.mod +++ b/tests/certification/bindings/azure/cosmosdb/go.mod @@ -19,8 +19,10 @@ require ( github.com/AdhityaRamadhanus/fasthttpcors v0.0.0-20170121111917-d4c07198763a // indirect github.com/Azure/azure-amqp-common-go/v3 v3.2.3 // indirect github.com/Azure/azure-pipeline-go v0.2.3 // indirect + github.com/Azure/azure-sdk-for-go v65.0.0+incompatible // indirect github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 // indirect github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 // indirect + github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.1 // indirect github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 // indirect github.com/Azure/azure-storage-blob-go v0.10.0 // indirect github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd // indirect diff --git a/tests/certification/bindings/azure/cosmosdb/go.sum b/tests/certification/bindings/azure/cosmosdb/go.sum index 9b2f7f881..09592a1f5 100644 --- a/tests/certification/bindings/azure/cosmosdb/go.sum +++ b/tests/certification/bindings/azure/cosmosdb/go.sum @@ -49,10 +49,14 @@ github.com/Azure/azure-pipeline-go v0.1.8/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9a github.com/Azure/azure-pipeline-go v0.2.2/go.mod h1:4rQ/NZncSvGqNkkOsNpOU1tgoNuIlp9AfUH5G1tvCHc= github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U= github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k= +github.com/Azure/azure-sdk-for-go v65.0.0+incompatible h1:HzKLt3kIwMm4KeJYTdx9EbjRYTySD/t8i1Ee/W5EGXw= +github.com/Azure/azure-sdk-for-go v65.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 h1:Ut0ZGdOwJDw0npYEg+TLlPls3Pq6JiZaP2/aGKir7Zw= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U= github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 h1:QkAcEIAKbNL4KoFr4SathZPhDhF4mVwpBMFlYjyAqy8= github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0/go.mod h1:bhXu1AjYL+wutSL/kpSq6s7733q2Rb0yuot9Zgfqa/0= +github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.1 h1:Sd7LtAlpRJ50lAj49S+pT6K0OUt+4KsNzB2uUArrWKg= +github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.1/go.mod h1:Fy3bbChFm4cZn6oIxYYqKB2FG3rBDxk3NZDLDJCHl+Q= github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 h1:jp0dGvZ7ZK0mgqnTSClMxa5xuRL7NZgHameVYF6BurY= github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w= github.com/Azure/azure-storage-blob-go v0.10.0 h1:evCwGreYo3XLeBV4vSxLbLiYb6e0SzsJiXQVRGsRXxs=