diff --git a/bindings/azure/cosmosdb/cosmosdb.go b/bindings/azure/cosmosdb/cosmosdb.go index ceec3f58b..f3bd4c432 100644 --- a/bindings/azure/cosmosdb/cosmosdb.go +++ b/bindings/azure/cosmosdb/cosmosdb.go @@ -31,8 +31,7 @@ import ( // CosmosDB allows performing state operations on collections. type CosmosDB struct { client *documentdb.DocumentDB - collection *documentdb.Collection - db *documentdb.Database + collection string partitionKey string logger logger.Logger @@ -86,51 +85,29 @@ func (c *CosmosDB) Init(metadata bindings.Metadata) error { config.IdentificationHydrator = nil config.WithAppIdentifier("dapr-" + logger.DaprVersion) + c.client = documentdb.New(m.URL, config) + // Retries initializing the client if a TooManyRequests error is encountered - bo := backoff.NewExponentialBackOff() - bo.InitialInterval = 2 * time.Second - bo.MaxElapsedTime = 5 * time.Minute - err = backoff.RetryNotify(func() (err error) { - client := documentdb.New(m.URL, config) - - dbs, err := client.QueryDatabases(&documentdb.Query{ - Query: "SELECT * FROM ROOT r WHERE r.id=@id", - Parameters: []documentdb.Parameter{ - {Name: "@id", Value: m.Database}, - }, - }) + err = retryOperation(func() (err error) { + collLink := fmt.Sprintf("dbs/%s/colls/%s/", m.Database, m.Collection) + coll, err := c.client.ReadCollection(collLink) if err != nil { if isTooManyRequestsError(err) { return err } return backoff.Permanent(err) - } else if len(dbs) == 0 { - return backoff.Permanent(fmt.Errorf("database %s for CosmosDB binding not found", m.Database)) + } else if coll == nil || coll.Self == "" { + return backoff.Permanent( + fmt.Errorf("collection %s in database %s for CosmosDB state store not found. This must be created before Dapr uses it", m.Collection, m.Database), + ) } - c.db = &dbs[0] - colls, err := client.QueryCollections(c.db.Self, &documentdb.Query{ - Query: "SELECT * FROM ROOT r WHERE r.id=@id", - Parameters: []documentdb.Parameter{ - {Name: "@id", Value: m.Collection}, - }, - }) - if err != nil { - if isTooManyRequestsError(err) { - return err - } - return backoff.Permanent(err) - } else if len(colls) == 0 { - return backoff.Permanent(fmt.Errorf("collection %s for CosmosDB binding not found", m.Collection)) - } - - c.collection = &colls[0] - c.client = client + c.collection = coll.Self return nil - }, bo, func(err error, d time.Duration) { + }, func(err error, d time.Duration) { c.logger.Warnf("CosmosDB binding initialization failed: %v; retrying in %s", err, d) - }) + }, 5*time.Minute) if err != nil { return err } @@ -172,7 +149,18 @@ func (c *CosmosDB) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse return nil, err } - _, err = c.client.CreateDocument(c.collection.Self, obj, documentdb.PartitionKey(val)) + err = retryOperation(func() error { + _, innerErr := c.client.CreateDocument(c.collection, obj, documentdb.PartitionKey(val)) + if innerErr != nil { + if isTooManyRequestsError(innerErr) { + return innerErr + } + return backoff.Permanent(innerErr) + } + return nil + }, func(err error, d time.Duration) { + c.logger.Warnf("CosmosDB binding Invoke request failed: %v; retrying in %s", err, d) + }, 20*time.Second) if err != nil { return nil, err } @@ -222,6 +210,13 @@ func (c *CosmosDB) lookup(m map[string]interface{}, ks []string) (val interface{ return c.lookup(m, ks[1:]) } +func retryOperation(operation backoff.Operation, notify backoff.Notify, maxElapsedTime time.Duration) error { + bo := backoff.NewExponentialBackOff() + bo.InitialInterval = 2 * time.Second + bo.MaxElapsedTime = maxElapsedTime + return backoff.RetryNotify(operation, bo, notify) +} + func isTooManyRequestsError(err error) bool { if err == nil { return false diff --git a/state/azure/cosmosdb/cosmosdb.go b/state/azure/cosmosdb/cosmosdb.go index ef2aa6a75..d66693197 100644 --- a/state/azure/cosmosdb/cosmosdb.go +++ b/state/azure/cosmosdb/cosmosdb.go @@ -160,10 +160,7 @@ func (c *StateStore) Init(meta state.Metadata) error { c.contentType = m.ContentType // Retries initializing the client if a TooManyRequests error is encountered - bo := backoff.NewExponentialBackOff() - bo.InitialInterval = 2 * time.Second - bo.MaxElapsedTime = 5 * time.Minute - err = backoff.RetryNotify(func() (innerErr error) { + err = retryOperation(func() (innerErr error) { _, innerErr = c.findCollection() if innerErr != nil { if isTooManyRequestsError(innerErr) { @@ -175,26 +172,20 @@ func (c *StateStore) Init(meta state.Metadata) error { // if we're authenticating using Azure AD, we can't perform CRUD operations on stored procedures, so we need to try invoking the version SP and see if we get the desired version only if m.MasterKey == "" { innerErr = c.checkStoredProcedures() - if innerErr != nil { - if isTooManyRequestsError(innerErr) { - return innerErr - } - return backoff.Permanent(innerErr) - } } else { innerErr = c.ensureStoredProcedures() - if innerErr != nil { - if isTooManyRequestsError(innerErr) { - return innerErr - } - return backoff.Permanent(innerErr) + } + if innerErr != nil { + if isTooManyRequestsError(innerErr) { + return innerErr } + return backoff.Permanent(innerErr) } return nil - }, bo, func(err error, d time.Duration) { + }, func(err error, d time.Duration) { c.logger.Warnf("CosmosDB state store initialization failed: %v; retrying in %s", err, d) - }) + }, 5*time.Minute) if err != nil { return err } @@ -223,12 +214,23 @@ func (c *StateStore) Get(req *state.GetRequest) (*state.GetResponse, error) { options = append(options, documentdb.ConsistencyLevel(documentdb.Eventual)) } - _, err := c.client.QueryDocuments( - c.getCollectionLink(), - documentdb.NewQuery("SELECT * FROM ROOT r WHERE r.id=@id", documentdb.P{Name: "@id", Value: key}), - &items, - options..., - ) + err := retryOperation(func() error { + _, innerErr := c.client.QueryDocuments( + c.getCollectionLink(), + documentdb.NewQuery("SELECT * FROM ROOT r WHERE r.id=@id", documentdb.P{Name: "@id", Value: key}), + &items, + options..., + ) + if innerErr != nil { + if isTooManyRequestsError(innerErr) { + return innerErr + } + return backoff.Permanent(innerErr) + } + return nil + }, func(err error, d time.Duration) { + c.logger.Warnf("CosmosDB state store Get request failed: %v; retrying in %s", err, d) + }, 20*time.Second) if err != nil { return nil, err } else if len(items) == 0 { @@ -283,7 +285,18 @@ func (c *StateStore) Set(req *state.SetRequest) error { if err != nil { return err } - _, err = c.client.UpsertDocument(c.getCollectionLink(), &doc, options...) + err = retryOperation(func() error { + _, innerErr := c.client.UpsertDocument(c.getCollectionLink(), &doc, options...) + if innerErr != nil { + if isTooManyRequestsError(innerErr) { + return innerErr + } + return backoff.Permanent(innerErr) + } + return nil + }, func(err error, d time.Duration) { + c.logger.Warnf("CosmosDB state store Set request failed: %v; retrying in %s", err, d) + }, 20*time.Second) if err != nil { if req.ETag != nil { @@ -329,12 +342,21 @@ func (c *StateStore) Delete(req *state.DeleteRequest) error { options = append(options, documentdb.ConsistencyLevel(documentdb.Eventual)) } - _, err = c.client.DeleteDocument(items[0].Self, options...) - if err != nil { - c.logger.Debugf("Error from cosmos.DeleteDocument e=%e, e.Error=%s", err, err.Error()) - } + err = retryOperation(func() error { + _, innerErr := c.client.DeleteDocument(items[0].Self, options...) + if innerErr != nil { + if isTooManyRequestsError(innerErr) { + return innerErr + } + return backoff.Permanent(innerErr) + } + return nil + }, func(err error, d time.Duration) { + c.logger.Warnf("CosmosDB state store Delete request failed: %v; retrying in %s", err, d) + }, 20*time.Second) if err != nil { + c.logger.Debugf("Error from cosmos.DeleteDocument e=%e, e.Error=%s", err, err.Error()) if req.ETag != nil { return state.NewETagError(state.ETagMismatch, err) } @@ -386,15 +408,25 @@ func (c *StateStore) Multi(request *state.TransactionalStateRequest) error { var retString string // The stored procedure throws if it failed, which sets err to non-nil. It doesn't return anything else. - err := c.client.ExecuteStoredProcedure( - c.getSprocLink(storedProcedureName), - [...]interface{}{operations}, - &retString, - documentdb.PartitionKey(partitionKey), - ) + err := retryOperation(func() error { + innerErr := c.client.ExecuteStoredProcedure( + c.getSprocLink(storedProcedureName), + [...]interface{}{operations}, + &retString, + documentdb.PartitionKey(partitionKey), + ) + if innerErr != nil { + if isTooManyRequestsError(innerErr) { + return innerErr + } + return backoff.Permanent(innerErr) + } + return nil + }, func(err error, d time.Duration) { + c.logger.Warnf("CosmosDB state store Multi request failed: %v; retrying in %s", err, d) + }, 20*time.Second) if err != nil { c.logger.Debugf("error=%e", err) - return err } @@ -407,7 +439,21 @@ func (c *StateStore) Query(req *state.QueryRequest) (*state.QueryResponse, error if err := qbuilder.BuildQuery(&req.Query); err != nil { return &state.QueryResponse{}, err } - data, token, err := q.execute(c.client, c.getCollectionLink()) + var data []state.QueryItem + var token string + err := retryOperation(func() error { + var innerErr error + data, token, innerErr = q.execute(c.client, c.getCollectionLink()) + if innerErr != nil { + if isTooManyRequestsError(innerErr) { + return innerErr + } + return backoff.Permanent(innerErr) + } + return nil + }, func(err error, d time.Duration) { + c.logger.Warnf("CosmosDB state store Ping request failed: %v; retrying in %s", err, d) + }, 20*time.Second) if err != nil { return &state.QueryResponse{}, err } @@ -419,8 +465,18 @@ func (c *StateStore) Query(req *state.QueryRequest) (*state.QueryResponse, error } func (c *StateStore) Ping() error { - _, err := c.findCollection() - return err + return retryOperation(func() error { + _, innerErr := c.findCollection() + if innerErr != nil { + if isTooManyRequestsError(innerErr) { + return innerErr + } + return backoff.Permanent(innerErr) + } + return nil + }, func(err error, d time.Duration) { + c.logger.Warnf("CosmosDB state store Ping request failed: %v; retrying in %s", err, d) + }, 20*time.Second) } // getCollectionLink returns the link to the collection. @@ -512,7 +568,7 @@ func (c *StateStore) findCollection() (*documentdb.Collection, error) { if err != nil { return nil, err } - if coll == nil || coll.Id == "" { + if coll == nil || coll.Self == "" { return nil, fmt.Errorf("collection %s in database %s for CosmosDB state store not found. This must be created before Dapr uses it", c.metadata.Collection, c.metadata.Database) } return coll, nil @@ -585,6 +641,13 @@ func parseTTL(requestMetadata map[string]string) (*int, error) { return nil, nil } +func retryOperation(operation backoff.Operation, notify backoff.Notify, maxElapsedTime time.Duration) error { + bo := backoff.NewExponentialBackOff() + bo.InitialInterval = 2 * time.Second + bo.MaxElapsedTime = maxElapsedTime + return backoff.RetryNotify(operation, bo, notify) +} + func isTooManyRequestsError(err error) bool { if err == nil { return false