Adding in track2 SDK support for CosmosDB Bindings (#1876)
* Adding in track2 SDK support for CosmosDB Bindings Signed-off-by: Ryan Lettieri <ryanLettieri@microsoft.com> * Removing non needed ID check Signed-off-by: Ryan Lettieri <ryanLettieri@microsoft.com> * Addressing container read operation Signed-off-by: Ryan Lettieri <67934986+RyanLettieri@users.noreply.github.com> * Updating linting Signed-off-by: Ryan Lettieri <67934986+RyanLettieri@users.noreply.github.com> Co-authored-by: Ryan Lettieri <ryanLettieri@microsoft.com> Co-authored-by: Dapr Bot <56698301+dapr-bot@users.noreply.github.com>
This commit is contained in:
parent
697afe8608
commit
ac1e16f58f
|
@ -20,8 +20,7 @@ import (
|
|||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/a8m/documentdb"
|
||||
backoff "github.com/cenkalti/backoff/v4"
|
||||
"github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos"
|
||||
|
||||
"github.com/dapr/components-contrib/bindings"
|
||||
"github.com/dapr/components-contrib/internal/authentication/azure"
|
||||
|
@ -30,8 +29,7 @@ import (
|
|||
|
||||
// CosmosDB allows performing state operations on collections.
|
||||
type CosmosDB struct {
|
||||
client *documentdb.DocumentDB
|
||||
collection string
|
||||
client *azcosmos.ContainerClient
|
||||
partitionKey string
|
||||
|
||||
logger logger.Logger
|
||||
|
@ -45,7 +43,8 @@ type cosmosDBCredentials struct {
|
|||
PartitionKey string `json:"partitionKey"`
|
||||
}
|
||||
|
||||
const statusTooManyRequests = "429" // RFC 6585, 4
|
||||
// Value used for timeout durations
|
||||
const timeoutValue = 30
|
||||
|
||||
// NewCosmosDB returns a new CosmosDB instance.
|
||||
func NewCosmosDB(logger logger.Logger) *CosmosDB {
|
||||
|
@ -62,57 +61,43 @@ func (c *CosmosDB) Init(metadata bindings.Metadata) error {
|
|||
c.partitionKey = m.PartitionKey
|
||||
|
||||
// Create the client; first, try authenticating with a master key, if present
|
||||
var config *documentdb.Config
|
||||
var client *azcosmos.Client
|
||||
if m.MasterKey != "" {
|
||||
config = documentdb.NewConfig(&documentdb.Key{
|
||||
Key: m.MasterKey,
|
||||
})
|
||||
cred, keyErr := azcosmos.NewKeyCredential(m.MasterKey)
|
||||
if keyErr != nil {
|
||||
return keyErr
|
||||
}
|
||||
client, err = azcosmos.NewClientWithKey(m.URL, cred, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
// Fallback to using Azure AD
|
||||
env, errB := azure.NewEnvironmentSettings("cosmosdb", metadata.Properties)
|
||||
if errB != nil {
|
||||
return errB
|
||||
env, errEnv := azure.NewEnvironmentSettings("cosmosdb", metadata.Properties)
|
||||
if errEnv != nil {
|
||||
return errEnv
|
||||
}
|
||||
spt, errB := env.GetServicePrincipalToken()
|
||||
if errB != nil {
|
||||
return errB
|
||||
token, errToken := env.GetTokenCredential()
|
||||
if errToken != nil {
|
||||
return errToken
|
||||
}
|
||||
config = documentdb.NewConfigWithServicePrincipal(spt)
|
||||
}
|
||||
// disable the identification hydrator (which autogenerates IDs if missing from the request)
|
||||
// so we aren't forced to use a struct by the upstream SDK
|
||||
// this allows us to provide the most flexibility in the request document sent to this binding
|
||||
config.IdentificationHydrator = nil
|
||||
config.WithAppIdentifier("dapr-" + logger.DaprVersion)
|
||||
|
||||
c.client = documentdb.New(m.URL, config)
|
||||
|
||||
// Retries initializing the client if a TooManyRequests error is encountered
|
||||
err = retryOperation(func() (err error) {
|
||||
collLink := fmt.Sprintf("dbs/%s/colls/%s/", m.Database, m.Collection)
|
||||
coll, err := c.client.ReadCollection(collLink)
|
||||
client, err = azcosmos.NewClient(m.URL, token, nil)
|
||||
if err != nil {
|
||||
if isTooManyRequestsError(err) {
|
||||
return err
|
||||
}
|
||||
return backoff.Permanent(err)
|
||||
} else if coll == nil || coll.Self == "" {
|
||||
return backoff.Permanent(
|
||||
fmt.Errorf("collection %s in database %s for CosmosDB state store not found. This must be created before Dapr uses it", m.Collection, m.Database),
|
||||
)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
c.collection = coll.Self
|
||||
|
||||
return nil
|
||||
}, func(err error, d time.Duration) {
|
||||
c.logger.Warnf("CosmosDB binding initialization failed: %v; retrying in %s", err, d)
|
||||
}, 5*time.Minute)
|
||||
// Create a container client
|
||||
dbContainer, err := client.NewContainer(m.Database, m.Collection)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
c.client = dbContainer
|
||||
ctx, cancel := context.WithTimeout(context.Background(), timeoutValue*time.Second)
|
||||
_, err = c.client.Read(ctx, nil)
|
||||
cancel()
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *CosmosDB) parseMetadata(metadata bindings.Metadata) (*cosmosDBCredentials, error) {
|
||||
|
@ -135,7 +120,7 @@ func (c *CosmosDB) Operations() []bindings.OperationKind {
|
|||
return []bindings.OperationKind{bindings.CreateOperation}
|
||||
}
|
||||
|
||||
func (c *CosmosDB) Invoke(_ context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
||||
func (c *CosmosDB) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
||||
switch req.Operation {
|
||||
case bindings.CreateOperation:
|
||||
var obj interface{}
|
||||
|
@ -144,41 +129,34 @@ func (c *CosmosDB) Invoke(_ context.Context, req *bindings.InvokeRequest) (*bind
|
|||
return nil, err
|
||||
}
|
||||
|
||||
val, err := c.getPartitionKeyValue(c.partitionKey, obj)
|
||||
pkString, err := c.getPartitionKeyValue(c.partitionKey, obj)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
pk := azcosmos.NewPartitionKeyString(pkString)
|
||||
|
||||
err = retryOperation(func() error {
|
||||
_, innerErr := c.client.CreateDocument(c.collection, obj, documentdb.PartitionKey(val))
|
||||
if innerErr != nil {
|
||||
if isTooManyRequestsError(innerErr) {
|
||||
return innerErr
|
||||
}
|
||||
return backoff.Permanent(innerErr)
|
||||
}
|
||||
return nil
|
||||
}, func(err error, d time.Duration) {
|
||||
c.logger.Warnf("CosmosDB binding Invoke request failed: %v; retrying in %s", err, d)
|
||||
}, 20*time.Second)
|
||||
_, err = c.client.CreateItem(ctx, pk, req.Data, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return nil, nil
|
||||
default:
|
||||
return nil, fmt.Errorf("operation kind %s not supported", req.Operation)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *CosmosDB) getPartitionKeyValue(key string, obj interface{}) (interface{}, error) {
|
||||
val, err := c.lookup(obj.(map[string]interface{}), strings.Split(key, "."))
|
||||
func (c *CosmosDB) getPartitionKeyValue(key string, obj interface{}) (string, error) {
|
||||
valI, err := c.lookup(obj.(map[string]interface{}), strings.Split(key, "."))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("missing partitionKey field %s from request body - %w", c.partitionKey, err)
|
||||
return "", fmt.Errorf("missing partitionKey field %s from request body - %w", c.partitionKey, err)
|
||||
}
|
||||
val, ok := valI.(string)
|
||||
if !ok {
|
||||
return "", fmt.Errorf("partition key is not a string")
|
||||
}
|
||||
|
||||
if val == "" {
|
||||
return nil, fmt.Errorf("partitionKey field %s from request body is empty", c.partitionKey)
|
||||
return "", fmt.Errorf("partitionKey field %s from request body is empty", c.partitionKey)
|
||||
}
|
||||
|
||||
return val, nil
|
||||
|
@ -209,24 +187,3 @@ func (c *CosmosDB) lookup(m map[string]interface{}, ks []string) (val interface{
|
|||
|
||||
return c.lookup(m, ks[1:])
|
||||
}
|
||||
|
||||
func retryOperation(operation backoff.Operation, notify backoff.Notify, maxElapsedTime time.Duration) error {
|
||||
bo := backoff.NewExponentialBackOff()
|
||||
bo.InitialInterval = 2 * time.Second
|
||||
bo.MaxElapsedTime = maxElapsedTime
|
||||
return backoff.RetryNotify(operation, bo, notify)
|
||||
}
|
||||
|
||||
func isTooManyRequestsError(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
if requestError, ok := err.(*documentdb.RequestError); ok {
|
||||
if requestError.Code == statusTooManyRequests {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
|
1
go.mod
1
go.mod
|
@ -12,6 +12,7 @@ require (
|
|||
github.com/Azure/azure-sdk-for-go v65.0.0+incompatible
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0
|
||||
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.1
|
||||
github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.0.1
|
||||
github.com/Azure/azure-sdk-for-go/sdk/keyvault/azsecrets v0.7.1
|
||||
github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.0.1
|
||||
|
|
2
go.sum
2
go.sum
|
@ -112,6 +112,8 @@ github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 h1:QkAcEIAKbNL4KoFr4Sath
|
|||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0/go.mod h1:bhXu1AjYL+wutSL/kpSq6s7733q2Rb0yuot9Zgfqa/0=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/data/azappconfig v0.4.0 h1:h/72OERa/5hgnKEOyQJ8gtJoTVX3uwHCavsraGadTZM=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/data/azappconfig v0.4.0/go.mod h1:p74+tP95m8830ypJk53L93+BEsjTKY4SKQ75J2NmS5U=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.1 h1:Sd7LtAlpRJ50lAj49S+pT6K0OUt+4KsNzB2uUArrWKg=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.1/go.mod h1:Fy3bbChFm4cZn6oIxYYqKB2FG3rBDxk3NZDLDJCHl+Q=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.0.1 h1:bFa9IcjvrCber6gGgDAUZ+I2bO8J7s8JxXmu9fhi2ss=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.0.1/go.mod h1:l3wvZkG9oW07GLBW5Cd0WwG5asOfJ8aqE8raUvNzLpk=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 h1:jp0dGvZ7ZK0mgqnTSClMxa5xuRL7NZgHameVYF6BurY=
|
||||
|
|
|
@ -19,8 +19,10 @@ require (
|
|||
github.com/AdhityaRamadhanus/fasthttpcors v0.0.0-20170121111917-d4c07198763a // indirect
|
||||
github.com/Azure/azure-amqp-common-go/v3 v3.2.3 // indirect
|
||||
github.com/Azure/azure-pipeline-go v0.2.3 // indirect
|
||||
github.com/Azure/azure-sdk-for-go v65.0.0+incompatible // indirect
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 // indirect
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 // indirect
|
||||
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.1 // indirect
|
||||
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 // indirect
|
||||
github.com/Azure/azure-storage-blob-go v0.10.0 // indirect
|
||||
github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd // indirect
|
||||
|
|
|
@ -49,10 +49,14 @@ github.com/Azure/azure-pipeline-go v0.1.8/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9a
|
|||
github.com/Azure/azure-pipeline-go v0.2.2/go.mod h1:4rQ/NZncSvGqNkkOsNpOU1tgoNuIlp9AfUH5G1tvCHc=
|
||||
github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U=
|
||||
github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k=
|
||||
github.com/Azure/azure-sdk-for-go v65.0.0+incompatible h1:HzKLt3kIwMm4KeJYTdx9EbjRYTySD/t8i1Ee/W5EGXw=
|
||||
github.com/Azure/azure-sdk-for-go v65.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 h1:Ut0ZGdOwJDw0npYEg+TLlPls3Pq6JiZaP2/aGKir7Zw=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 h1:QkAcEIAKbNL4KoFr4SathZPhDhF4mVwpBMFlYjyAqy8=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0/go.mod h1:bhXu1AjYL+wutSL/kpSq6s7733q2Rb0yuot9Zgfqa/0=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.1 h1:Sd7LtAlpRJ50lAj49S+pT6K0OUt+4KsNzB2uUArrWKg=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.1/go.mod h1:Fy3bbChFm4cZn6oIxYYqKB2FG3rBDxk3NZDLDJCHl+Q=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 h1:jp0dGvZ7ZK0mgqnTSClMxa5xuRL7NZgHameVYF6BurY=
|
||||
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w=
|
||||
github.com/Azure/azure-storage-blob-go v0.10.0 h1:evCwGreYo3XLeBV4vSxLbLiYb6e0SzsJiXQVRGsRXxs=
|
||||
|
|
Loading…
Reference in New Issue