372 lines
11 KiB
Go
372 lines
11 KiB
Go
/*
|
|
Copyright 2021 The Dapr Authors
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package blobstorage
|
|
|
|
import (
|
|
"context"
|
|
b64 "encoding/base64"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"reflect"
|
|
"strconv"
|
|
|
|
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
|
|
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
|
|
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
|
|
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
|
|
"github.com/google/uuid"
|
|
|
|
"github.com/dapr/components-contrib/bindings"
|
|
storageinternal "github.com/dapr/components-contrib/internal/component/azure/blobstorage"
|
|
contribMetadata "github.com/dapr/components-contrib/metadata"
|
|
"github.com/dapr/kit/logger"
|
|
"github.com/dapr/kit/ptr"
|
|
)
|
|
|
|
const (
	// metadataKeyBlobName is the request/response metadata key carrying the
	// name of the blob, relative to the container.
	metadataKeyBlobName = "blobName"
	// metadataKeyMarker is the metadata key for the list continuation marker.
	// A list response carries a marker when the listing was not complete; it
	// can be supplied to a subsequent list call to fetch the next set of items.
	// See: https://docs.microsoft.com/en-us/rest/api/storageservices/list-blobs#uri-parameters
	metadataKeyMarker = "marker"
	// metadataKeyNumber is the response metadata key for the number of blobs
	// returned by a list operation.
	metadataKeyNumber = "number"
	// metadataKeyPagesTraversed is the response metadata key for the number of
	// pages fetched while serving a list operation.
	metadataKeyPagesTraversed = "pagesTraversed"
	// metadataKeyIncludeMetadata is the request metadata key that controls
	// whether user-defined blob metadata is returned by the get operation.
	metadataKeyIncludeMetadata = "includeMetadata"
	// metadataKeyDeleteSnapshots is the request metadata key selecting the
	// delete-snapshots option of the delete operation.
	// See: https://docs.microsoft.com/en-us/rest/api/storageservices/delete-blob#request-headers
	metadataKeyDeleteSnapshots = "deleteSnapshots"
	// maxResults is the default maximum number of blobs to return from a list
	// operation, including all BlobPrefix elements. It matches the service-side
	// default of 5,000 items used when a request does not specify maxresults.
	// See: https://docs.microsoft.com/en-us/rest/api/storageservices/list-blobs#uri-parameters
	maxResults = int32(5000)
	// endpointKey is the "endpoint" key name.
	// NOTE(review): not referenced in the visible part of this file — confirm usage elsewhere in the package.
	endpointKey = "endpoint"
)
|
|
|
|
// ErrMissingBlobName is returned by operations that target a specific blob
// when the request metadata has no non-empty "blobName" entry.
var ErrMissingBlobName = errors.New("blobName is a required attribute")
|
|
|
|
// AzureBlobStorage allows saving blobs to an Azure Blob Storage account.
|
|
type AzureBlobStorage struct {
|
|
metadata *storageinternal.BlobStorageMetadata
|
|
containerClient *container.Client
|
|
|
|
logger logger.Logger
|
|
}
|
|
|
|
// createResponse is the JSON document returned as the body of a create
// operation.
type createResponse struct {
	// BlobURL is the URL of the uploaded blob.
	BlobURL string `json:"blobURL"`
	// BlobName is the blob's name relative to the container, when populated.
	BlobName string `json:"blobName"`
}
|
|
|
|
// listInclude is the "include" section of a list request payload. Each flag
// is copied onto the matching field of the list options' Include setting.
type listInclude struct {
	// Copy maps to the list option of the same name.
	Copy bool `json:"copy"`
	// Metadata maps to the list option of the same name.
	Metadata bool `json:"metadata"`
	// Snapshots maps to the list option of the same name.
	Snapshots bool `json:"snapshots"`
	// UncommittedBlobs maps to the list option of the same name.
	UncommittedBlobs bool `json:"uncommittedBlobs"`
	// Deleted maps to the list option of the same name.
	Deleted bool `json:"deleted"`
}
|
|
|
|
type listPayload struct {
|
|
Marker string `json:"marker"`
|
|
Prefix string `json:"prefix"`
|
|
MaxResults int32 `json:"maxResults"`
|
|
Include listInclude `json:"include"`
|
|
}
|
|
|
|
// NewAzureBlobStorage returns a new Azure Blob Storage instance.
|
|
func NewAzureBlobStorage(logger logger.Logger) bindings.OutputBinding {
|
|
return &AzureBlobStorage{logger: logger}
|
|
}
|
|
|
|
// Init performs metadata parsing.
|
|
func (a *AzureBlobStorage) Init(ctx context.Context, metadata bindings.Metadata) error {
|
|
var err error
|
|
a.containerClient, a.metadata, err = storageinternal.CreateContainerStorageClient(ctx, a.logger, metadata.Properties)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (a *AzureBlobStorage) Operations() []bindings.OperationKind {
|
|
return []bindings.OperationKind{
|
|
bindings.CreateOperation,
|
|
bindings.GetOperation,
|
|
bindings.DeleteOperation,
|
|
bindings.ListOperation,
|
|
}
|
|
}
|
|
|
|
func (a *AzureBlobStorage) create(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
|
var blobName string
|
|
if val, ok := req.Metadata[metadataKeyBlobName]; ok && val != "" {
|
|
blobName = val
|
|
delete(req.Metadata, metadataKeyBlobName)
|
|
} else {
|
|
id, err := uuid.NewRandom()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
blobName = id.String()
|
|
}
|
|
|
|
blobHTTPHeaders, err := storageinternal.CreateBlobHTTPHeadersFromRequest(req.Metadata, nil, a.logger)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
d, err := strconv.Unquote(string(req.Data))
|
|
if err == nil {
|
|
req.Data = []byte(d)
|
|
}
|
|
|
|
if a.metadata.DecodeBase64 {
|
|
decoded, decodeError := b64.StdEncoding.DecodeString(string(req.Data))
|
|
if decodeError != nil {
|
|
return nil, decodeError
|
|
}
|
|
req.Data = decoded
|
|
}
|
|
|
|
uploadOptions := azblob.UploadBufferOptions{
|
|
Metadata: storageinternal.SanitizeMetadata(a.logger, req.Metadata),
|
|
HTTPHeaders: &blobHTTPHeaders,
|
|
TransactionalContentMD5: blobHTTPHeaders.BlobContentMD5,
|
|
}
|
|
|
|
blockBlobClient := a.containerClient.NewBlockBlobClient(blobName)
|
|
_, err = blockBlobClient.UploadBuffer(ctx, req.Data, &uploadOptions)
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error uploading az blob: %w", err)
|
|
}
|
|
|
|
resp := createResponse{
|
|
BlobURL: blockBlobClient.URL(),
|
|
}
|
|
b, err := json.Marshal(resp)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error marshalling create response for azure blob: %w", err)
|
|
}
|
|
|
|
createResponseMetadata := map[string]string{
|
|
"blobName": blobName,
|
|
}
|
|
|
|
return &bindings.InvokeResponse{
|
|
Data: b,
|
|
Metadata: createResponseMetadata,
|
|
}, nil
|
|
}
|
|
|
|
func (a *AzureBlobStorage) get(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
|
var blockBlobClient *blockblob.Client
|
|
if val, ok := req.Metadata[metadataKeyBlobName]; ok && val != "" {
|
|
blockBlobClient = a.containerClient.NewBlockBlobClient(val)
|
|
} else {
|
|
return nil, ErrMissingBlobName
|
|
}
|
|
|
|
downloadOptions := azblob.DownloadStreamOptions{
|
|
AccessConditions: &blob.AccessConditions{},
|
|
}
|
|
|
|
blobDownloadResponse, err := blockBlobClient.DownloadStream(ctx, &downloadOptions)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error downloading az blob: %w", err)
|
|
}
|
|
reader := blobDownloadResponse.Body
|
|
defer reader.Close()
|
|
blobData, err := io.ReadAll(reader)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error reading az blob: %w", err)
|
|
}
|
|
|
|
var metadata map[string]string
|
|
fetchMetadata, err := req.GetMetadataAsBool(metadataKeyIncludeMetadata)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error parsing metadata: %w", err)
|
|
}
|
|
|
|
getPropertiesOptions := blob.GetPropertiesOptions{
|
|
AccessConditions: &blob.AccessConditions{},
|
|
}
|
|
|
|
if fetchMetadata {
|
|
props, err := blockBlobClient.GetProperties(ctx, &getPropertiesOptions)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error reading blob metadata: %w", err)
|
|
}
|
|
|
|
if len(props.Metadata) > 0 {
|
|
metadata = make(map[string]string, len(props.Metadata))
|
|
for k, v := range props.Metadata {
|
|
if v == nil {
|
|
continue
|
|
}
|
|
metadata[k] = *v
|
|
}
|
|
}
|
|
}
|
|
|
|
return &bindings.InvokeResponse{
|
|
Data: blobData,
|
|
Metadata: metadata,
|
|
}, nil
|
|
}
|
|
|
|
func (a *AzureBlobStorage) delete(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
|
var blockBlobClient *blockblob.Client
|
|
val, ok := req.Metadata[metadataKeyBlobName]
|
|
if !ok || val == "" {
|
|
return nil, ErrMissingBlobName
|
|
}
|
|
|
|
var deleteSnapshotsOptions blob.DeleteSnapshotsOptionType
|
|
if deleteSnapShotOption, ok := req.Metadata[metadataKeyDeleteSnapshots]; ok && val != "" {
|
|
deleteSnapshotsOptions = azblob.DeleteSnapshotsOptionType(deleteSnapShotOption)
|
|
if !a.isValidDeleteSnapshotsOptionType(deleteSnapshotsOptions) {
|
|
return nil, fmt.Errorf("invalid delete snapshot option type: %s; allowed: %s",
|
|
deleteSnapshotsOptions, azblob.PossibleDeleteSnapshotsOptionTypeValues())
|
|
}
|
|
}
|
|
|
|
deleteOptions := blob.DeleteOptions{
|
|
DeleteSnapshots: &deleteSnapshotsOptions,
|
|
AccessConditions: &blob.AccessConditions{},
|
|
}
|
|
|
|
blockBlobClient = a.containerClient.NewBlockBlobClient(val)
|
|
_, err := blockBlobClient.Delete(ctx, &deleteOptions)
|
|
|
|
return nil, err
|
|
}
|
|
|
|
func (a *AzureBlobStorage) list(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
|
options := container.ListBlobsFlatOptions{}
|
|
|
|
hasPayload := false
|
|
var payload listPayload
|
|
if req.Data != nil {
|
|
err := json.Unmarshal(req.Data, &payload)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
hasPayload = true
|
|
}
|
|
if hasPayload {
|
|
options.Include.Copy = payload.Include.Copy
|
|
options.Include.Metadata = payload.Include.Metadata
|
|
options.Include.Snapshots = payload.Include.Snapshots
|
|
options.Include.UncommittedBlobs = payload.Include.UncommittedBlobs
|
|
options.Include.Deleted = payload.Include.Deleted
|
|
}
|
|
|
|
if hasPayload && payload.MaxResults > 0 {
|
|
options.MaxResults = &payload.MaxResults
|
|
} else {
|
|
options.MaxResults = ptr.Of(maxResults) // cannot get address of constant directly
|
|
}
|
|
|
|
if hasPayload && payload.Prefix != "" {
|
|
options.Prefix = &payload.Prefix
|
|
}
|
|
|
|
var initialMarker string
|
|
if hasPayload && payload.Marker != "" {
|
|
initialMarker = payload.Marker
|
|
} else {
|
|
initialMarker = ""
|
|
}
|
|
options.Marker = &initialMarker
|
|
|
|
metadata := make(map[string]string, 3)
|
|
blobs := []*container.BlobItem{}
|
|
pager := a.containerClient.NewListBlobsFlatPager(&options)
|
|
|
|
metadata[metadataKeyMarker] = ""
|
|
numBlobs := 0
|
|
pagesTraversed := 0
|
|
for pager.More() {
|
|
resp, err := pager.NextPage(ctx)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error listing blobs: %w", err)
|
|
}
|
|
pagesTraversed++
|
|
|
|
blobs = append(blobs, resp.Segment.BlobItems...)
|
|
numBlobs += len(resp.Segment.BlobItems)
|
|
if resp.Marker != nil {
|
|
metadata[metadataKeyMarker] = *resp.Marker
|
|
} else {
|
|
metadata[metadataKeyMarker] = ""
|
|
}
|
|
|
|
if numBlobs >= int(*options.MaxResults) {
|
|
break
|
|
}
|
|
}
|
|
metadata[metadataKeyNumber] = strconv.FormatInt(int64(numBlobs), 10)
|
|
metadata[metadataKeyPagesTraversed] = strconv.FormatInt(int64(pagesTraversed), 10)
|
|
|
|
jsonResponse, err := json.Marshal(blobs)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("cannot marshal blobs to json: %w", err)
|
|
}
|
|
|
|
return &bindings.InvokeResponse{
|
|
Data: jsonResponse,
|
|
Metadata: metadata,
|
|
}, nil
|
|
}
|
|
|
|
func (a *AzureBlobStorage) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
|
switch req.Operation {
|
|
case bindings.CreateOperation:
|
|
return a.create(ctx, req)
|
|
case bindings.GetOperation:
|
|
return a.get(ctx, req)
|
|
case bindings.DeleteOperation:
|
|
return a.delete(ctx, req)
|
|
case bindings.ListOperation:
|
|
return a.list(ctx, req)
|
|
default:
|
|
return nil, fmt.Errorf("unsupported operation %s", req.Operation)
|
|
}
|
|
}
|
|
|
|
func (a *AzureBlobStorage) isValidDeleteSnapshotsOptionType(accessType azblob.DeleteSnapshotsOptionType) bool {
|
|
validTypes := azblob.PossibleDeleteSnapshotsOptionTypeValues()
|
|
for _, item := range validTypes {
|
|
if item == accessType {
|
|
return true
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// GetComponentMetadata returns the metadata of the component.
|
|
func (a *AzureBlobStorage) GetComponentMetadata() (metadataInfo contribMetadata.MetadataMap) {
|
|
metadataStruct := storageinternal.BlobStorageMetadata{}
|
|
contribMetadata.GetMetadataInfoFromStructType(reflect.TypeOf(metadataStruct), &metadataInfo, contribMetadata.BindingType)
|
|
return
|
|
}
|