Adds retry on CosmosDB Init in case of TooManyRequests error (#1329)

* Adds retry on CosmosDB Init in case of TooManyRequests error

* Use backoff v4

* missed some permanent errors

* clean up go.mod

* fix error type casting

* Add constant for HTTP 429
This commit is contained in:
Bernd Verst 2021-11-29 12:05:43 -08:00 committed by GitHub
parent 53c6ed7636
commit 175b09ea40
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 160 additions and 78 deletions

View File

@ -9,8 +9,10 @@ import (
"encoding/json"
"fmt"
"strings"
"time"
"github.com/a8m/documentdb"
"github.com/cenkalti/backoff/v4"
"github.com/dapr/components-contrib/authentication/azure"
@ -36,6 +38,8 @@ type cosmosDBCredentials struct {
PartitionKey string `json:"partitionKey"`
}
const statusTooManyRequests = "429" // RFC 6585, 4
// NewCosmosDB returns a new CosmosDB instance.
func NewCosmosDB(logger logger.Logger) *CosmosDB {
return &CosmosDB{logger: logger}
@ -73,36 +77,56 @@ func (c *CosmosDB) Init(metadata bindings.Metadata) error {
// this allows us to provide the most flexibility in the request document sent to this binding
config.IdentificationHydrator = nil
config.WithAppIdentifier("dapr-" + logger.DaprVersion)
client := documentdb.New(m.URL, config)
dbs, err := client.QueryDatabases(&documentdb.Query{
Query: "SELECT * FROM ROOT r WHERE r.id=@id",
Parameters: []documentdb.Parameter{
{Name: "@id", Value: m.Database},
},
// Retries initializing the client if a TooManyRequests error is encountered
bo := backoff.NewExponentialBackOff()
bo.InitialInterval = 2 * time.Second
bo.MaxElapsedTime = 5 * time.Minute
err = backoff.RetryNotify(func() (err error) {
client := documentdb.New(m.URL, config)
dbs, err := client.QueryDatabases(&documentdb.Query{
Query: "SELECT * FROM ROOT r WHERE r.id=@id",
Parameters: []documentdb.Parameter{
{Name: "@id", Value: m.Database},
},
})
if err != nil {
if isTooManyRequestsError(err) {
return err
}
return backoff.Permanent(err)
} else if len(dbs) == 0 {
return backoff.Permanent(fmt.Errorf("database %s for CosmosDB binding not found", m.Database))
}
c.db = &dbs[0]
colls, err := client.QueryCollections(c.db.Self, &documentdb.Query{
Query: "SELECT * FROM ROOT r WHERE r.id=@id",
Parameters: []documentdb.Parameter{
{Name: "@id", Value: m.Collection},
},
})
if err != nil {
if isTooManyRequestsError(err) {
return err
}
return backoff.Permanent(err)
} else if len(colls) == 0 {
return backoff.Permanent(fmt.Errorf("collection %s for CosmosDB binding not found", m.Collection))
}
c.collection = &colls[0]
c.client = client
return nil
}, bo, func(err error, d time.Duration) {
c.logger.Warnf("CosmosDB binding initialization failed: %v; retrying in %s", err, d)
})
if err != nil {
return err
} else if len(dbs) == 0 {
return fmt.Errorf("database %s for CosmosDB state store not found", m.Database)
}
c.db = &dbs[0]
colls, err := client.QueryCollections(c.db.Self, &documentdb.Query{
Query: "SELECT * FROM ROOT r WHERE r.id=@id",
Parameters: []documentdb.Parameter{
{Name: "@id", Value: m.Collection},
},
})
if err != nil {
return err
} else if len(colls) == 0 {
return fmt.Errorf("collection %s for CosmosDB state store not found", m.Collection)
}
c.collection = &colls[0]
c.client = client
return nil
}
@ -154,7 +178,7 @@ func (c *CosmosDB) Invoke(req *bindings.InvokeRequest) (*bindings.InvokeResponse
func (c *CosmosDB) getPartitionKeyValue(key string, obj interface{}) (interface{}, error) {
val, err := c.lookup(obj.(map[string]interface{}), strings.Split(key, "."))
if err != nil {
return nil, fmt.Errorf("missing partitionKey field %s from request body - %s", c.partitionKey, err)
return nil, fmt.Errorf("missing partitionKey field %s from request body - %w", c.partitionKey, err)
}
if val == "" {
@ -189,3 +213,17 @@ func (c *CosmosDB) lookup(m map[string]interface{}, ks []string) (val interface{
return c.lookup(m, ks[1:])
}
func isTooManyRequestsError(err error) bool {
if err == nil {
return false
}
if requestError, ok := err.(*documentdb.RequestError); ok {
if requestError.Code == statusTooManyRequests {
return true
}
}
return false
}

View File

@ -12,9 +12,11 @@ import (
"fmt"
"strconv"
"strings"
"time"
"github.com/a8m/documentdb"
"github.com/agrea/ptr"
"github.com/cenkalti/backoff/v4"
"github.com/google/uuid"
jsoniter "github.com/json-iterator/go"
@ -63,10 +65,11 @@ type storedProcedureDefinition struct {
}
const (
storedProcedureName = "__dapr__"
metadataPartitionKey = "partitionKey"
unknownPartitionKey = "__UNKNOWN__"
metadataTTLKey = "ttlInSeconds"
storedProcedureName = "__dapr__"
metadataPartitionKey = "partitionKey"
unknownPartitionKey = "__UNKNOWN__"
metadataTTLKey = "ttlInSeconds"
statusTooManyRequests = "429" // RFC 6585, 4
)
// NewCosmosDBStateStore returns a new CosmosDB state store.
@ -131,62 +134,89 @@ func (c *StateStore) Init(meta state.Metadata) error {
config = documentdb.NewConfigWithServicePrincipal(spt)
}
config.WithAppIdentifier("dapr-" + logger.DaprVersion)
client := documentdb.New(m.URL, config)
dbs, err := client.QueryDatabases(&documentdb.Query{
Query: "SELECT * FROM ROOT r WHERE r.id=@id",
Parameters: []documentdb.Parameter{
{Name: "@id", Value: m.Database},
},
})
if err != nil {
return err
} else if len(dbs) == 0 {
return fmt.Errorf("database %s for CosmosDB state store not found", m.Database)
}
// Retries initializing the client if a TooManyRequests error is encountered
bo := backoff.NewExponentialBackOff()
bo.InitialInterval = 2 * time.Second
bo.MaxElapsedTime = 5 * time.Minute
err = backoff.RetryNotify(func() (err error) {
client := documentdb.New(m.URL, config)
c.db = &dbs[0]
colls, err := client.QueryCollections(c.db.Self, &documentdb.Query{
Query: "SELECT * FROM ROOT r WHERE r.id=@id",
Parameters: []documentdb.Parameter{
{Name: "@id", Value: m.Collection},
},
})
if err != nil {
return err
} else if len(colls) == 0 {
return fmt.Errorf("collection %s for CosmosDB state store not found. This must be created before Dapr uses it", m.Collection)
}
dbs, err := client.QueryDatabases(&documentdb.Query{
Query: "SELECT * FROM ROOT r WHERE r.id=@id",
Parameters: []documentdb.Parameter{
{Name: "@id", Value: m.Database},
},
})
c.metadata = m
c.collection = &colls[0]
c.client = client
c.contentType = m.ContentType
sps, err := c.client.ReadStoredProcedures(c.collection.Self)
if err != nil {
return err
}
// get a link to the sp
for i := range sps {
if sps[i].Id == storedProcedureName {
c.sp = &sps[i]
break
}
}
if c.sp == nil {
// register the stored procedure
createspBody := storedProcedureDefinition{ID: storedProcedureName, Body: spDefinition}
c.sp, err = c.client.CreateStoredProcedure(c.collection.Self, createspBody)
if err != nil {
// if it already exists that is success
if !strings.HasPrefix(err.Error(), "Conflict") {
if isTooManyRequestsError(err) {
return err
}
return backoff.Permanent(err)
} else if len(dbs) == 0 {
return backoff.Permanent(fmt.Errorf("database %s for CosmosDB state store not found", m.Database))
}
c.db = &dbs[0]
colls, err := client.QueryCollections(c.db.Self, &documentdb.Query{
Query: "SELECT * FROM ROOT r WHERE r.id=@id",
Parameters: []documentdb.Parameter{
{Name: "@id", Value: m.Collection},
},
})
if err != nil {
if isTooManyRequestsError(err) {
return err
}
return backoff.Permanent(err)
} else if len(colls) == 0 {
return backoff.Permanent(fmt.Errorf("collection %s for CosmosDB state store not found. This must be created before Dapr uses it", m.Collection))
}
c.metadata = m
c.collection = &colls[0]
c.client = client
c.contentType = m.ContentType
sps, err := c.client.ReadStoredProcedures(c.collection.Self)
if err != nil {
if isTooManyRequestsError(err) {
return err
}
return backoff.Permanent(err)
}
// get a link to the sp
for i := range sps {
if sps[i].Id == storedProcedureName {
c.sp = &sps[i]
break
}
}
if c.sp == nil {
// register the stored procedure
createspBody := storedProcedureDefinition{ID: storedProcedureName, Body: spDefinition}
c.sp, err = c.client.CreateStoredProcedure(c.collection.Self, createspBody)
if err != nil {
if isTooManyRequestsError(err) {
return err
}
// if it already exists that is success
if !strings.HasPrefix(err.Error(), "Conflict") {
return backoff.Permanent(err)
}
}
}
return nil
}, bo, func(err error, d time.Duration) {
c.logger.Warnf("CosmosDB state store initialization failed: %v; retrying in %s", err, d)
})
if err != nil {
return err
}
c.logger.Debug("cosmos Init done")
@ -482,3 +512,17 @@ func parseTTL(requestMetadata map[string]string) (*int, error) {
return nil, nil
}
func isTooManyRequestsError(err error) bool {
if err == nil {
return false
}
if requestError, ok := err.(*documentdb.RequestError); ok {
if requestError.Code == statusTooManyRequests {
return true
}
}
return false
}