state.azure.blobstorage now passes conformance tests

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
This commit is contained in:
ItalyPaleAle 2022-07-01 20:39:34 +00:00 committed by Bernd Verst
parent 05a8dbb75b
commit 6f00a83546
1 changed files with 45 additions and 50 deletions

View File

@ -36,10 +36,10 @@ Concurrency is supported with ETags according to https://docs.microsoft.com/en-u
package blobstorage
import (
"bytes"
"context"
b64 "encoding/base64"
"fmt"
"io"
"net"
"net/url"
"strings"
@ -132,36 +132,19 @@ func (r *StateStore) Features() []state.Feature {
// Delete the state.
func (r *StateStore) Delete(req *state.DeleteRequest) error {
r.logger.Debugf("delete %s", req.Key)
return r.deleteFile(req)
return r.deleteFile(context.Background(), req)
}
// Get the state.
func (r *StateStore) Get(req *state.GetRequest) (*state.GetResponse, error) {
r.logger.Debugf("fetching %s", req.Key)
data, etag, contentType, err := r.readFile(req)
if err != nil {
r.logger.Debugf("error %s", err)
if isNotFoundError(err) {
return &state.GetResponse{}, nil
}
return &state.GetResponse{}, err
}
return &state.GetResponse{
Data: data,
ETag: ptr.String(etag),
ContentType: contentType,
}, err
r.logger.Debugf("get %s", req.Key)
return r.readFile(context.Background(), req)
}
// Set the state.
func (r *StateStore) Set(req *state.SetRequest) error {
r.logger.Debugf("saving %s", req.Key)
return r.writeFile(req)
return r.writeFile(context.Background(), req)
}
func (r *StateStore) Ping() error {
@ -204,39 +187,44 @@ func getBlobStorageMetadata(metadata map[string]string) (*blobStorageMetadata, e
return &meta, nil
}
func (r *StateStore) readFile(req *state.GetRequest) ([]byte, string, *string, error) {
func (r *StateStore) readFile(ctx context.Context, req *state.GetRequest) (*state.GetResponse, error) {
blobURL := r.containerURL.NewBlockBlobURL(getFileName(req.Key))
resp, err := blobURL.Download(context.Background(), 0, azblob.CountToEnd, azblob.BlobAccessConditions{}, false)
resp, err := blobURL.Download(ctx, 0, azblob.CountToEnd, azblob.BlobAccessConditions{}, false)
if err != nil {
r.logger.Debugf("download file %s, err %s", req.Key, err)
return nil, "", nil, err
if isNotFoundError(err) {
return &state.GetResponse{}, nil
}
return &state.GetResponse{}, err
}
bodyStream := resp.Body(azblob.RetryReaderOptions{})
data := bytes.Buffer{}
_, err = data.ReadFrom(bodyStream)
data, err := io.ReadAll(bodyStream)
if err != nil {
r.logger.Debugf("read file %s, err %s", req.Key, err)
return nil, "", nil, err
return &state.GetResponse{}, err
}
contentType := resp.ContentType()
return data.Bytes(), string(resp.ETag()), &contentType, nil
return &state.GetResponse{
Data: data,
ETag: ptr.String(string(resp.ETag())),
ContentType: &contentType,
}, nil
}
func (r *StateStore) writeFile(req *state.SetRequest) error {
func (r *StateStore) writeFile(ctx context.Context, req *state.SetRequest) error {
accessConditions := azblob.BlobAccessConditions{}
if req.Options.Concurrency == state.FirstWrite && req.ETag != nil {
var etag string
if req.ETag != nil {
etag = *req.ETag
}
accessConditions.IfMatch = azblob.ETag(etag)
if req.ETag != nil && *req.ETag != "" {
accessConditions.IfMatch = azblob.ETag(*req.ETag)
}
if req.Options.Concurrency == state.FirstWrite && (req.ETag == nil || *req.ETag == "") {
accessConditions.IfNoneMatch = azblob.ETag("*")
}
blobURL := r.containerURL.NewBlockBlobURL(getFileName(req.Key))
@ -245,8 +233,7 @@ func (r *StateStore) writeFile(req *state.SetRequest) error {
if err != nil {
return err
}
_, err = azblob.UploadBufferToBlockBlob(context.Background(), r.marshal(req), blobURL, azblob.UploadToBlockBlobOptions{
Parallelism: 16,
_, err = azblob.UploadBufferToBlockBlob(ctx, r.marshal(req), blobURL, azblob.UploadToBlockBlobOptions{
Metadata: req.Metadata,
AccessConditions: accessConditions,
BlobHTTPHeaders: blobHTTPHeaders,
@ -254,8 +241,11 @@ func (r *StateStore) writeFile(req *state.SetRequest) error {
if err != nil {
r.logger.Debugf("write file %s, err %s", req.Key, err)
if req.ETag != nil {
return state.NewETagError(state.ETagMismatch, err)
// Check if the error is due to ETag conflict
if isETagConflictError(err) {
if req.ETag != nil {
return state.NewETagError(state.ETagMismatch, err)
}
}
return err
@ -305,24 +295,23 @@ func (r *StateStore) createBlobHTTPHeadersFromRequest(req *state.SetRequest) (az
return blobHTTPHeaders, nil
}
func (r *StateStore) deleteFile(req *state.DeleteRequest) error {
func (r *StateStore) deleteFile(ctx context.Context, req *state.DeleteRequest) error {
blobURL := r.containerURL.NewBlockBlobURL(getFileName(req.Key))
accessConditions := azblob.BlobAccessConditions{}
if req.Options.Concurrency == state.FirstWrite && req.ETag != nil {
var etag string
if req.ETag != nil {
etag = *req.ETag
}
accessConditions.IfMatch = azblob.ETag(etag)
if req.ETag != nil && *req.ETag != "" {
accessConditions.IfMatch = azblob.ETag(*req.ETag)
}
_, err := blobURL.Delete(context.Background(), azblob.DeleteSnapshotsOptionNone, accessConditions)
_, err := blobURL.Delete(ctx, azblob.DeleteSnapshotsOptionNone, accessConditions)
if err != nil {
r.logger.Debugf("delete file %s, err %s", req.Key, err)
if req.ETag != nil {
return state.NewETagError(state.ETagMismatch, err)
} else if isNotFoundError(err) {
// deleting an item that doesn't exist without specifying an ETAG is a noop
return nil
}
return err
@ -357,3 +346,9 @@ func isNotFoundError(err error) bool {
return ok && azureError.ServiceCode() == azblob.ServiceCodeBlobNotFound
}
func isETagConflictError(err error) bool {
azureError, ok := err.(azblob.StorageError)
return ok && azureError.ServiceCode() == azblob.ServiceCodeConditionNotMet
}