/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
|
|
|
|
package obs
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"reflect"
|
|
"strconv"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/huaweicloud/huaweicloud-sdk-go-obs/obs"
|
|
|
|
"github.com/dapr/components-contrib/bindings"
|
|
"github.com/dapr/components-contrib/metadata"
|
|
"github.com/dapr/kit/logger"
|
|
kitmd "github.com/dapr/kit/metadata"
|
|
)
|
|
|
|
// metadataKey is the request metadata entry from which the operations read
// the object key.
const metadataKey = "key"

// maxResults is the page size the list operation falls back to when the
// request payload does not specify one.
const maxResults = 1000
|
|
|
|
// add operations that are not listed under the standard bindings operations.
|
|
const (
|
|
UploadOperation bindings.OperationKind = "upload"
|
|
)
|
|
|
|
type HuaweiOBS struct {
|
|
metadata *obsMetadata
|
|
service HuaweiOBSAPI
|
|
logger logger.Logger
|
|
}
|
|
|
|
// obsMetadata is the component configuration decoded from the binding's
// metadata properties. parseMetadata requires every field except Region to be
// non-empty.
type obsMetadata struct {
	// Region is the (optional) specific Huawei region of the bucket.
	Region string `json:"region"`
	// Endpoint is the specific Huawei OBS endpoint.
	Endpoint string `json:"endpoint"`
	// AccessKey is the Huawei Access Key (AK) used to access OBS.
	AccessKey string `json:"accessKey"`
	// SecretKey is the Huawei Secret Key (SK) used to access OBS.
	SecretKey string `json:"secretKey"`
	// Bucket is the name of the Huawei OBS bucket to write to.
	Bucket string `json:"bucket"`
}
|
|
|
|
// createResponse is the JSON document returned to the caller by the create,
// upload and delete operations.
type createResponse struct {
	// StatusCode is the status code copied from the OBS response.
	StatusCode int `json:"statusCode"`
	// VersionID is the version id copied from the OBS response.
	VersionID string `json:"versionId"`
}
|
|
|
|
// uploadPayload is the JSON request body of the upload operation.
type uploadPayload struct {
	// SourceFile is handed to the OBS PutFile request as the file to upload.
	SourceFile string `json:"sourceFile"`
}
|
|
|
|
// listPayload is the JSON request body of the list operation. Each field is
// copied onto the matching field of the OBS ListObjects request.
type listPayload struct {
	// Marker is forwarded as the request's Marker.
	Marker string `json:"marker"`
	// Prefix is forwarded as the request's Prefix.
	Prefix string `json:"prefix"`
	// MaxResults is forwarded as the request's MaxKeys; zero selects the
	// binding's default.
	MaxResults int32 `json:"maxResults"`
	// Delimiter is forwarded as the request's Delimiter.
	Delimiter string `json:"delimiter"`
}
|
|
|
|
// NewHuaweiOBS returns a new Huawei OBS instance.
|
|
func NewHuaweiOBS(logger logger.Logger) bindings.OutputBinding {
|
|
return &HuaweiOBS{logger: logger}
|
|
}
|
|
|
|
// Init does metadata parsing and connection creation.
|
|
func (o *HuaweiOBS) Init(_ context.Context, metadata bindings.Metadata) error {
|
|
o.logger.Debugf("initializing Huawei OBS binding and parsing metadata")
|
|
|
|
m, err := o.parseMetadata(metadata)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
client, err := obs.New(m.AccessKey, m.SecretKey, m.Endpoint)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
o.metadata = m
|
|
o.service = &HuaweiOBSService{client: client}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (o *HuaweiOBS) parseMetadata(meta bindings.Metadata) (*obsMetadata, error) {
|
|
var m obsMetadata
|
|
err := kitmd.DecodeMetadata(meta.Properties, &m)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if m.Bucket == "" {
|
|
return nil, fmt.Errorf("missing obs bucket name")
|
|
}
|
|
if m.Endpoint == "" {
|
|
return nil, fmt.Errorf("missing obs endpoint")
|
|
}
|
|
if m.AccessKey == "" {
|
|
return nil, fmt.Errorf("missing the huawei access key")
|
|
}
|
|
if m.SecretKey == "" {
|
|
return nil, fmt.Errorf("missing the huawei secret key")
|
|
}
|
|
|
|
o.logger.Debugf("Huawei OBS metadata=[%s]", m)
|
|
return &m, nil
|
|
}
|
|
|
|
func (o *HuaweiOBS) Operations() []bindings.OperationKind {
|
|
return []bindings.OperationKind{
|
|
bindings.CreateOperation,
|
|
UploadOperation,
|
|
bindings.GetOperation,
|
|
bindings.DeleteOperation,
|
|
bindings.ListOperation,
|
|
}
|
|
}
|
|
|
|
func (o *HuaweiOBS) create(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
|
d, err := strconv.Unquote(string(req.Data))
|
|
if err == nil {
|
|
req.Data = []byte(d)
|
|
}
|
|
|
|
var key string
|
|
if val, ok := req.Metadata[metadataKey]; ok && val != "" {
|
|
key = val
|
|
} else {
|
|
key = uuid.New().String()
|
|
o.logger.Debugf("key not found. generating key %s", key)
|
|
}
|
|
|
|
r := bytes.NewReader(req.Data)
|
|
|
|
input := &obs.PutObjectInput{}
|
|
input.Key = key
|
|
input.Bucket = o.metadata.Bucket
|
|
input.Body = r
|
|
|
|
out, err := o.service.PutObject(ctx, input)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("obs binding error. putobject: %w", err)
|
|
}
|
|
|
|
jsonResponse, err := json.Marshal(createResponse{
|
|
StatusCode: out.StatusCode,
|
|
VersionID: out.VersionId,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("obs binding error. error marshalling create response: %w", err)
|
|
}
|
|
|
|
return &bindings.InvokeResponse{
|
|
Data: jsonResponse,
|
|
}, nil
|
|
}
|
|
|
|
func (o *HuaweiOBS) upload(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
|
var payload uploadPayload
|
|
err := json.Unmarshal(req.Data, &payload)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var key string
|
|
if val, ok := req.Metadata[metadataKey]; ok && val != "" {
|
|
key = val
|
|
} else {
|
|
key = uuid.New().String()
|
|
o.logger.Debugf("key not found. generating key %s", key)
|
|
}
|
|
|
|
input := &obs.PutFileInput{}
|
|
input.Key = key
|
|
input.Bucket = o.metadata.Bucket
|
|
input.SourceFile = payload.SourceFile
|
|
|
|
out, err := o.service.PutFile(ctx, input)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("obs binding error. putfile: %w", err)
|
|
}
|
|
|
|
jsonResponse, err := json.Marshal(createResponse{
|
|
StatusCode: out.StatusCode,
|
|
VersionID: out.VersionId,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("obs binding error. error marshalling create response: %w", err)
|
|
}
|
|
|
|
return &bindings.InvokeResponse{
|
|
Data: jsonResponse,
|
|
}, nil
|
|
}
|
|
|
|
func (o *HuaweiOBS) get(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
|
var key string
|
|
if val, ok := req.Metadata[metadataKey]; ok && val != "" {
|
|
key = val
|
|
} else {
|
|
return nil, fmt.Errorf("obs binding error: can't read key value")
|
|
}
|
|
|
|
input := &obs.GetObjectInput{}
|
|
input.Bucket = o.metadata.Bucket
|
|
input.Key = key
|
|
|
|
out, err := o.service.GetObject(ctx, input)
|
|
if err != nil {
|
|
var obsErr obs.ObsError
|
|
if errors.As(err, &obsErr) && obsErr.StatusCode == http.StatusNotFound {
|
|
return nil, errors.New("object not found")
|
|
}
|
|
return nil, fmt.Errorf("obs binding error. error getting obs object: %w", err)
|
|
}
|
|
|
|
// close connection at the end of operation
|
|
defer func() {
|
|
err = out.Body.Close()
|
|
if err != nil {
|
|
panic(err)
|
|
}
|
|
}()
|
|
|
|
data, err := io.ReadAll(out.Body)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("obs binding error. error reading obs object content: %w", err)
|
|
}
|
|
|
|
return &bindings.InvokeResponse{
|
|
Data: data,
|
|
Metadata: nil,
|
|
}, nil
|
|
}
|
|
|
|
func (o *HuaweiOBS) delete(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
|
var key string
|
|
if val, ok := req.Metadata[metadataKey]; ok && val != "" {
|
|
key = val
|
|
} else {
|
|
return nil, fmt.Errorf("obs binding error: can't read key value")
|
|
}
|
|
|
|
input := &obs.DeleteObjectInput{}
|
|
input.Bucket = o.metadata.Bucket
|
|
input.Key = key
|
|
|
|
out, err := o.service.DeleteObject(ctx, input)
|
|
if err != nil {
|
|
var obsErr obs.ObsError
|
|
if errors.As(err, &obsErr) && obsErr.StatusCode == http.StatusNotFound {
|
|
return nil, errors.New("object not found")
|
|
}
|
|
return nil, fmt.Errorf("obs binding error. error deleting obs object: %w", err)
|
|
}
|
|
|
|
jsonResponse, err := json.Marshal(createResponse{
|
|
StatusCode: out.StatusCode,
|
|
VersionID: out.VersionId,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("obs binding error. error marshalling create response: %w", err)
|
|
}
|
|
|
|
return &bindings.InvokeResponse{
|
|
Data: jsonResponse,
|
|
}, nil
|
|
}
|
|
|
|
func (o *HuaweiOBS) list(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
|
var payload listPayload
|
|
err := json.Unmarshal(req.Data, &payload)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if payload.MaxResults == int32(0) {
|
|
payload.MaxResults = maxResults
|
|
}
|
|
|
|
input := &obs.ListObjectsInput{}
|
|
|
|
input.Bucket = o.metadata.Bucket
|
|
input.MaxKeys = int(payload.MaxResults)
|
|
input.Marker = payload.Marker
|
|
input.Prefix = payload.Prefix
|
|
input.Delimiter = payload.Delimiter
|
|
|
|
out, err := o.service.ListObjects(ctx, input)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("obs binding error. error listing obs objects: %w", err)
|
|
}
|
|
|
|
jsonResponse, err := json.Marshal(out)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("obs binding error. list operation. cannot marshal response to json: %w", err)
|
|
}
|
|
|
|
return &bindings.InvokeResponse{
|
|
Data: jsonResponse,
|
|
}, nil
|
|
}
|
|
|
|
func (o *HuaweiOBS) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
|
switch req.Operation {
|
|
case bindings.CreateOperation:
|
|
return o.create(ctx, req)
|
|
case UploadOperation:
|
|
return o.upload(ctx, req)
|
|
case bindings.GetOperation:
|
|
return o.get(ctx, req)
|
|
case bindings.DeleteOperation:
|
|
return o.delete(ctx, req)
|
|
case bindings.ListOperation:
|
|
return o.list(ctx, req)
|
|
default:
|
|
return nil, fmt.Errorf("obs binding error. unsupported operation %s", req.Operation)
|
|
}
|
|
}
|
|
|
|
// GetComponentMetadata returns the metadata of the component.
|
|
func (o *HuaweiOBS) GetComponentMetadata() (metadataInfo metadata.MetadataMap) {
|
|
metadataStruct := obsMetadata{}
|
|
metadata.GetMetadataInfoFromStructType(reflect.TypeOf(metadataStruct), &metadataInfo, metadata.BindingType)
|
|
return
|
|
}
|