Update state/blobstorage.go to add metadata. (#786)

Co-authored-by: Phil Kedy <phil.kedy@gmail.com>
This commit is contained in:
Daniel Beall 2021-04-15 21:40:09 -07:00 committed by GitHub
parent 58b43fbfdb
commit a35a715f9c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 44 additions and 6 deletions

View File

@ -30,6 +30,7 @@ package blobstorage
import (
"bytes"
"context"
b64 "encoding/base64"
"fmt"
"net/url"
"strings"
@ -43,10 +44,16 @@ import (
)
const (
keyDelimiter = "||"
accountNameKey = "accountName"
accountKeyKey = "accountKey"
containerNameKey = "containerName"
keyDelimiter = "||"
accountNameKey = "accountName"
accountKeyKey = "accountKey"
containerNameKey = "containerName"
contentType = "ContentType"
contentMD5 = "ContentMD5"
contentEncoding = "ContentEncoding"
contentLanguage = "ContentLanguage"
contentDisposition = "ContentDisposition"
cacheControl = "CacheControl"
)
// StateStore Type
@ -191,8 +198,6 @@ func (r *StateStore) readFile(req *state.GetRequest) ([]byte, string, error) {
}
func (r *StateStore) writeFile(req *state.SetRequest) error {
blobURL := r.containerURL.NewBlockBlobURL(getFileName(req.Key))
accessConditions := azblob.BlobAccessConditions{}
if req.Options.Concurrency == state.FirstWrite && req.ETag != nil {
@ -203,10 +208,43 @@ func (r *StateStore) writeFile(req *state.SetRequest) error {
accessConditions.IfMatch = azblob.ETag(etag)
}
blobURL := r.containerURL.NewBlockBlobURL(getFileName(req.Key))
var blobHTTPHeaders azblob.BlobHTTPHeaders
if val, ok := req.Metadata[contentType]; ok && val != "" {
blobHTTPHeaders.ContentType = val
delete(req.Metadata, contentType)
}
if val, ok := req.Metadata[contentMD5]; ok && val != "" {
sDec, err := b64.StdEncoding.DecodeString(val)
if err != nil || len(sDec) != 16 {
return fmt.Errorf("the MD5 value specified in Content MD5 is invalid, MD5 value must be 128 bits and base64 encoded")
}
blobHTTPHeaders.ContentMD5 = sDec
delete(req.Metadata, contentMD5)
}
if val, ok := req.Metadata[contentEncoding]; ok && val != "" {
blobHTTPHeaders.ContentEncoding = val
delete(req.Metadata, contentEncoding)
}
if val, ok := req.Metadata[contentLanguage]; ok && val != "" {
blobHTTPHeaders.ContentLanguage = val
delete(req.Metadata, contentLanguage)
}
if val, ok := req.Metadata[contentDisposition]; ok && val != "" {
blobHTTPHeaders.ContentDisposition = val
delete(req.Metadata, contentDisposition)
}
if val, ok := req.Metadata[cacheControl]; ok && val != "" {
blobHTTPHeaders.CacheControl = val
delete(req.Metadata, cacheControl)
}
_, err := azblob.UploadBufferToBlockBlob(context.Background(), r.marshal(req), blobURL, azblob.UploadToBlockBlobOptions{
Parallelism: 16,
Metadata: req.Metadata,
AccessConditions: accessConditions,
BlobHTTPHeaders: blobHTTPHeaders,
})
if err != nil {
r.logger.Debugf("write file %s, err %s", req.Key, err)