Fixing conformance test for cosmosDB for track2sdk
Signed-off-by: Ryan Lettieri <ryanLettieri@microsoft.com>
This commit is contained in:
parent
5d112f4d33
commit
705070036d
|
@ -76,10 +76,10 @@ type CosmosItem struct {
|
|||
|
||||
const (
|
||||
metadataPartitionKey = "partitionKey"
|
||||
unknownPartitionKey = "__UNKNOWN__"
|
||||
metadataTTLKey = "ttlInSeconds"
|
||||
statusTooManyRequests = "429" // RFC 6585, 4
|
||||
defaultTimeout = 20 * time.Second
|
||||
statusNotFound = "NotFound"
|
||||
)
|
||||
|
||||
// policy that tracks the number of times it was invoked
|
||||
|
@ -206,7 +206,7 @@ func (c *StateStore) Get(req *state.GetRequest) (*state.GetResponse, error) {
|
|||
|
||||
options := azcosmos.ItemOptions{}
|
||||
if req.Options.Consistency == state.Strong {
|
||||
options.ConsistencyLevel = azcosmos.ConsistencyLevelStrong.ToPtr()
|
||||
options.ConsistencyLevel = azcosmos.ConsistencyLevelSession.ToPtr()
|
||||
}
|
||||
if req.Options.Consistency == state.Eventual {
|
||||
options.ConsistencyLevel = azcosmos.ConsistencyLevelEventual.ToPtr()
|
||||
|
@ -230,6 +230,8 @@ func (c *StateStore) Get(req *state.GetRequest) (*state.GetResponse, error) {
|
|||
return nil, err
|
||||
}
|
||||
|
||||
item.Etag = string(readItem.Response.ETag)
|
||||
|
||||
if item.IsBinary {
|
||||
if item.Value == nil {
|
||||
return &state.GetResponse{
|
||||
|
@ -279,8 +281,9 @@ func (c *StateStore) Set(req *state.SetRequest) error {
|
|||
newTag := azcore.ETag(uuid.NewString())
|
||||
options.IfMatchEtag = &newTag
|
||||
}
|
||||
// Consistency levels can only be relaxed so the session level is used here
|
||||
if req.Options.Consistency == state.Strong {
|
||||
options.ConsistencyLevel = azcosmos.ConsistencyLevelStrong.ToPtr()
|
||||
options.ConsistencyLevel = azcosmos.ConsistencyLevelSession.ToPtr()
|
||||
}
|
||||
if req.Options.Consistency == state.Eventual {
|
||||
options.ConsistencyLevel = azcosmos.ConsistencyLevelEventual.ToPtr()
|
||||
|
@ -320,7 +323,7 @@ func (c *StateStore) Delete(req *state.DeleteRequest) error {
|
|||
options.IfMatchEtag = &etag
|
||||
}
|
||||
if req.Options.Consistency == state.Strong {
|
||||
options.ConsistencyLevel = azcosmos.ConsistencyLevelStrong.ToPtr()
|
||||
options.ConsistencyLevel = azcosmos.ConsistencyLevelSession.ToPtr()
|
||||
}
|
||||
if req.Options.Consistency == state.Eventual {
|
||||
options.ConsistencyLevel = azcosmos.ConsistencyLevelEventual.ToPtr()
|
||||
|
@ -330,7 +333,11 @@ func (c *StateStore) Delete(req *state.DeleteRequest) error {
|
|||
pk := azcosmos.NewPartitionKeyString(partitionKey)
|
||||
_, err = c.client.DeleteItem(ctx, pk, req.Key, &options)
|
||||
cancel()
|
||||
if err != nil {
|
||||
if err != nil && !isNotFoundError(err) {
|
||||
c.logger.Debugf("Error from cosmos.DeleteDocument e=%e, e.Error=%s", err, err.Error())
|
||||
if req.ETag != nil {
|
||||
return state.NewETagError(state.ETagMismatch, err)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -343,16 +350,7 @@ func (c *StateStore) Multi(request *state.TransactionalStateRequest) error {
|
|||
c.logger.Debugf("No Operations Provided")
|
||||
return nil
|
||||
}
|
||||
partitionKey := unknownPartitionKey
|
||||
|
||||
switch request.Operations[0].Operation {
|
||||
case state.Upsert:
|
||||
stateItem := request.Operations[0].Request.(*state.SetRequest)
|
||||
partitionKey = populatePartitionMetadata(stateItem.Key, stateItem.Metadata)
|
||||
case state.Delete:
|
||||
stateItem := request.Operations[0].Request.(*state.DeleteRequest)
|
||||
partitionKey = populatePartitionMetadata(stateItem.Key, stateItem.Metadata)
|
||||
}
|
||||
partitionKey := request.Metadata["partitionKey"]
|
||||
|
||||
batch := c.client.NewTransactionalBatch(azcosmos.NewPartitionKeyString(partitionKey))
|
||||
|
||||
|
@ -402,8 +400,6 @@ func (c *StateStore) Multi(request *state.TransactionalStateRequest) error {
|
|||
|
||||
c.logger.Debugf("#operations=%d,partitionkey=%s", numOperations, partitionKey)
|
||||
|
||||
var itemResponseBody map[string]string
|
||||
|
||||
batchResponse, err := c.client.ExecuteTransactionalBatch(context.Background(), batch, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -413,10 +409,6 @@ func (c *StateStore) Multi(request *state.TransactionalStateRequest) error {
|
|||
// We can inspect the individual operation results
|
||||
for index, operation := range batchResponse.OperationResults {
|
||||
c.logger.Debugf("Operation %v completed with status code %v", index, operation.StatusCode)
|
||||
err = json.Unmarshal(operation.ResourceBody, &itemResponseBody)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Transaction failed, look for the offending operation
|
||||
|
@ -534,3 +526,17 @@ func parseTTL(requestMetadata map[string]string) (*int, error) {
|
|||
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
func isNotFoundError(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
|
||||
if requestError, ok := err.(*azcore.ResponseError); ok {
|
||||
if requestError.ErrorCode == statusNotFound {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue