feat: copy, move, rename bucket
Signed-off-by: nelson.parente <nelson_parente@live.com.pt>
This commit is contained in:
parent
026f99710a
commit
d6d19be507
|
@ -54,6 +54,9 @@ const (
|
|||
metadataKeyBC = "name"
|
||||
signOperation = "sign"
|
||||
bulkGetOperation = "bulkGet"
|
||||
copyOperation = "copy"
|
||||
renameOperation = "rename"
|
||||
moveOperation = "move"
|
||||
)
|
||||
|
||||
// GCPStorage allows saving data to GCP bucket storage.
|
||||
|
@ -87,6 +90,7 @@ type listPayload struct {
|
|||
MaxResults int32 `json:"maxResults"`
|
||||
Delimiter string `json:"delimiter"`
|
||||
}
|
||||
|
||||
type signResponse struct {
|
||||
SignURL string `json:"signURL"`
|
||||
}
|
||||
|
@ -142,6 +146,9 @@ func (g *GCPStorage) Operations() []bindings.OperationKind {
|
|||
bindings.ListOperation,
|
||||
signOperation,
|
||||
bulkGetOperation,
|
||||
copyOperation,
|
||||
renameOperation,
|
||||
moveOperation,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -161,6 +168,12 @@ func (g *GCPStorage) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*
|
|||
return g.sign(ctx, req)
|
||||
case bulkGetOperation:
|
||||
return g.bulkGet(ctx, req)
|
||||
case copyOperation:
|
||||
return g.copy(ctx, req)
|
||||
case renameOperation:
|
||||
return g.rename(ctx, req)
|
||||
case moveOperation:
|
||||
return g.move(ctx, req)
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported operation %s", req.Operation)
|
||||
}
|
||||
|
@ -498,3 +511,110 @@ func (g *GCPStorage) bulkGet(ctx context.Context, req *bindings.InvokeRequest) (
|
|||
Data: jsonResponse,
|
||||
}, nil
|
||||
}
|
||||
|
||||
type movePayload struct {
|
||||
DestinationBucket string `json:"destinationBucket"`
|
||||
}
|
||||
|
||||
func (g *GCPStorage) move(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
||||
var key string
|
||||
if val, ok := req.Metadata[metadataKey]; ok && val != "" {
|
||||
key = val
|
||||
} else {
|
||||
return nil, errors.New("gcp bucket binding error: can't read key value")
|
||||
}
|
||||
|
||||
var payload movePayload
|
||||
err := json.Unmarshal(req.Data, &payload)
|
||||
if err != nil {
|
||||
return nil, errors.New("gcp bucket binding error: invalid move payload")
|
||||
}
|
||||
|
||||
if payload.DestinationBucket == "" {
|
||||
return nil, errors.New("gcp bucket binding error: required 'destinationBucket' missing")
|
||||
}
|
||||
|
||||
src := g.client.Bucket(g.metadata.Bucket).Object(key)
|
||||
dst := g.client.Bucket(payload.DestinationBucket).Object(key)
|
||||
if _, err := dst.CopierFrom(src).Run(ctx); err != nil {
|
||||
return nil, fmt.Errorf("gcp bucket binding error while copying object: %w", err)
|
||||
}
|
||||
|
||||
if err := src.Delete(ctx); err != nil {
|
||||
return nil, fmt.Errorf("gcp bucket binding error while deleting object: %w", err)
|
||||
}
|
||||
|
||||
return &bindings.InvokeResponse{
|
||||
Data: []byte(fmt.Sprintf("object %s moved to %s", key, payload.DestinationBucket)),
|
||||
}, nil
|
||||
}
|
||||
|
||||
type renamePayload struct {
|
||||
NewName string `json:"newName"`
|
||||
}
|
||||
|
||||
func (g *GCPStorage) rename(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
||||
var key string
|
||||
if val, ok := req.Metadata[metadataKey]; ok && val != "" {
|
||||
key = val
|
||||
} else {
|
||||
return nil, errors.New("gcp bucket binding error: can't read key value")
|
||||
}
|
||||
|
||||
var payload renamePayload
|
||||
err := json.Unmarshal(req.Data, &payload)
|
||||
if err != nil {
|
||||
return nil, errors.New("gcp bucket binding error: invalid rename payload")
|
||||
}
|
||||
|
||||
if payload.NewName == "" {
|
||||
return nil, errors.New("gcp bucket binding error: required 'newName' missing")
|
||||
}
|
||||
|
||||
src := g.client.Bucket(g.metadata.Bucket).Object(key)
|
||||
dst := g.client.Bucket(g.metadata.Bucket).Object(payload.NewName)
|
||||
if _, err := dst.CopierFrom(src).Run(ctx); err != nil {
|
||||
return nil, fmt.Errorf("gcp bucket binding error while copying object: %w", err)
|
||||
}
|
||||
|
||||
if err := src.Delete(ctx); err != nil {
|
||||
return nil, fmt.Errorf("gcp bucket binding error while deleting object: %w", err)
|
||||
}
|
||||
|
||||
return &bindings.InvokeResponse{
|
||||
Data: []byte(fmt.Sprintf("object %s renamed to %s", key, payload.NewName)),
|
||||
}, nil
|
||||
}
|
||||
|
||||
type copyPayload struct {
|
||||
DestinationBucket string `json:"destinationBucket"`
|
||||
}
|
||||
|
||||
func (g *GCPStorage) copy(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) {
|
||||
var key string
|
||||
if val, ok := req.Metadata[metadataKey]; ok && val != "" {
|
||||
key = val
|
||||
} else {
|
||||
return nil, errors.New("gcp bucket binding error: can't read key value")
|
||||
}
|
||||
|
||||
var payload copyPayload
|
||||
err := json.Unmarshal(req.Data, &payload)
|
||||
if err != nil {
|
||||
return nil, errors.New("gcp bucket binding error: invalid copy payload")
|
||||
}
|
||||
|
||||
if payload.DestinationBucket == "" {
|
||||
return nil, errors.New("gcp bucket binding error: required 'destinationBucket' missing")
|
||||
}
|
||||
|
||||
src := g.client.Bucket(g.metadata.Bucket).Object(key)
|
||||
dst := g.client.Bucket(payload.DestinationBucket).Object(key)
|
||||
if _, err := dst.CopierFrom(src).Run(ctx); err != nil {
|
||||
return nil, fmt.Errorf("gcp bucket binding error while copying object: %w", err)
|
||||
}
|
||||
|
||||
return &bindings.InvokeResponse{
|
||||
Data: []byte(fmt.Sprintf("object %s copied to %s", key, payload.DestinationBucket)),
|
||||
}, nil
|
||||
}
|
||||
|
|
|
@ -265,3 +265,114 @@ func TestBulkGetOption(t *testing.T) {
|
|||
require.Error(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestCopyOption(t *testing.T) {
|
||||
gs := GCPStorage{logger: logger.NewLogger("test")}
|
||||
gs.metadata = &gcpMetadata{}
|
||||
|
||||
t.Run("return error if key is missing", func(t *testing.T) {
|
||||
r := bindings.InvokeRequest{}
|
||||
_, err := gs.copy(t.Context(), &r)
|
||||
require.Error(t, err)
|
||||
assert.Equal(t, "gcp bucket binding error: can't read key value", err.Error())
|
||||
})
|
||||
|
||||
t.Run("return error if data is not valid json", func(t *testing.T) {
|
||||
r := bindings.InvokeRequest{
|
||||
Metadata: map[string]string{
|
||||
"key": "my_key",
|
||||
},
|
||||
}
|
||||
_, err := gs.copy(t.Context(), &r)
|
||||
require.Error(t, err)
|
||||
assert.Equal(t, "gcp bucket binding error: invalid copy payload", err.Error())
|
||||
})
|
||||
|
||||
t.Run("return error if destinationBucket is missing", func(t *testing.T) {
|
||||
r := bindings.InvokeRequest{
|
||||
Data: []byte(`{}`),
|
||||
Metadata: map[string]string{
|
||||
"key": "my_key",
|
||||
},
|
||||
}
|
||||
_, err := gs.copy(t.Context(), &r)
|
||||
require.Error(t, err)
|
||||
assert.Equal(t, "gcp bucket binding error: required 'destinationBucket' missing", err.Error())
|
||||
})
|
||||
}
|
||||
|
||||
func TestRenameOption(t *testing.T) {
|
||||
gs := GCPStorage{logger: logger.NewLogger("test")}
|
||||
gs.metadata = &gcpMetadata{}
|
||||
|
||||
t.Run("return error if key is missing", func(t *testing.T) {
|
||||
r := bindings.InvokeRequest{
|
||||
Data: []byte(`{"newName": "my_new_name"}`),
|
||||
}
|
||||
_, err := gs.rename(t.Context(), &r)
|
||||
require.Error(t, err)
|
||||
assert.Equal(t, "gcp bucket binding error: can't read key value", err.Error())
|
||||
})
|
||||
|
||||
t.Run("return error if data is not valid json", func(t *testing.T) {
|
||||
r := bindings.InvokeRequest{
|
||||
Metadata: map[string]string{
|
||||
"key": "my_key",
|
||||
},
|
||||
}
|
||||
_, err := gs.rename(t.Context(), &r)
|
||||
require.Error(t, err)
|
||||
assert.Equal(t, "gcp bucket binding error: invalid rename payload", err.Error())
|
||||
})
|
||||
|
||||
t.Run("return error if newName is missing", func(t *testing.T) {
|
||||
r := bindings.InvokeRequest{
|
||||
Data: []byte(`{}`),
|
||||
Metadata: map[string]string{
|
||||
"key": "my_key",
|
||||
},
|
||||
}
|
||||
_, err := gs.rename(t.Context(), &r)
|
||||
require.Error(t, err)
|
||||
assert.Equal(t, "gcp bucket binding error: required 'newName' missing", err.Error())
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
func TestMoveOption(t *testing.T) {
|
||||
gs := GCPStorage{logger: logger.NewLogger("test")}
|
||||
gs.metadata = &gcpMetadata{}
|
||||
|
||||
t.Run("return error if key is missing", func(t *testing.T) {
|
||||
r := bindings.InvokeRequest{
|
||||
Data: []byte(`{"destinationBucket": "my_bucket"}`),
|
||||
}
|
||||
_, err := gs.move(t.Context(), &r)
|
||||
require.Error(t, err)
|
||||
assert.Equal(t, "gcp bucket binding error: can't read key value", err.Error())
|
||||
})
|
||||
|
||||
t.Run("return error if data is not valid json", func(t *testing.T) {
|
||||
r := bindings.InvokeRequest{
|
||||
Metadata: map[string]string{
|
||||
"key": "my_key",
|
||||
},
|
||||
}
|
||||
_, err := gs.move(t.Context(), &r)
|
||||
require.Error(t, err)
|
||||
assert.Equal(t, "gcp bucket binding error: invalid move payload", err.Error())
|
||||
})
|
||||
|
||||
t.Run("return error if destinationBucket is missing", func(t *testing.T) {
|
||||
r := bindings.InvokeRequest{
|
||||
Data: []byte(`{}`),
|
||||
Metadata: map[string]string{
|
||||
"key": "my_key",
|
||||
},
|
||||
}
|
||||
_, err := gs.move(t.Context(), &r)
|
||||
require.Error(t, err)
|
||||
assert.Equal(t, "gcp bucket binding error: required 'destinationBucket' missing", err.Error())
|
||||
})
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue