Merge branch 'master' into fix/configapi_getall

Signed-off-by: Pravin Pushkar <ppushkar@microsoft.com>
This commit is contained in:
Pravin Pushkar 2022-09-22 10:40:05 +05:30 committed by GitHub
commit ac2ddd8ef5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 321 additions and 557 deletions

3
go.mod
View File

@ -12,7 +12,7 @@ require (
github.com/Azure/azure-sdk-for-go v65.0.0+incompatible
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.1
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.2
github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.0.1
github.com/Azure/azure-sdk-for-go/sdk/keyvault/azsecrets v0.7.1
github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus v1.0.1
@ -26,7 +26,6 @@ require (
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/Shopify/sarama v1.30.0
github.com/StackExchange/wmi v0.0.0-20210224194228-fe8f1750fd46 // indirect
github.com/a8m/documentdb v1.3.1-0.20220405205223-5b41ba0aaeb1
github.com/aerospike/aerospike-client-go v4.5.0+incompatible
github.com/agrea/ptr v0.0.0-20180711073057-77a518d99b7b
github.com/ajg/form v1.5.1 // indirect

6
go.sum
View File

@ -112,8 +112,8 @@ github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 h1:QkAcEIAKbNL4KoFr4Sath
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0/go.mod h1:bhXu1AjYL+wutSL/kpSq6s7733q2Rb0yuot9Zgfqa/0=
github.com/Azure/azure-sdk-for-go/sdk/data/azappconfig v0.4.1-0.20220914204640-4d445978fe75 h1:fXg1PuTP8+xhOnxrPtteHGbreNsTRFvu+NsbLCqQrWg=
github.com/Azure/azure-sdk-for-go/sdk/data/azappconfig v0.4.1-0.20220914204640-4d445978fe75/go.mod h1:p74+tP95m8830ypJk53L93+BEsjTKY4SKQ75J2NmS5U=
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.1 h1:Sd7LtAlpRJ50lAj49S+pT6K0OUt+4KsNzB2uUArrWKg=
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.1/go.mod h1:Fy3bbChFm4cZn6oIxYYqKB2FG3rBDxk3NZDLDJCHl+Q=
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.2 h1:yJegJqjhrMJ3Oe5s43jOTGL2AsE7pJyx+7Yqls/65tw=
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.2/go.mod h1:Fy3bbChFm4cZn6oIxYYqKB2FG3rBDxk3NZDLDJCHl+Q=
github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.0.1 h1:bFa9IcjvrCber6gGgDAUZ+I2bO8J7s8JxXmu9fhi2ss=
github.com/Azure/azure-sdk-for-go/sdk/data/aztables v1.0.1/go.mod h1:l3wvZkG9oW07GLBW5Cd0WwG5asOfJ8aqE8raUvNzLpk=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 h1:jp0dGvZ7ZK0mgqnTSClMxa5xuRL7NZgHameVYF6BurY=
@ -257,8 +257,6 @@ github.com/VividCortex/gohistogram v1.0.0/go.mod h1:Pf5mBqqDxYaXu3hDrrU+w6nw50o/
github.com/Workiva/go-datastructures v1.0.52/go.mod h1:Z+F2Rca0qCsVYDS8z7bAGm8f3UkzuWYS/oBZz5a7VVA=
github.com/Workiva/go-datastructures v1.0.53 h1:J6Y/52yX10Xc5JjXmGtWoSSxs3mZnGSaq37xZZh7Yig=
github.com/Workiva/go-datastructures v1.0.53/go.mod h1:1yZL+zfsztete+ePzZz/Zb1/t5BnDuE2Ya2MMGhzP6A=
github.com/a8m/documentdb v1.3.1-0.20220405205223-5b41ba0aaeb1 h1:vdxL7id6rXNHNAh7yHUHiTsTvFupt+c7MBa+1bru+48=
github.com/a8m/documentdb v1.3.1-0.20220405205223-5b41ba0aaeb1/go.mod h1:4Z0mpi7fkyqjxUdGiNMO3vagyiUoiwLncaIX6AsW5z0=
github.com/aead/siphash v1.0.1/go.mod h1:Nywa3cDsYNNK3gaciGTWPwHt0wlpNV15vwmswBAUSII=
github.com/aerospike/aerospike-client-go v4.5.0+incompatible h1:6ALev/Ge4jW5avSLoqgvPYTh+FLeeDD9xDhzoMCNgOo=
github.com/aerospike/aerospike-client-go v4.5.0+incompatible/go.mod h1:zj8LBEnWBDOVEIJt8LvaRvDG5ARAoa5dBeHaB472NRc=

View File

@ -14,19 +14,19 @@ limitations under the License.
package cosmosdb
import (
// For go:embed.
_ "embed"
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"net/http"
"strconv"
"strings"
"time"
"github.com/a8m/documentdb"
"github.com/agrea/ptr"
"github.com/cenkalti/backoff/v4"
"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
"github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos"
"github.com/google/uuid"
jsoniter "github.com/json-iterator/go"
@ -35,26 +35,16 @@ import (
"github.com/dapr/components-contrib/state"
"github.com/dapr/components-contrib/state/query"
"github.com/dapr/kit/logger"
"github.com/dapr/kit/ptr"
)
// Version of the stored procedure to use.
const spVersion = 2
//go:embed storedprocedures/__dapr_v2__.js
var spDefinition string
//go:embed storedprocedures/__daprver__.js
var spVersionDefinition string
// StateStore is a CosmosDB state store.
type StateStore struct {
state.DefaultBulkStore
client *documentdb.DocumentDB
client *azcosmos.ContainerClient
metadata metadata
contentType string
features []state.Feature
logger logger.Logger
logger logger.Logger
}
type metadata struct {
@ -67,11 +57,6 @@ type metadata struct {
type cosmosOperationType string
const (
deleteOperationType cosmosOperationType = "delete"
upsertOperationType cosmosOperationType = "upsert"
)
// CosmosOperation is a wrapper around a CosmosDB operation.
type CosmosOperation struct {
Item CosmosItem `json:"item"`
@ -80,37 +65,42 @@ type CosmosOperation struct {
// CosmosItem is a wrapper around a CosmosDB document.
type CosmosItem struct {
documentdb.Document
ID string `json:"id"`
Value interface{} `json:"value"`
IsBinary bool `json:"isBinary"`
PartitionKey string `json:"partitionKey"`
TTL *int `json:"ttl,omitempty"`
}
type storedProcedureDefinition struct {
ID string `json:"id"`
Body string `json:"body"`
Etag string
}
const (
storedProcedureName = "__dapr_v2__"
versionSpName = "__daprver__"
metadataPartitionKey = "partitionKey"
unknownPartitionKey = "__UNKNOWN__"
metadataTTLKey = "ttlInSeconds"
statusTooManyRequests = "429" // RFC 6585, 4
statusNotFound = "NotFound"
metadataPartitionKey = "partitionKey"
metadataTTLKey = "ttlInSeconds"
defaultTimeout = 20 * time.Second
statusNotFound = "NotFound"
)
// policy that tracks the number of times it was invoked
type crossPartitionQueryPolicy struct{}
func (p *crossPartitionQueryPolicy) Do(req *policy.Request) (*http.Response, error) {
raw := req.Raw()
hdr := raw.Header
if strings.ToLower(hdr.Get("x-ms-documentdb-query")) == "true" {
// modify req here since we know it is a query
hdr.Add("x-ms-documentdb-query-enablecrosspartition", "true")
hdr.Del("x-ms-documentdb-partitionkey")
raw.Header = hdr
}
return req.Next()
}
// NewCosmosDBStateStore returns a new CosmosDB state store.
func NewCosmosDBStateStore(logger logger.Logger) state.Store {
s := &StateStore{
features: []state.Feature{state.FeatureETag, state.FeatureTransactional, state.FeatureQueryAPI},
logger: logger,
logger: logger,
}
s.DefaultBulkStore = state.NewDefaultBulkStore(s)
return s
}
@ -118,8 +108,7 @@ func NewCosmosDBStateStore(logger logger.Logger) state.Store {
func (c *StateStore) Init(meta state.Metadata) error {
c.logger.Debugf("CosmosDB init start")
connInfo := meta.Properties
b, err := json.Marshal(connInfo)
b, err := json.Marshal(meta.Properties)
if err != nil {
return err
}
@ -146,136 +135,130 @@ func (c *StateStore) Init(meta state.Metadata) error {
return errors.New("contentType is required")
}
// Internal query policy was created due to lack of cross partition query capability in the current Go sdk
queryPolicy := &crossPartitionQueryPolicy{}
opts := azcosmos.ClientOptions{
ClientOptions: policy.ClientOptions{
PerCallPolicies: []policy.Policy{queryPolicy},
},
}
// Create the client; first, try authenticating with a master key, if present
var config *documentdb.Config
var client *azcosmos.Client
if m.MasterKey != "" {
config = documentdb.NewConfig(&documentdb.Key{
Key: m.MasterKey,
})
var cred azcosmos.KeyCredential
cred, err = azcosmos.NewKeyCredential(m.MasterKey)
if err != nil {
return err
}
client, err = azcosmos.NewClientWithKey(m.URL, cred, &opts)
if err != nil {
return err
}
} else {
// Fallback to using Azure AD
env, errB := azure.NewEnvironmentSettings("cosmosdb", meta.Properties)
if errB != nil {
return errB
var env azure.EnvironmentSettings
env, err = azure.NewEnvironmentSettings("cosmosdb", meta.Properties)
if err != nil {
return err
}
spt, errB := env.GetServicePrincipalToken()
if errB != nil {
return errB
token, tokenErr := env.GetTokenCredential()
if tokenErr != nil {
return tokenErr
}
client, err = azcosmos.NewClient(m.URL, token, &opts)
if err != nil {
return err
}
config = documentdb.NewConfigWithServicePrincipal(spt)
}
config.WithAppIdentifier("dapr-" + logger.DaprVersion)
c.client = documentdb.New(m.URL, config)
c.metadata = m
c.contentType = m.ContentType
// Retries initializing the client if a TooManyRequests error is encountered
err = retryOperation(func() (innerErr error) {
_, innerErr = c.findCollection()
if innerErr != nil {
if isTooManyRequestsError(innerErr) {
return innerErr
}
return backoff.Permanent(innerErr)
}
// if we're authenticating using Azure AD, we can't perform CRUD operations on stored procedures, so we need to try invoking the version SP and see if we get the desired version only
if m.MasterKey == "" {
innerErr = c.checkStoredProcedures()
} else {
innerErr = c.ensureStoredProcedures()
}
if innerErr != nil {
if isTooManyRequestsError(innerErr) {
return innerErr
}
return backoff.Permanent(innerErr)
}
return nil
}, func(err error, d time.Duration) {
c.logger.Warnf("CosmosDB state store initialization failed: %v; retrying in %s", err, d)
}, 5*time.Minute)
// Create a container client
dbClient, err := client.NewDatabase(m.Database)
if err != nil {
return err
}
// Container is synonymous with collection.
dbContainer, err := dbClient.NewContainer(m.Collection)
if err != nil {
return err
}
c.client = dbContainer
c.logger.Debug("cosmos Init done")
c.metadata = m
c.contentType = m.ContentType
return nil
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
_, err = c.client.Read(ctx, nil)
cancel()
return err
}
// Features returns the features available in this state store.
func (c *StateStore) Features() []state.Feature {
return c.features
return []state.Feature{
state.FeatureETag,
state.FeatureTransactional,
state.FeatureQueryAPI,
}
}
// Get retrieves a CosmosDB item.
func (c *StateStore) Get(req *state.GetRequest) (*state.GetResponse, error) {
key := req.Key
partitionKey := populatePartitionMetadata(req.Key, req.Metadata)
items := []CosmosItem{}
options := []documentdb.CallOption{documentdb.PartitionKey(partitionKey)}
options := azcosmos.ItemOptions{}
if req.Options.Consistency == state.Strong {
options = append(options, documentdb.ConsistencyLevel(documentdb.Strong))
}
if req.Options.Consistency == state.Eventual {
options = append(options, documentdb.ConsistencyLevel(documentdb.Eventual))
options.ConsistencyLevel = azcosmos.ConsistencyLevelSession.ToPtr()
} else if req.Options.Consistency == state.Eventual {
options.ConsistencyLevel = azcosmos.ConsistencyLevelEventual.ToPtr()
}
err := retryOperation(func() error {
_, innerErr := c.client.QueryDocuments(
c.getCollectionLink(),
documentdb.NewQuery("SELECT * FROM ROOT r WHERE r.id=@id", documentdb.P{Name: "@id", Value: key}),
&items,
options...,
)
if innerErr != nil {
if isTooManyRequestsError(innerErr) {
return innerErr
}
return backoff.Permanent(innerErr)
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
readItem, err := c.client.ReadItem(ctx, azcosmos.NewPartitionKeyString(partitionKey), req.Key, &options)
cancel()
if err != nil {
var responseErr *azcore.ResponseError
if errors.As(err, &responseErr) && responseErr.ErrorCode == "NotFound" {
return &state.GetResponse{}, nil
}
return nil
}, func(err error, d time.Duration) {
c.logger.Warnf("CosmosDB state store Get request failed: %v; retrying in %s", err, d)
}, 20*time.Second)
return nil, err
}
item := CosmosItem{}
err = jsoniter.ConfigFastest.Unmarshal(readItem.Value, &item)
if err != nil {
return nil, err
} else if len(items) == 0 {
return &state.GetResponse{}, nil
}
if items[0].IsBinary {
if items[0].Value == nil {
item.Etag = string(readItem.Response.ETag)
if item.IsBinary {
if item.Value == nil {
return &state.GetResponse{
Data: make([]byte, 0),
ETag: ptr.String(items[0].Etag),
ETag: ptr.Of(item.Etag),
}, nil
}
bytes, decodeErr := base64.StdEncoding.DecodeString(items[0].Value.(string))
bytes, decodeErr := base64.StdEncoding.DecodeString(item.Value.(string))
if decodeErr != nil {
c.logger.Warnf("CosmosDB state store Get request could not decode binary string: %v. Returning raw string instead.", decodeErr)
bytes = []byte(items[0].Value.(string))
bytes = []byte(item.Value.(string))
}
return &state.GetResponse{
Data: bytes,
ETag: ptr.String(items[0].Etag),
ETag: ptr.Of(item.Etag),
}, nil
}
b, err := jsoniter.ConfigFastest.Marshal(&items[0].Value)
b, err := jsoniter.ConfigFastest.Marshal(&item.Value)
if err != nil {
return nil, err
}
return &state.GetResponse{
Data: b,
ETag: ptr.String(items[0].Etag),
ETag: ptr.Of(item.Etag),
}, nil
}
@ -287,47 +270,44 @@ func (c *StateStore) Set(req *state.SetRequest) error {
}
partitionKey := populatePartitionMetadata(req.Key, req.Metadata)
options := []documentdb.CallOption{documentdb.PartitionKey(partitionKey)}
options := azcosmos.ItemOptions{}
if req.ETag != nil {
options = append(options, documentdb.IfMatch(*req.ETag))
if req.ETag != nil && *req.ETag != "" {
etag := azcore.ETag(*req.ETag)
options.IfMatchEtag = &etag
}
if req.Options.Concurrency == state.FirstWrite && (req.ETag == nil || *req.ETag == "") {
etag := uuid.NewString()
options = append(options, documentdb.IfMatch(etag))
var u uuid.UUID
u, err = uuid.NewRandom()
if err != nil {
return err
}
options.IfMatchEtag = ptr.Of(azcore.ETag(u.String()))
}
// Consistency levels can only be relaxed so the session level is used here
if req.Options.Consistency == state.Strong {
options = append(options, documentdb.ConsistencyLevel(documentdb.Strong))
}
if req.Options.Consistency == state.Eventual {
options = append(options, documentdb.ConsistencyLevel(documentdb.Eventual))
options.ConsistencyLevel = azcosmos.ConsistencyLevelSession.ToPtr()
} else if req.Options.Consistency == state.Eventual {
options.ConsistencyLevel = azcosmos.ConsistencyLevelEventual.ToPtr()
}
doc, err := createUpsertItem(c.contentType, *req, partitionKey)
if err != nil {
return err
}
err = retryOperation(func() error {
_, innerErr := c.client.UpsertDocument(c.getCollectionLink(), &doc, options...)
if innerErr != nil {
if isTooManyRequestsError(innerErr) {
return innerErr
}
return backoff.Permanent(innerErr)
}
return nil
}, func(err error, d time.Duration) {
c.logger.Warnf("CosmosDB state store Set request failed: %v; retrying in %s", err, d)
}, 20*time.Second)
marsh, err := json.Marshal(doc)
if err != nil {
if req.ETag != nil {
return state.NewETagError(state.ETagMismatch, err)
}
return err
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
pk := azcosmos.NewPartitionKeyString(partitionKey)
_, err = c.client.UpsertItem(ctx, pk, marsh, &options)
cancel()
if err != nil {
return err
}
return nil
}
@ -337,36 +317,26 @@ func (c *StateStore) Delete(req *state.DeleteRequest) error {
if err != nil {
return err
}
partitionKey := populatePartitionMetadata(req.Key, req.Metadata)
options := []documentdb.CallOption{documentdb.PartitionKey(partitionKey)}
options := azcosmos.ItemOptions{}
if req.ETag != nil {
options = append(options, documentdb.IfMatch(*req.ETag))
if req.ETag != nil && *req.ETag != "" {
etag := azcore.ETag(*req.ETag)
options.IfMatchEtag = &etag
}
if req.Options.Consistency == state.Strong {
options = append(options, documentdb.ConsistencyLevel(documentdb.Strong))
}
if req.Options.Consistency == state.Eventual {
options = append(options, documentdb.ConsistencyLevel(documentdb.Eventual))
options.ConsistencyLevel = azcosmos.ConsistencyLevelSession.ToPtr()
} else if req.Options.Consistency == state.Eventual {
options.ConsistencyLevel = azcosmos.ConsistencyLevelEventual.ToPtr()
}
err = retryOperation(func() error {
_, innerErr := c.client.DeleteDocument(c.getDocumentLink(req.Key), options...)
if innerErr != nil {
if isTooManyRequestsError(innerErr) {
return innerErr
}
return backoff.Permanent(innerErr)
}
return nil
}, func(err error, d time.Duration) {
c.logger.Warnf("CosmosDB state store Delete request failed: %v; retrying in %s", err, d)
}, 20*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
pk := azcosmos.NewPartitionKeyString(partitionKey)
_, err = c.client.DeleteItem(ctx, pk, req.Key, &options)
cancel()
if err != nil && !isNotFoundError(err) {
c.logger.Debugf("Error from cosmos.DeleteDocument e=%e, e.Error=%s", err, err.Error())
if req.ETag != nil {
if req.ETag != nil && *req.ETag != "" {
return state.NewETagError(state.ETagMismatch, err)
}
return err
@ -376,96 +346,111 @@ func (c *StateStore) Delete(req *state.DeleteRequest) error {
}
// Multi performs a transactional operation. succeeds only if all operations succeed, and fails if one or more operations fail.
func (c *StateStore) Multi(request *state.TransactionalStateRequest) error {
operations := []CosmosOperation{}
func (c *StateStore) Multi(request *state.TransactionalStateRequest) (err error) {
if len(request.Operations) == 0 {
c.logger.Debugf("No Operations Provided")
return nil
}
partitionKey := unknownPartitionKey
var partitionKey string
partitionKey = populatePartitionMetadata(partitionKey, request.Metadata)
batch := c.client.NewTransactionalBatch(azcosmos.NewPartitionKeyString(partitionKey))
numOperations := 0
// Loop through the list of operations. Create and add the operation to the batch
for _, o := range request.Operations {
t := o.Request.(state.KeyInt)
key := t.GetKey()
var options *azcosmos.TransactionalBatchItemOptions
partitionKey = populatePartitionMetadata(key, request.Metadata)
if o.Operation == state.Upsert {
req := o.Request.(state.SetRequest)
item, err := createUpsertItem(c.contentType, req, partitionKey)
var doc CosmosItem
doc, err = createUpsertItem(c.contentType, req, partitionKey)
if err != nil {
return err
}
upsertOperation := CosmosOperation{
Item: item,
Type: upsertOperationType,
if req.ETag != nil && *req.ETag != "" {
etag := azcore.ETag(*req.ETag)
options.IfMatchETag = &etag
}
operations = append(operations, upsertOperation)
if req.Options.Concurrency == state.FirstWrite && (req.ETag == nil || *req.ETag == "") {
var u uuid.UUID
u, err = uuid.NewRandom()
if err != nil {
return err
}
options.IfMatchETag = ptr.Of(azcore.ETag(u.String()))
}
var marsh []byte
marsh, err = json.Marshal(doc)
if err != nil {
return err
}
batch.UpsertItem(marsh, nil)
numOperations++
} else if o.Operation == state.Delete {
req := o.Request.(state.DeleteRequest)
deleteOperation := CosmosOperation{
Item: CosmosItem{
ID: req.Key,
Value: "", // Value does not need to be specified
PartitionKey: partitionKey,
},
Type: deleteOperationType,
if req.ETag != nil && *req.ETag != "" {
etag := azcore.ETag(*req.ETag)
options.IfMatchETag = &etag
}
operations = append(operations, deleteOperation)
if req.Options.Concurrency == state.FirstWrite && (req.ETag == nil || *req.ETag == "") {
var u uuid.UUID
u, err = uuid.NewRandom()
if err != nil {
return err
}
options.IfMatchETag = ptr.Of(azcore.ETag(u.String()))
}
batch.DeleteItem(req.Key, options)
numOperations++
}
}
c.logger.Debugf("#operations=%d,partitionkey=%s", len(operations), partitionKey)
c.logger.Debugf("#operations=%d,partitionkey=%s", numOperations, partitionKey)
var retString string
// The stored procedure throws if it failed, which sets err to non-nil. It doesn't return anything else.
err := retryOperation(func() error {
innerErr := c.client.ExecuteStoredProcedure(
c.getSprocLink(storedProcedureName),
[...]interface{}{operations},
&retString,
documentdb.PartitionKey(partitionKey),
)
if innerErr != nil {
if isTooManyRequestsError(innerErr) {
return innerErr
}
return backoff.Permanent(innerErr)
}
return nil
}, func(err error, d time.Duration) {
c.logger.Warnf("CosmosDB state store Multi request failed: %v; retrying in %s", err, d)
}, 20*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
batchResponse, err := c.client.ExecuteTransactionalBatch(ctx, batch, nil)
cancel()
if err != nil {
c.logger.Debugf("error=%e", err)
return err
}
if !batchResponse.Success {
// Transaction failed, look for the offending operation
for index, operation := range batchResponse.OperationResults {
if operation.StatusCode != http.StatusFailedDependency {
c.logger.Errorf("Transaction failed due to operation %v which failed with status code %d", index, operation.StatusCode)
return fmt.Errorf("transaction failed due to operation %v which failed with status code %d", index, operation.StatusCode)
}
}
return errors.New("transaction failed")
}
// Transaction succeeded
// We can inspect the individual operation results
for index, operation := range batchResponse.OperationResults {
c.logger.Debugf("Operation %v completed with status code %d", index, operation.StatusCode)
}
return nil
}
func (c *StateStore) Query(req *state.QueryRequest) (*state.QueryResponse, error) {
q := &Query{}
qbuilder := query.NewQueryBuilder(q)
if err := qbuilder.BuildQuery(&req.Query); err != nil {
return &state.QueryResponse{}, err
}
var data []state.QueryItem
var token string
err := retryOperation(func() error {
var innerErr error
data, token, innerErr = q.execute(c.client, c.getCollectionLink())
if innerErr != nil {
if isTooManyRequestsError(innerErr) {
return innerErr
}
return backoff.Permanent(innerErr)
}
return nil
}, func(err error, d time.Duration) {
c.logger.Warnf("CosmosDB state store Ping request failed: %v; retrying in %s", err, d)
}, 20*time.Second)
data, token, err := q.execute(c.client)
if err != nil {
return &state.QueryResponse{}, err
return nil, err
}
return &state.QueryResponse{
@ -475,125 +460,17 @@ func (c *StateStore) Query(req *state.QueryRequest) (*state.QueryResponse, error
}
func (c *StateStore) Ping() error {
return retryOperation(func() error {
_, innerErr := c.findCollection()
if innerErr != nil {
if isTooManyRequestsError(innerErr) {
return innerErr
}
return backoff.Permanent(innerErr)
}
return nil
}, func(err error, d time.Duration) {
c.logger.Warnf("CosmosDB state store Ping request failed: %v; retrying in %s", err, d)
}, 20*time.Second)
}
// getCollectionLink returns the link to the collection.
func (c *StateStore) getCollectionLink() string {
return fmt.Sprintf("dbs/%s/colls/%s/", c.metadata.Database, c.metadata.Collection)
}
// getDocumentLink returns the link to a document in the collection.
func (c *StateStore) getDocumentLink(docID string) string {
return fmt.Sprintf("dbs/%s/colls/%s/docs/%s", c.metadata.Database, c.metadata.Collection, docID)
}
// getSprocLink returns the link to a stored procedure in the collection.
func (c *StateStore) getSprocLink(sprocName string) string {
return fmt.Sprintf("dbs/%s/colls/%s/sprocs/%s", c.metadata.Database, c.metadata.Collection, sprocName)
}
func (c *StateStore) checkStoredProcedures() error {
var ver int
// not wrapping this in a retryable block because this method is already used as part of one
err := c.client.ExecuteStoredProcedure(c.getSprocLink(versionSpName), nil, &ver, documentdb.PartitionKey("1"))
if err == nil {
c.logger.Debugf("Cosmos DB stored procedure version: %d", ver)
}
if err != nil || (err == nil && ver != spVersion) {
// Note that when the `stylecheck` linter is working with Go 1.18 again, this will need "nolint:stylecheck"
return fmt.Errorf("Dapr requires stored procedures created in Cosmos DB before it can be used as state store. Those stored procedures are currently not existing or are using a different version than expected. When you authenticate using Azure AD we cannot automatically create them for you: please start this state store with a Cosmos DB master key just once so we can create the stored procedures for you; otherwise, you can check our docs to learn how to create them yourself: https://aka.ms/dapr/cosmosdb-aad") //nolint:stylecheck
}
return nil
}
func (c *StateStore) ensureStoredProcedures() error {
spLink := c.getSprocLink(storedProcedureName)
verSpLink := c.getSprocLink(versionSpName)
// get a link to the sp's
// not wrapping this in a retryable block because this method is already used as part of one
sp, err := c.client.ReadStoredProcedure(spLink)
if err != nil && !isNotFoundError(err) {
return err
}
verSp, err := c.client.ReadStoredProcedure(verSpLink)
if err != nil && !isNotFoundError(err) {
return err
}
// check version
replace := false
if verSp != nil {
var ver int
err = c.client.ExecuteStoredProcedure(verSpLink, nil, &ver, documentdb.PartitionKey("1"))
if err == nil {
c.logger.Debugf("Cosmos DB stored procedure version: %d", ver)
}
if err != nil || (err == nil && ver != spVersion) {
// ignore errors: just replace the stored procedures
replace = true
}
}
if verSp == nil || replace {
// register/replace the stored procedure
createspBody := storedProcedureDefinition{ID: versionSpName, Body: spVersionDefinition}
if replace && verSp != nil {
c.logger.Debugf("Replacing Cosmos DB stored procedure %s", versionSpName)
_, err = c.client.ReplaceStoredProcedure(verSp.Self, createspBody)
} else {
c.logger.Debugf("Creating Cosmos DB stored procedure %s", versionSpName)
_, err = c.client.CreateStoredProcedure(c.getCollectionLink(), createspBody)
}
// if it already exists that is success (Conflict should only happen on Create commands)
if err != nil && !strings.HasPrefix(err.Error(), "Conflict") {
return err
}
}
if sp == nil || replace {
// register the stored procedure
createspBody := storedProcedureDefinition{ID: storedProcedureName, Body: spDefinition}
if replace && sp != nil {
c.logger.Debugf("Replacing Cosmos DB stored procedure %s", storedProcedureName)
_, err = c.client.ReplaceStoredProcedure(sp.Self, createspBody)
} else {
c.logger.Debugf("Creating Cosmos DB stored procedure %s", storedProcedureName)
_, err = c.client.CreateStoredProcedure(c.getCollectionLink(), createspBody)
}
// if it already exists that is success (Conflict should only happen on Create commands)
if err != nil && !strings.HasPrefix(err.Error(), "Conflict") {
return err
}
}
return nil
}
func (c *StateStore) findCollection() (*documentdb.Collection, error) {
coll, err := c.client.ReadCollection(c.getCollectionLink())
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
_, err := c.client.Read(ctx, nil)
cancel()
if err != nil {
return nil, err
return err
}
if coll == nil || coll.Self == "" {
return nil, fmt.Errorf("collection %s in database %s for CosmosDB state store not found. This must be created before Dapr uses it", c.metadata.Collection, c.metadata.Database)
}
return coll, nil
return nil
}
func createUpsertItem(contentType string, req state.SetRequest, partitionKey string) (CosmosItem, error) {
byteArray, isBinary := req.Value.([]uint8)
byteArray, isBinary := req.Value.([]byte)
if len(byteArray) == 0 {
isBinary = false
}
@ -610,32 +487,35 @@ func createUpsertItem(contentType string, req state.SetRequest, partitionKey str
// if byte array is not a valid JSON, so keep it as-is to be Base64 encoded in CosmosDB.
// otherwise, we save it as JSON
if err == nil {
return CosmosItem{
item := CosmosItem{
ID: req.Key,
Value: value,
PartitionKey: partitionKey,
IsBinary: false,
TTL: ttl,
}, nil
}
return item, nil
}
} else if contenttype.IsStringContentType(contentType) {
return CosmosItem{
item := CosmosItem{
ID: req.Key,
Value: string(byteArray),
PartitionKey: partitionKey,
IsBinary: false,
TTL: ttl,
}, nil
}
return item, nil
}
}
return CosmosItem{
item := CosmosItem{
ID: req.Key,
Value: req.Value,
PartitionKey: partitionKey,
IsBinary: isBinary,
TTL: ttl,
}, nil
}
return item, nil
}
// This is a helper to return the partition key to use. If if metadata["partitionkey"] is present,
@ -662,34 +542,13 @@ func parseTTL(requestMetadata map[string]string) (*int, error) {
return nil, nil
}
func retryOperation(operation backoff.Operation, notify backoff.Notify, maxElapsedTime time.Duration) error {
bo := backoff.NewExponentialBackOff()
bo.InitialInterval = 2 * time.Second
bo.MaxElapsedTime = maxElapsedTime
return backoff.RetryNotify(operation, bo, notify)
}
func isTooManyRequestsError(err error) bool {
if err == nil {
return false
}
if requestError, ok := err.(*documentdb.RequestError); ok {
if requestError.Code == statusTooManyRequests {
return true
}
}
return false
}
func isNotFoundError(err error) bool {
if err == nil {
return false
}
if requestError, ok := err.(*documentdb.RequestError); ok {
if requestError.Code == statusNotFound {
if requestError, ok := err.(*azcore.ResponseError); ok {
if requestError.ErrorCode == statusNotFound {
return true
}
}

View File

@ -14,20 +14,27 @@ limitations under the License.
package cosmosdb
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"strings"
"github.com/a8m/documentdb"
"github.com/agrea/ptr"
"github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos"
jsoniter "github.com/json-iterator/go"
"github.com/dapr/components-contrib/state"
"github.com/dapr/components-contrib/state/query"
)
// Internal query object is created here since azcosmos has no notion of a query object
type InternalQuery struct {
query string
parameters []azcosmos.QueryParameter
}
type Query struct {
query documentdb.Query
query InternalQuery
limit int
token string
}
@ -40,7 +47,7 @@ func (q *Query) VisitEQ(f *query.EQ) (string, error) {
}
name := q.setNextParameter(val)
return fmt.Sprintf("%s = %s", replaceKeywords("c.value."+f.Key), name), nil
return replaceKeywords("c.value."+f.Key) + " = " + name, nil
}
func (q *Query) VisitIN(f *query.IN) (string, error) {
@ -109,20 +116,21 @@ func (q *Query) VisitOR(f *query.OR) (string, error) {
func (q *Query) Finalize(filters string, qq *query.Query) error {
var filter, orderBy string
if len(filters) != 0 {
filter = fmt.Sprintf(" WHERE %s", filters)
filter = " WHERE " + filters
}
if sz := len(qq.Sort); sz != 0 {
order := make([]string, sz)
for i, item := range qq.Sort {
if item.Order == query.DESC {
order[i] = fmt.Sprintf("%s DESC", replaceKeywords("c.value."+item.Key))
order[i] = replaceKeywords("c.value."+item.Key) + " DESC"
} else {
order[i] = fmt.Sprintf("%s ASC", replaceKeywords("c.value."+item.Key))
order[i] = replaceKeywords("c.value."+item.Key) + " ASC"
}
}
orderBy = fmt.Sprintf(" ORDER BY %s", strings.Join(order, ", "))
orderBy = " ORDER BY " + strings.Join(order, ", ")
}
q.query.Query = fmt.Sprintf("SELECT * FROM c%s%s", filter, orderBy)
q.query.query = "SELECT * FROM c" + filter + orderBy
q.limit = qq.Page.Limit
q.token = qq.Page.Token
@ -130,37 +138,58 @@ func (q *Query) Finalize(filters string, qq *query.Query) error {
}
func (q *Query) setNextParameter(val string) string {
pname := fmt.Sprintf("@__param__%d__", len(q.query.Parameters))
q.query.Parameters = append(q.query.Parameters, documentdb.Parameter{Name: pname, Value: val})
pname := fmt.Sprintf("@__param__%d__", len(q.query.parameters))
q.query.parameters = append(q.query.parameters, azcosmos.QueryParameter{Name: pname, Value: val})
return pname
}
func (q *Query) execute(client *documentdb.DocumentDB, collection string) ([]state.QueryItem, string, error) {
opts := []documentdb.CallOption{documentdb.CrossPartition()}
func (q *Query) execute(client *azcosmos.ContainerClient) ([]state.QueryItem, string, error) {
opts := &azcosmos.QueryOptions{}
opts.QueryParameters = append(opts.QueryParameters, q.query.parameters...)
if q.limit != 0 {
opts = append(opts, documentdb.Limit(q.limit))
opts.PageSizeHint = int32(q.limit)
}
if len(q.token) != 0 {
opts = append(opts, documentdb.Continuation(q.token))
opts.ContinuationToken = q.token
}
items := []CosmosItem{}
resp, err := client.QueryDocuments(collection, &q.query, &items, opts...)
if err != nil {
return nil, "", err
pk := azcosmos.NewPartitionKeyBool(true)
queryPager := client.NewQueryItemsPager(q.query.query, pk, opts)
token := ""
for queryPager.More() {
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
queryResponse, innerErr := queryPager.NextPage(ctx)
cancel()
if innerErr != nil {
return nil, "", innerErr
}
token = queryResponse.ContinuationToken
for _, item := range queryResponse.Items {
tempItem := CosmosItem{}
err := json.Unmarshal(item, &tempItem)
if err != nil {
return nil, "", err
}
items = append(items, tempItem)
}
}
token := resp.Header.Get(documentdb.HeaderContinuation)
ret := make([]state.QueryItem, len(items))
var err error
for i := range items {
ret[i].Key = items[i].ID
ret[i].ETag = ptr.String(items[i].Etag)
ret[i].ETag = &items[i].Etag
if items[i].IsBinary {
ret[i].Data, _ = base64.StdEncoding.DecodeString(items[i].Value.(string))
continue
}
ret[i].Data, err = jsoniter.ConfigFastest.Marshal(&items[i].Value)
if err != nil {
ret[i].Error = err.Error()
@ -182,16 +211,28 @@ func replaceKeywords(key string) string {
return key
}
// Replaces reserved keywords. If a replacement of a reserved keyword is made, all other words will be changed from .word to ['word']
func replaceKeyword(key, keyword string) string {
indx := strings.Index(strings.ToUpper(key), "."+strings.ToUpper(keyword))
if indx == -1 {
return key
}
// Grab the next index to check and ensure that it doesn't over-index
nextIndx := indx + len(keyword) + 1
if nextIndx == len(key) || !isLetter(key[nextIndx]) {
return fmt.Sprintf("%s['%s']%s", key[:indx], key[indx+1:nextIndx], replaceKeyword(key[nextIndx:], keyword))
// Get the new keyword to replace
newKeyword := keyword
if nextIndx < len(key)-1 {
// Get the index of the next period (Note that it grabs the index relative to the beginning of the initial string)
idxOfPeriod := strings.Index(key[nextIndx+1:], ".")
if idxOfPeriod != -1 {
newKeyword = key[nextIndx+1 : nextIndx+idxOfPeriod+1]
} else {
newKeyword = key[nextIndx+1:]
}
}
return fmt.Sprintf("%s['%s']%s", key[:indx], key[indx+1:nextIndx], replaceKeyword(key[nextIndx:], newKeyword))
}
return key
}

View File

@ -18,7 +18,7 @@ import (
"os"
"testing"
"github.com/a8m/documentdb"
"github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos"
"github.com/stretchr/testify/assert"
"github.com/dapr/components-contrib/state/query"
@ -40,7 +40,7 @@ func TestCosmosDbKeyReplace(t *testing.T) {
},
{
input: "c.value.a",
expected: "c['value'].a",
expected: "c['value']['a']",
},
{
input: "c.value.value",
@ -48,7 +48,7 @@ func TestCosmosDbKeyReplace(t *testing.T) {
},
{
input: "c.value.a.value",
expected: "c['value'].a['value']",
expected: "c['value']['a']['value']",
},
}
@ -60,20 +60,20 @@ func TestCosmosDbKeyReplace(t *testing.T) {
func TestCosmosDbQuery(t *testing.T) {
tests := []struct {
input string
query documentdb.Query
query InternalQuery
}{
{
input: "../../../tests/state/query/q1.json",
query: documentdb.Query{
Query: "SELECT * FROM c",
Parameters: nil,
query: InternalQuery{
query: "SELECT * FROM c",
parameters: nil,
},
},
{
input: "../../../tests/state/query/q2.json",
query: documentdb.Query{
Query: "SELECT * FROM c WHERE c['value'].state = @__param__0__",
Parameters: []documentdb.Parameter{
query: InternalQuery{
query: "SELECT * FROM c WHERE c['value']['state'] = @__param__0__",
parameters: []azcosmos.QueryParameter{
{
Name: "@__param__0__",
Value: "CA",
@ -83,9 +83,9 @@ func TestCosmosDbQuery(t *testing.T) {
},
{
input: "../../../tests/state/query/q3.json",
query: documentdb.Query{
Query: "SELECT * FROM c WHERE c['value'].person.org = @__param__0__ AND c['value'].state IN (@__param__1__, @__param__2__) ORDER BY c['value'].state DESC, c['value'].person.name ASC",
Parameters: []documentdb.Parameter{
query: InternalQuery{
query: "SELECT * FROM c WHERE c['value']['person']['org'] = @__param__0__ AND c['value']['state'] IN (@__param__1__, @__param__2__) ORDER BY c['value']['state'] DESC, c['value']['person']['name'] ASC",
parameters: []azcosmos.QueryParameter{
{
Name: "@__param__0__",
Value: "A",
@ -103,9 +103,9 @@ func TestCosmosDbQuery(t *testing.T) {
},
{
input: "../../../tests/state/query/q4.json",
query: documentdb.Query{
Query: "SELECT * FROM c WHERE c['value'].person.org = @__param__0__ OR (c['value'].person.org = @__param__1__ AND c['value'].state IN (@__param__2__, @__param__3__)) ORDER BY c['value'].state DESC, c['value'].person.name ASC",
Parameters: []documentdb.Parameter{
query: InternalQuery{
query: "SELECT * FROM c WHERE c['value']['person']['org'] = @__param__0__ OR (c['value']['person']['org'] = @__param__1__ AND c['value']['state'] IN (@__param__2__, @__param__3__)) ORDER BY c['value']['state'] DESC, c['value']['person']['name'] ASC",
parameters: []azcosmos.QueryParameter{
{
Name: "@__param__0__",
Value: "A",

View File

@ -1,116 +0,0 @@
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// operations - an array of objects to upsert or delete
function dapr_multi_v2(operations) {
if (typeof operations === "string") {
throw new Error("arg is a string, expected array of objects");
}
var context = getContext();
var collection = context.getCollection();
var collectionLink = collection.getSelfLink();
var response = context.getResponse();
// Upserts do not reflect until the transaction is committed,
// as a result of which SELECT will not return the new values.
// We need to store document URLs (_self) in order to do deletions.
var documentMap = {}
var operationCount = 0;
if (operations.length > 0) {
tryExecute(operations[operationCount], callback);
}
function tryExecute(operation, callback) {
switch (operation["type"]) {
case "upsert":
tryCreate(operation["item"], callback);
break;
case "delete":
tryQueryAndDelete(operation["item"], callback);
break;
default:
throw new Error("operation type not supported - should be 'upsert' or 'delete'");
}
}
function tryCreate(doc, callback) {
var isAccepted = collection.upsertDocument(collectionLink, doc, callback);
// Fail if we hit execution bounds.
if (!isAccepted) {
throw new Error("upsertDocument() not accepted, please retry");
}
}
function tryQueryAndDelete(doc, callback) {
// Check the cache first. We expect to find the document if it was upserted.
var documentLink = documentMap[doc["id"]];
if (documentLink) {
tryDelete(documentLink, callback);
return;
}
// Not found in cache, query for it.
var query = "select n._self from n where n.id='" + doc["id"] + "'";
console.log("query: " + query)
var requestOptions = {};
var isAccepted = collection.queryDocuments(collectionLink, query, requestOptions, function (err, retrievedDocs, _responseOptions) {
if (err) throw err;
if (retrievedDocs == null || retrievedDocs.length == 0) {
// Nothing to delete.
response.setBody(JSON.stringify("success"));
} else {
tryDelete(retrievedDocs[0]._self, callback);
}
});
// fail if we hit execution bounds
if (!isAccepted) {
throw new Error("queryDocuments() not accepted, please retry");
}
}
function tryDelete(documentLink, callback) {
// Delete the first document in the array.
var requestOptions = {};
var isAccepted = collection.deleteDocument(documentLink, requestOptions, (err, _responseOptions) => {
callback(err, null, _responseOptions);
});
// Fail if we hit execution bounds.
if (!isAccepted) {
throw new Error("deleteDocument() not accepted, please retry");
}
}
function callback(err, doc, _options) {
if (err) throw err;
// Document references are stored for all upserts.
// This can be used for further deletes in this transaction.
if (doc && doc._self) documentMap[doc.id] = doc._self;
operationCount++;
if (operationCount >= operations.length) {
// Operations are done.
response.setBody(JSON.stringify("success"));
} else {
tryExecute(operations[operationCount], callback);
}
}
}

View File

@ -1,17 +0,0 @@
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
function daprSpVersion(prefix) {
var response = getContext().getResponse();
response.setBody(2);
}

View File

@ -21,7 +21,7 @@ require (
github.com/Azure/azure-sdk-for-go v65.0.0+incompatible // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.1 // indirect
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.2 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 // indirect
github.com/Azure/azure-storage-blob-go v0.10.0 // indirect
github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd // indirect

View File

@ -55,8 +55,8 @@ github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 h1:Ut0ZGdOwJDw0npYEg+TLlPls3
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 h1:QkAcEIAKbNL4KoFr4SathZPhDhF4mVwpBMFlYjyAqy8=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0/go.mod h1:bhXu1AjYL+wutSL/kpSq6s7733q2Rb0yuot9Zgfqa/0=
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.1 h1:Sd7LtAlpRJ50lAj49S+pT6K0OUt+4KsNzB2uUArrWKg=
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.1/go.mod h1:Fy3bbChFm4cZn6oIxYYqKB2FG3rBDxk3NZDLDJCHl+Q=
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.2 h1:yJegJqjhrMJ3Oe5s43jOTGL2AsE7pJyx+7Yqls/65tw=
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.2/go.mod h1:Fy3bbChFm4cZn6oIxYYqKB2FG3rBDxk3NZDLDJCHl+Q=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 h1:jp0dGvZ7ZK0mgqnTSClMxa5xuRL7NZgHameVYF6BurY=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w=
github.com/Azure/azure-storage-blob-go v0.10.0 h1:evCwGreYo3XLeBV4vSxLbLiYb6e0SzsJiXQVRGsRXxs=

View File

@ -16,8 +16,10 @@ require (
github.com/AdhityaRamadhanus/fasthttpcors v0.0.0-20170121111917-d4c07198763a // indirect
github.com/Azure/azure-amqp-common-go/v3 v3.2.3 // indirect
github.com/Azure/azure-pipeline-go v0.2.3 // indirect
github.com/Azure/azure-sdk-for-go v65.0.0+incompatible // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.2 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 // indirect
github.com/Azure/azure-storage-blob-go v0.10.0 // indirect
github.com/Azure/azure-storage-queue-go v0.0.0-20191125232315-636801874cdd // indirect
@ -32,8 +34,6 @@ require (
github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 // indirect
github.com/PuerkitoBio/purell v1.1.1 // indirect
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
github.com/a8m/documentdb v1.3.1-0.20220405205223-5b41ba0aaeb1 // indirect
github.com/agrea/ptr v0.0.0-20180711073057-77a518d99b7b // indirect
github.com/andybalholm/brotli v1.0.4 // indirect
github.com/antlr/antlr4 v0.0.0-20200503195918-621b933c7a7f // indirect
github.com/armon/go-metrics v0.3.10 // indirect

View File

@ -49,10 +49,14 @@ github.com/Azure/azure-pipeline-go v0.1.8/go.mod h1:XA1kFWRVhSK+KNFiOhfv83Fv8L9a
github.com/Azure/azure-pipeline-go v0.2.2/go.mod h1:4rQ/NZncSvGqNkkOsNpOU1tgoNuIlp9AfUH5G1tvCHc=
github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U=
github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k=
github.com/Azure/azure-sdk-for-go v65.0.0+incompatible h1:HzKLt3kIwMm4KeJYTdx9EbjRYTySD/t8i1Ee/W5EGXw=
github.com/Azure/azure-sdk-for-go v65.0.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0 h1:Ut0ZGdOwJDw0npYEg+TLlPls3Pq6JiZaP2/aGKir7Zw=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.1.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 h1:QkAcEIAKbNL4KoFr4SathZPhDhF4mVwpBMFlYjyAqy8=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0/go.mod h1:bhXu1AjYL+wutSL/kpSq6s7733q2Rb0yuot9Zgfqa/0=
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.2 h1:yJegJqjhrMJ3Oe5s43jOTGL2AsE7pJyx+7Yqls/65tw=
github.com/Azure/azure-sdk-for-go/sdk/data/azcosmos v0.3.2/go.mod h1:Fy3bbChFm4cZn6oIxYYqKB2FG3rBDxk3NZDLDJCHl+Q=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0 h1:jp0dGvZ7ZK0mgqnTSClMxa5xuRL7NZgHameVYF6BurY=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.0.0/go.mod h1:eWRD7oawr1Mu1sLCawqVc0CUiF43ia3qQMxLscsKQ9w=
github.com/Azure/azure-storage-blob-go v0.10.0 h1:evCwGreYo3XLeBV4vSxLbLiYb6e0SzsJiXQVRGsRXxs=
@ -108,10 +112,7 @@ github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 h1:d+Bc7a5rLufV
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
github.com/Shopify/sarama v1.30.0/go.mod h1:zujlQQx1kzHsh4jfV1USnptCQrHAEZ2Hk8fTKCulPVs=
github.com/Shopify/toxiproxy/v2 v2.1.6-0.20210914104332-15ea381dcdae/go.mod h1:/cvHQkZ1fst0EmZnA5dFtiQdWCNCFYzb+uE2vqVgvx0=
github.com/a8m/documentdb v1.3.1-0.20220405205223-5b41ba0aaeb1 h1:vdxL7id6rXNHNAh7yHUHiTsTvFupt+c7MBa+1bru+48=
github.com/a8m/documentdb v1.3.1-0.20220405205223-5b41ba0aaeb1/go.mod h1:4Z0mpi7fkyqjxUdGiNMO3vagyiUoiwLncaIX6AsW5z0=
github.com/agrea/ptr v0.0.0-20180711073057-77a518d99b7b h1:WMhlIaJkDgEQSVJQM06YV+cYUl1r5OY5//ijMXJNqtA=
github.com/agrea/ptr v0.0.0-20180711073057-77a518d99b7b/go.mod h1:Tie46d3UWzXpj+Fh9+DQTyaUxEpFBPOLXrnx7nxlKRo=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
@ -453,7 +454,6 @@ github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22
github.com/jonboulle/clockwork v0.2.2/go.mod h1:Pkfl5aHPm1nk2H9h0bjmnJD/BcgbGXUBGnn1kMkgxc8=
github.com/josharian/intern v1.0.0/go.mod h1:5DoeVV0s6jJacbCEi61lwdGj/aVlrQvzHFFd8Hwg//Y=
github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4=
github.com/json-iterator/go v1.1.5/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.9/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=