Cosmos DB: Delete does not need to query for the doc first

This, which is possible now because of improvements in the upstream SDK, should speed up delete operations significantly (and reducing RU usage).
Also, this should fix potential concurrency issues.

Signed-off-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com>
This commit is contained in:
Alessandro (Ale) Segala 2022-04-05 21:20:57 +00:00 committed by GitHub
parent 6f4ba10d13
commit 612f6e6ec3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 13 additions and 31 deletions

View File

@ -268,11 +268,11 @@ func (c *StateStore) Set(req *state.SetRequest) error {
options := []documentdb.CallOption{documentdb.PartitionKey(partitionKey)}
if req.ETag != nil {
options = append(options, documentdb.IfMatch((*req.ETag)))
options = append(options, documentdb.IfMatch(*req.ETag))
}
if req.Options.Concurrency == state.FirstWrite && (req.ETag == nil || *req.ETag == "") {
etag := uuid.NewString()
options = append(options, documentdb.IfMatch((etag)))
options = append(options, documentdb.IfMatch(etag))
}
if req.Options.Consistency == state.Strong {
options = append(options, documentdb.ConsistencyLevel(documentdb.Strong))
@ -319,32 +319,8 @@ func (c *StateStore) Delete(req *state.DeleteRequest) error {
partitionKey := populatePartitionMetadata(req.Key, req.Metadata)
options := []documentdb.CallOption{documentdb.PartitionKey(partitionKey)}
items := []CosmosItem{}
err = retryOperation(func() error {
_, innerErr := c.client.QueryDocuments(
c.getCollectionLink(),
documentdb.NewQuery("SELECT * FROM ROOT r WHERE r.id=@id", documentdb.P{Name: "@id", Value: req.Key}),
&items,
options...,
)
if innerErr != nil {
if isTooManyRequestsError(innerErr) {
return innerErr
}
return backoff.Permanent(innerErr)
}
return nil
}, func(err error, d time.Duration) {
c.logger.Warnf("CosmosDB state store Delete Query request failed: %v; retrying in %s", err, d)
}, 20*time.Second)
if err != nil {
return err
} else if len(items) == 0 {
return nil
}
if req.ETag != nil {
options = append(options, documentdb.IfMatch((*req.ETag)))
options = append(options, documentdb.IfMatch(*req.ETag))
}
if req.Options.Consistency == state.Strong {
options = append(options, documentdb.ConsistencyLevel(documentdb.Strong))
@ -354,7 +330,7 @@ func (c *StateStore) Delete(req *state.DeleteRequest) error {
}
err = retryOperation(func() error {
_, innerErr := c.client.DeleteDocument(items[0].Self, options...)
_, innerErr := c.client.DeleteDocument(c.getDocumentLink(req.Key), options...)
if innerErr != nil {
if isTooManyRequestsError(innerErr) {
return innerErr
@ -366,14 +342,15 @@ func (c *StateStore) Delete(req *state.DeleteRequest) error {
c.logger.Warnf("CosmosDB state store Delete request failed: %v; retrying in %s", err, d)
}, 20*time.Second)
if err != nil {
if err != nil && !isNotFoundError(err) {
c.logger.Debugf("Error from cosmos.DeleteDocument e=%e, e.Error=%s", err, err.Error())
if req.ETag != nil {
return state.NewETagError(state.ETagMismatch, err)
}
return err
}
return err
return nil
}
// Multi performs a transactional operation. succeeds only if all operations succeed, and fails if one or more operations fail.
@ -495,7 +472,12 @@ func (c *StateStore) getCollectionLink() string {
return fmt.Sprintf("dbs/%s/colls/%s/", c.metadata.Database, c.metadata.Collection)
}
// getSprocLink returns the link to the stored procedures.
// getDocumentLink returns the link to a document in the collection.
func (c *StateStore) getDocumentLink(docId string) string {
return fmt.Sprintf("dbs/%s/colls/%s/docs/%s", c.metadata.Database, c.metadata.Collection, docId)
}
// getSprocLink returns the link to a stored procedure in the collection.
func (c *StateStore) getSprocLink(sprocName string) string {
return fmt.Sprintf("dbs/%s/colls/%s/sprocs/%s", c.metadata.Database, c.metadata.Collection, sprocName)
}