feat: add dfstore client interface (#1415)
* feat: add dfstore client interface Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
parent
75710ab761
commit
1d7cd9dd12
|
|
@ -71,6 +71,7 @@ const (
|
||||||
DefaultSchedulerPort = 8002
|
DefaultSchedulerPort = 8002
|
||||||
|
|
||||||
DefaultPieceChanSize = 16
|
DefaultPieceChanSize = 16
|
||||||
|
DefaultObjectMaxReplicas = 3
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
|
|
||||||
|
|
@ -22,6 +22,8 @@ const (
|
||||||
HeaderDragonflyTask = "X-Dragonfly-Task"
|
HeaderDragonflyTask = "X-Dragonfly-Task"
|
||||||
HeaderDragonflyRange = "X-Dragonfly-Range"
|
HeaderDragonflyRange = "X-Dragonfly-Range"
|
||||||
HeaderDragonflyBiz = "X-Dragonfly-Biz"
|
HeaderDragonflyBiz = "X-Dragonfly-Biz"
|
||||||
// HeaderDragonflyRegistry is used for dynamic registry mirrors
|
// HeaderDragonflyRegistry is used for dynamic registry mirrors.
|
||||||
HeaderDragonflyRegistry = "X-Dragonfly-Registry"
|
HeaderDragonflyRegistry = "X-Dragonfly-Registry"
|
||||||
|
// HeaderDragonflyObjectMetaDigest is used for digest of object storage.
|
||||||
|
HeaderDragonflyObjectMetaDigest = "X-Dragonfly-Object-Meta-Digest"
|
||||||
)
|
)
|
||||||
|
|
|
||||||
|
|
@ -121,7 +121,7 @@ var peerHostConfig = DaemonOption{
|
||||||
ObjectStorage: ObjectStorageOption{
|
ObjectStorage: ObjectStorageOption{
|
||||||
Enable: false,
|
Enable: false,
|
||||||
Filter: "Expires&Signature&ns",
|
Filter: "Expires&Signature&ns",
|
||||||
MaxReplicas: 3,
|
MaxReplicas: DefaultObjectMaxReplicas,
|
||||||
ListenOption: ListenOption{
|
ListenOption: ListenOption{
|
||||||
Security: SecurityOption{
|
Security: SecurityOption{
|
||||||
Insecure: true,
|
Insecure: true,
|
||||||
|
|
|
||||||
|
|
@ -120,7 +120,7 @@ var peerHostConfig = DaemonOption{
|
||||||
ObjectStorage: ObjectStorageOption{
|
ObjectStorage: ObjectStorageOption{
|
||||||
Enable: false,
|
Enable: false,
|
||||||
Filter: "Expires&Signature&ns",
|
Filter: "Expires&Signature&ns",
|
||||||
MaxReplicas: 3,
|
MaxReplicas: DefaultObjectMaxReplicas,
|
||||||
ListenOption: ListenOption{
|
ListenOption: ListenOption{
|
||||||
Security: SecurityOption{
|
Security: SecurityOption{
|
||||||
Insecure: true,
|
Insecure: true,
|
||||||
|
|
|
||||||
|
|
@ -51,11 +51,11 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
// WriteBack writes the object synchronously to the backend.
|
|
||||||
WriteBack = iota
|
|
||||||
|
|
||||||
// AsyncWriteBack writes the object asynchronously to the backend.
|
// AsyncWriteBack writes the object asynchronously to the backend.
|
||||||
AsyncWriteBack
|
AsyncWriteBack = iota
|
||||||
|
|
||||||
|
// WriteBack writes the object synchronously to the backend.
|
||||||
|
WriteBack
|
||||||
|
|
||||||
// Ephemeral only writes the object to the dfdaemon.
|
// Ephemeral only writes the object to the dfdaemon.
|
||||||
// It is only provided for creating temporary objects between peers,
|
// It is only provided for creating temporary objects between peers,
|
||||||
|
|
@ -157,6 +157,7 @@ func (o *objectStorage) initRouter(cfg *config.DaemonOption, logDir string) *gin
|
||||||
|
|
||||||
// Buckets
|
// Buckets
|
||||||
b := r.Group("/buckets")
|
b := r.Group("/buckets")
|
||||||
|
b.HEAD(":id/objects/*object_key", o.headObject)
|
||||||
b.GET(":id/objects/*object_key", o.getObject)
|
b.GET(":id/objects/*object_key", o.getObject)
|
||||||
b.DELETE(":id/objects/*object_key", o.destroyObject)
|
b.DELETE(":id/objects/*object_key", o.destroyObject)
|
||||||
b.POST(":id/objects", o.createObject)
|
b.POST(":id/objects", o.createObject)
|
||||||
|
|
@ -169,6 +170,49 @@ func (o *objectStorage) getHealth(ctx *gin.Context) {
|
||||||
ctx.JSON(http.StatusOK, http.StatusText(http.StatusOK))
|
ctx.JSON(http.StatusOK, http.StatusText(http.StatusOK))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// headObject uses to head object.
|
||||||
|
func (o *objectStorage) headObject(ctx *gin.Context) {
|
||||||
|
var params ObjectParams
|
||||||
|
if err := ctx.ShouldBindUri(¶ms); err != nil {
|
||||||
|
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
bucketName = params.ID
|
||||||
|
objectKey = strings.TrimPrefix(params.ObjectKey, "/")
|
||||||
|
)
|
||||||
|
|
||||||
|
client, err := o.client()
|
||||||
|
if err != nil {
|
||||||
|
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
meta, isExist, err := client.GetObjectMetadata(ctx, bucketName, objectKey)
|
||||||
|
if err != nil {
|
||||||
|
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if !isExist {
|
||||||
|
ctx.JSON(http.StatusNotFound, gin.H{"errors": http.StatusText(http.StatusNotFound)})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx.Header(headers.ContentDisposition, meta.ContentDisposition)
|
||||||
|
ctx.Header(headers.ContentEncoding, meta.ContentEncoding)
|
||||||
|
ctx.Header(headers.ContentLanguage, meta.ContentLanguage)
|
||||||
|
ctx.Header(headers.ContentLength, fmt.Sprint(meta.ContentLength))
|
||||||
|
ctx.Header(headers.ContentType, meta.ContentType)
|
||||||
|
ctx.Header(headers.ETag, meta.ETag)
|
||||||
|
ctx.Header(headers.ETag, meta.ETag)
|
||||||
|
ctx.Header(config.HeaderDragonflyObjectMetaDigest, meta.Digest)
|
||||||
|
|
||||||
|
ctx.Status(http.StatusOK)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// getObject uses to download object data.
|
// getObject uses to download object data.
|
||||||
func (o *objectStorage) getObject(ctx *gin.Context) {
|
func (o *objectStorage) getObject(ctx *gin.Context) {
|
||||||
var params ObjectParams
|
var params ObjectParams
|
||||||
|
|
@ -387,7 +431,7 @@ func (o *objectStorage) createObject(ctx *gin.Context) {
|
||||||
case WriteBack:
|
case WriteBack:
|
||||||
// Import object to seed peer.
|
// Import object to seed peer.
|
||||||
go func() {
|
go func() {
|
||||||
if err := o.importObjectToSeedPeers(context.Background(), bucketName, objectKey, Ephemeral, fileHeader, maxReplicas, log); err != nil {
|
if err := o.importObjectToSeedPeers(context.Background(), bucketName, objectKey, urlMeta.Filter, Ephemeral, fileHeader, maxReplicas, log); err != nil {
|
||||||
log.Errorf("import object %s to seed peers failed: %s", objectKey, err)
|
log.Errorf("import object %s to seed peers failed: %s", objectKey, err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
@ -405,7 +449,7 @@ func (o *objectStorage) createObject(ctx *gin.Context) {
|
||||||
case AsyncWriteBack:
|
case AsyncWriteBack:
|
||||||
// Import object to seed peer.
|
// Import object to seed peer.
|
||||||
go func() {
|
go func() {
|
||||||
if err := o.importObjectToSeedPeers(context.Background(), bucketName, objectKey, Ephemeral, fileHeader, maxReplicas, log); err != nil {
|
if err := o.importObjectToSeedPeers(context.Background(), bucketName, objectKey, urlMeta.Filter, Ephemeral, fileHeader, maxReplicas, log); err != nil {
|
||||||
log.Errorf("import object %s to seed peers failed: %s", objectKey, err)
|
log.Errorf("import object %s to seed peers failed: %s", objectKey, err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
@ -482,7 +526,7 @@ func (o *objectStorage) importObjectToLocalStorage(ctx context.Context, taskID,
|
||||||
}
|
}
|
||||||
|
|
||||||
// importObjectToSeedPeers uses to import object to available seed peers.
|
// importObjectToSeedPeers uses to import object to available seed peers.
|
||||||
func (o *objectStorage) importObjectToSeedPeers(ctx context.Context, bucketName, objectKey string, mode int, fileHeader *multipart.FileHeader, maxReplicas int, log *logger.SugaredLoggerOnWith) error {
|
func (o *objectStorage) importObjectToSeedPeers(ctx context.Context, bucketName, objectKey, filter string, mode int, fileHeader *multipart.FileHeader, maxReplicas int, log *logger.SugaredLoggerOnWith) error {
|
||||||
schedulers, err := o.dynconfig.GetSchedulers()
|
schedulers, err := o.dynconfig.GetSchedulers()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
@ -501,7 +545,7 @@ func (o *objectStorage) importObjectToSeedPeers(ctx context.Context, bucketName,
|
||||||
var replicas int
|
var replicas int
|
||||||
for _, seedPeerHost := range seedPeerHosts {
|
for _, seedPeerHost := range seedPeerHosts {
|
||||||
log.Infof("import object %s to seed peer %s", objectKey, seedPeerHost)
|
log.Infof("import object %s to seed peer %s", objectKey, seedPeerHost)
|
||||||
if err := o.importObjectToSeedPeer(ctx, seedPeerHost, bucketName, objectKey, mode, fileHeader); err != nil {
|
if err := o.importObjectToSeedPeer(ctx, seedPeerHost, bucketName, objectKey, filter, mode, fileHeader); err != nil {
|
||||||
log.Errorf("import object %s to seed peer %s failed: %s", objectKey, seedPeerHost, err)
|
log.Errorf("import object %s to seed peer %s failed: %s", objectKey, seedPeerHost, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
@ -517,7 +561,7 @@ func (o *objectStorage) importObjectToSeedPeers(ctx context.Context, bucketName,
|
||||||
}
|
}
|
||||||
|
|
||||||
// importObjectToSeedPeer uses to import object to seed peer.
|
// importObjectToSeedPeer uses to import object to seed peer.
|
||||||
func (o *objectStorage) importObjectToSeedPeer(ctx context.Context, seedPeerHost, bucketName, objectKey string, mode int, fileHeader *multipart.FileHeader) error {
|
func (o *objectStorage) importObjectToSeedPeer(ctx context.Context, seedPeerHost, bucketName, objectKey, filter string, mode int, fileHeader *multipart.FileHeader) error {
|
||||||
f, err := fileHeader.Open()
|
f, err := fileHeader.Open()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
@ -535,6 +579,12 @@ func (o *objectStorage) importObjectToSeedPeer(ctx context.Context, seedPeerHost
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if filter != "" {
|
||||||
|
if err := writer.WriteField("filter", filter); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
part, err := writer.CreateFormFile("file", fileHeader.Filename)
|
part, err := writer.CreateFormFile("file", fileHeader.Filename)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
|
|
@ -548,13 +598,13 @@ func (o *objectStorage) importObjectToSeedPeer(ctx context.Context, seedPeerHost
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
targetURL := url.URL{
|
u := url.URL{
|
||||||
Scheme: "http",
|
Scheme: "http",
|
||||||
Host: seedPeerHost,
|
Host: seedPeerHost,
|
||||||
Path: filepath.Join("buckets", bucketName, "objects"),
|
Path: filepath.Join("buckets", bucketName, "objects"),
|
||||||
}
|
}
|
||||||
|
|
||||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, targetURL.String(), body)
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, u.String(), body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -566,8 +616,8 @@ func (o *objectStorage) importObjectToSeedPeer(ctx context.Context, seedPeerHost
|
||||||
}
|
}
|
||||||
defer resp.Body.Close()
|
defer resp.Body.Close()
|
||||||
|
|
||||||
if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusBadRequest {
|
if resp.StatusCode/100 != 2 {
|
||||||
return errors.Errorf("%v: %v", targetURL.String(), resp.Status)
|
return fmt.Errorf("bad response status %s", resp.Status)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,408 @@
|
||||||
|
/*
|
||||||
|
* Copyright 2022 The Dragonfly Authors
|
||||||
|
*
|
||||||
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
|
* you may not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
//go:generate mockgen -destination mocks/dfstore_mock.go -source dfstore.go -package mocks
|
||||||
|
|
||||||
|
package dfstore
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"mime/multipart"
|
||||||
|
"net/http"
|
||||||
|
"net/url"
|
||||||
|
"path/filepath"
|
||||||
|
|
||||||
|
"github.com/go-http-utils/headers"
|
||||||
|
|
||||||
|
"d7y.io/dragonfly/v2/client/daemon/objectstorage"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Dfstore is the interface used for object storage.
|
||||||
|
type Dfstore interface {
|
||||||
|
// GetObjectRequestWithContext returns *http.Request of getting object.
|
||||||
|
GetObjectRequestWithContext(ctx context.Context, input *GetObjectInput) (*http.Request, error)
|
||||||
|
|
||||||
|
// GetObject returns data of object.
|
||||||
|
GetObject(ctx context.Context, input *GetObjectInput) (io.ReadCloser, error)
|
||||||
|
|
||||||
|
// CreateObjectRequestWithContext returns *http.Request of creating object.
|
||||||
|
CreateObjectRequestWithContext(ctx context.Context, input *CreateOjectInput) (*http.Request, error)
|
||||||
|
|
||||||
|
// CreateObject creates data of object.
|
||||||
|
CreateObject(ctx context.Context, input *CreateOjectInput) error
|
||||||
|
|
||||||
|
// DeleteObjectRequestWithContext returns *http.Request of deleting object.
|
||||||
|
DeleteObjectRequestWithContext(ctx context.Context, input *DeleteObjectInput) (*http.Request, error)
|
||||||
|
|
||||||
|
// DeleteObject deletes data of object.
|
||||||
|
DeleteObject(ctx context.Context, input *DeleteObjectInput) error
|
||||||
|
|
||||||
|
// IsObjectExistRequestWithContext returns *http.Request of heading object.
|
||||||
|
IsObjectExistRequestWithContext(ctx context.Context, input *IsObjectExistInput) (*http.Request, error)
|
||||||
|
|
||||||
|
// IsObjectExist returns whether the object exists.
|
||||||
|
IsObjectExist(ctx context.Context, input *IsObjectExistInput) (bool, error)
|
||||||
|
}
|
||||||
|
|
||||||
|
// dfstore provides object storage function.
|
||||||
|
type dfstore struct {
|
||||||
|
endpoint string
|
||||||
|
accessKey string
|
||||||
|
secretKey string
|
||||||
|
httpClient *http.Client
|
||||||
|
}
|
||||||
|
|
||||||
|
// Option is a functional option for configuring the dfstore.
|
||||||
|
type Option func(ds *dfstore)
|
||||||
|
|
||||||
|
// WithHTTPClient set http client for dfstore.
|
||||||
|
func WithHTTPClient(client *http.Client) Option {
|
||||||
|
return func(ds *dfstore) {
|
||||||
|
ds.httpClient = client
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// New dfstore instance.
|
||||||
|
func New(endpoint, accessKey, secretKey string, options ...Option) Dfstore {
|
||||||
|
ds := &dfstore{
|
||||||
|
endpoint: endpoint,
|
||||||
|
accessKey: accessKey,
|
||||||
|
secretKey: secretKey,
|
||||||
|
httpClient: http.DefaultClient,
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, opt := range options {
|
||||||
|
opt(ds)
|
||||||
|
}
|
||||||
|
|
||||||
|
return ds
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetObjectInput is used to construct request of getting object.
|
||||||
|
type GetObjectInput struct {
|
||||||
|
// BucketName is bucket name.
|
||||||
|
BucketName string
|
||||||
|
|
||||||
|
// ObjectKey is object key.
|
||||||
|
ObjectKey string
|
||||||
|
|
||||||
|
// Filter is used to generate a unique Task ID by
|
||||||
|
// filtering unnecessary query params in the URL,
|
||||||
|
// it is separated by & character.
|
||||||
|
Filter string
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate validates GetObjectInput fields.
|
||||||
|
func (i *GetObjectInput) Validate() error {
|
||||||
|
if i.BucketName == "" {
|
||||||
|
return errors.New("invalid BucketName")
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
if i.ObjectKey == "" {
|
||||||
|
return errors.New("invalid ObjectKey")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetObjectRequestWithContext returns *http.Request of getting object.
|
||||||
|
func (ds *dfstore) GetObjectRequestWithContext(ctx context.Context, input *GetObjectInput) (*http.Request, error) {
|
||||||
|
if err := input.Validate(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
u, err := url.Parse(ds.endpoint)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
u.Path = filepath.Join("buckets", input.BucketName, "objects", input.ObjectKey)
|
||||||
|
|
||||||
|
query := u.Query()
|
||||||
|
if input.Filter != "" {
|
||||||
|
query.Set("filter", input.Filter)
|
||||||
|
}
|
||||||
|
|
||||||
|
u.RawQuery = query.Encode()
|
||||||
|
|
||||||
|
return http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetObject returns data of object.
|
||||||
|
func (ds *dfstore) GetObject(ctx context.Context, input *GetObjectInput) (io.ReadCloser, error) {
|
||||||
|
req, err := ds.GetObjectRequestWithContext(ctx, input)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := ds.httpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if resp.StatusCode/100 != 2 {
|
||||||
|
return nil, fmt.Errorf("bad response status %s", resp.Status)
|
||||||
|
}
|
||||||
|
|
||||||
|
return resp.Body, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// CreateOjectInput is used to construct request of creating object.
|
||||||
|
type CreateOjectInput struct {
|
||||||
|
// BucketName is bucket name.
|
||||||
|
BucketName string
|
||||||
|
|
||||||
|
// ObjectKey is object key.
|
||||||
|
ObjectKey string
|
||||||
|
|
||||||
|
// Filter is used to generate a unique Task ID by
|
||||||
|
// filtering unnecessary query params in the URL,
|
||||||
|
// it is separated by & character.
|
||||||
|
Filter string
|
||||||
|
|
||||||
|
// Mode is the mode in which the backend is written,
|
||||||
|
// including WriteBack and AsyncWriteBack.
|
||||||
|
Mode int
|
||||||
|
|
||||||
|
// MaxReplicas is the maximum number of
|
||||||
|
// replicas of an object cache in seed peers.
|
||||||
|
MaxReplicas int
|
||||||
|
|
||||||
|
// Reader is reader of object.
|
||||||
|
Reader io.Reader
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate validates CreateOjectInput fields.
|
||||||
|
func (i *CreateOjectInput) Validate() error {
|
||||||
|
if i.BucketName == "" {
|
||||||
|
return errors.New("invalid BucketName")
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
if i.ObjectKey == "" {
|
||||||
|
return errors.New("invalid ObjectKey")
|
||||||
|
}
|
||||||
|
|
||||||
|
if i.Mode != objectstorage.WriteBack && i.Mode != objectstorage.AsyncWriteBack {
|
||||||
|
return errors.New("invalid Mode")
|
||||||
|
}
|
||||||
|
|
||||||
|
if i.MaxReplicas < 0 || i.MaxReplicas > 100 {
|
||||||
|
return errors.New("invalid MaxReplicas")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// CreateObjectRequestWithContext returns *http.Request of creating object.
|
||||||
|
func (ds *dfstore) CreateObjectRequestWithContext(ctx context.Context, input *CreateOjectInput) (*http.Request, error) {
|
||||||
|
if err := input.Validate(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
body := &bytes.Buffer{}
|
||||||
|
writer := multipart.NewWriter(body)
|
||||||
|
|
||||||
|
if err := writer.WriteField("key", input.ObjectKey); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// AsyncWriteBack mode is used by default.
|
||||||
|
if err := writer.WriteField("mode", fmt.Sprint(input.Mode)); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if input.Filter != "" {
|
||||||
|
if err := writer.WriteField("filter", input.Filter); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if input.MaxReplicas > 0 {
|
||||||
|
if err := writer.WriteField("maxReplicas", fmt.Sprint(input.MaxReplicas)); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
part, err := writer.CreateFormFile("file", filepath.Base(input.ObjectKey))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := io.Copy(part, input.Reader); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
u, err := url.Parse(ds.endpoint)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
u.Path = filepath.Join("buckets", input.BucketName, "objects")
|
||||||
|
|
||||||
|
req, err := http.NewRequestWithContext(ctx, http.MethodPost, u.String(), body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
req.Header.Add(headers.ContentType, writer.FormDataContentType())
|
||||||
|
|
||||||
|
return req, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// CreateObject creates data of object.
|
||||||
|
func (ds *dfstore) CreateObject(ctx context.Context, input *CreateOjectInput) error {
|
||||||
|
req, err := ds.CreateObjectRequestWithContext(ctx, input)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := http.DefaultClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
if resp.StatusCode/100 != 2 {
|
||||||
|
return fmt.Errorf("bad response status %s", resp.Status)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteObjectInput is used to construct request of deleting object.
|
||||||
|
type DeleteObjectInput struct {
|
||||||
|
// BucketName is bucket name.
|
||||||
|
BucketName string
|
||||||
|
|
||||||
|
// ObjectKey is object key.
|
||||||
|
ObjectKey string
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate validates DeleteObjectInput fields.
|
||||||
|
func (i *DeleteObjectInput) Validate() error {
|
||||||
|
if i.BucketName == "" {
|
||||||
|
return errors.New("invalid BucketName")
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
if i.ObjectKey == "" {
|
||||||
|
return errors.New("invalid ObjectKey")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteObjectRequestWithContext returns *http.Request of deleting object.
|
||||||
|
func (ds *dfstore) DeleteObjectRequestWithContext(ctx context.Context, input *DeleteObjectInput) (*http.Request, error) {
|
||||||
|
if err := input.Validate(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
u, err := url.Parse(ds.endpoint)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
u.Path = filepath.Join("buckets", input.BucketName, "objects", input.ObjectKey)
|
||||||
|
return http.NewRequestWithContext(ctx, http.MethodDelete, u.String(), nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteObject deletes data of object.
|
||||||
|
func (ds *dfstore) DeleteObject(ctx context.Context, input *DeleteObjectInput) error {
|
||||||
|
req, err := ds.DeleteObjectRequestWithContext(ctx, input)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := http.DefaultClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
if resp.StatusCode/100 != 2 {
|
||||||
|
return fmt.Errorf("bad response status %s", resp.Status)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsObjectExistInput is used to construct request of heading object.
|
||||||
|
type IsObjectExistInput struct {
|
||||||
|
// BucketName is bucket name.
|
||||||
|
BucketName string
|
||||||
|
|
||||||
|
// ObjectKey is object key.
|
||||||
|
ObjectKey string
|
||||||
|
}
|
||||||
|
|
||||||
|
// Validate validates IsObjectExistInput fields.
|
||||||
|
func (i *IsObjectExistInput) Validate() error {
|
||||||
|
if i.BucketName == "" {
|
||||||
|
return errors.New("invalid BucketName")
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
if i.ObjectKey == "" {
|
||||||
|
return errors.New("invalid ObjectKey")
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsObjectExistRequestWithContext returns *http.Request of heading object.
|
||||||
|
func (ds *dfstore) IsObjectExistRequestWithContext(ctx context.Context, input *IsObjectExistInput) (*http.Request, error) {
|
||||||
|
if err := input.Validate(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
u, err := url.Parse(ds.endpoint)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
u.Path = filepath.Join("buckets", input.BucketName, "objects", input.ObjectKey)
|
||||||
|
return http.NewRequestWithContext(ctx, http.MethodHead, u.String(), nil)
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsObjectExist returns whether the object exists.
|
||||||
|
func (ds *dfstore) IsObjectExist(ctx context.Context, input *IsObjectExistInput) (bool, error) {
|
||||||
|
req, err := ds.IsObjectExistRequestWithContext(ctx, input)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := http.DefaultClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
if resp.StatusCode == http.StatusNotFound {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if resp.StatusCode/100 != 2 {
|
||||||
|
return false, fmt.Errorf("bad response status %s", resp.Status)
|
||||||
|
}
|
||||||
|
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,156 @@
|
||||||
|
// Code generated by MockGen. DO NOT EDIT.
|
||||||
|
// Source: dfstore.go
|
||||||
|
|
||||||
|
// Package mocks is a generated GoMock package.
|
||||||
|
package mocks
|
||||||
|
|
||||||
|
import (
|
||||||
|
context "context"
|
||||||
|
io "io"
|
||||||
|
http "net/http"
|
||||||
|
reflect "reflect"
|
||||||
|
|
||||||
|
dfstore "d7y.io/dragonfly/v2/client/dfstore"
|
||||||
|
gomock "github.com/golang/mock/gomock"
|
||||||
|
)
|
||||||
|
|
||||||
|
// MockDfstore is a mock of Dfstore interface.
|
||||||
|
type MockDfstore struct {
|
||||||
|
ctrl *gomock.Controller
|
||||||
|
recorder *MockDfstoreMockRecorder
|
||||||
|
}
|
||||||
|
|
||||||
|
// MockDfstoreMockRecorder is the mock recorder for MockDfstore.
|
||||||
|
type MockDfstoreMockRecorder struct {
|
||||||
|
mock *MockDfstore
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewMockDfstore creates a new mock instance.
|
||||||
|
func NewMockDfstore(ctrl *gomock.Controller) *MockDfstore {
|
||||||
|
mock := &MockDfstore{ctrl: ctrl}
|
||||||
|
mock.recorder = &MockDfstoreMockRecorder{mock}
|
||||||
|
return mock
|
||||||
|
}
|
||||||
|
|
||||||
|
// EXPECT returns an object that allows the caller to indicate expected use.
|
||||||
|
func (m *MockDfstore) EXPECT() *MockDfstoreMockRecorder {
|
||||||
|
return m.recorder
|
||||||
|
}
|
||||||
|
|
||||||
|
// CreateObject mocks base method.
|
||||||
|
func (m *MockDfstore) CreateObject(ctx context.Context, input *dfstore.CreateOjectInput) error {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
ret := m.ctrl.Call(m, "CreateObject", ctx, input)
|
||||||
|
ret0, _ := ret[0].(error)
|
||||||
|
return ret0
|
||||||
|
}
|
||||||
|
|
||||||
|
// CreateObject indicates an expected call of CreateObject.
|
||||||
|
func (mr *MockDfstoreMockRecorder) CreateObject(ctx, input interface{}) *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateObject", reflect.TypeOf((*MockDfstore)(nil).CreateObject), ctx, input)
|
||||||
|
}
|
||||||
|
|
||||||
|
// CreateObjectRequestWithContext mocks base method.
|
||||||
|
func (m *MockDfstore) CreateObjectRequestWithContext(ctx context.Context, input *dfstore.CreateOjectInput) (*http.Request, error) {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
ret := m.ctrl.Call(m, "CreateObjectRequestWithContext", ctx, input)
|
||||||
|
ret0, _ := ret[0].(*http.Request)
|
||||||
|
ret1, _ := ret[1].(error)
|
||||||
|
return ret0, ret1
|
||||||
|
}
|
||||||
|
|
||||||
|
// CreateObjectRequestWithContext indicates an expected call of CreateObjectRequestWithContext.
|
||||||
|
func (mr *MockDfstoreMockRecorder) CreateObjectRequestWithContext(ctx, input interface{}) *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateObjectRequestWithContext", reflect.TypeOf((*MockDfstore)(nil).CreateObjectRequestWithContext), ctx, input)
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteObject mocks base method.
|
||||||
|
func (m *MockDfstore) DeleteObject(ctx context.Context, input *dfstore.DeleteObjectInput) error {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
ret := m.ctrl.Call(m, "DeleteObject", ctx, input)
|
||||||
|
ret0, _ := ret[0].(error)
|
||||||
|
return ret0
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteObject indicates an expected call of DeleteObject.
|
||||||
|
func (mr *MockDfstoreMockRecorder) DeleteObject(ctx, input interface{}) *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteObject", reflect.TypeOf((*MockDfstore)(nil).DeleteObject), ctx, input)
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteObjectRequestWithContext mocks base method.
|
||||||
|
func (m *MockDfstore) DeleteObjectRequestWithContext(ctx context.Context, input *dfstore.DeleteObjectInput) (*http.Request, error) {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
ret := m.ctrl.Call(m, "DeleteObjectRequestWithContext", ctx, input)
|
||||||
|
ret0, _ := ret[0].(*http.Request)
|
||||||
|
ret1, _ := ret[1].(error)
|
||||||
|
return ret0, ret1
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteObjectRequestWithContext indicates an expected call of DeleteObjectRequestWithContext.
|
||||||
|
func (mr *MockDfstoreMockRecorder) DeleteObjectRequestWithContext(ctx, input interface{}) *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteObjectRequestWithContext", reflect.TypeOf((*MockDfstore)(nil).DeleteObjectRequestWithContext), ctx, input)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetObject mocks base method.
|
||||||
|
func (m *MockDfstore) GetObject(ctx context.Context, input *dfstore.GetObjectInput) (io.ReadCloser, error) {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
ret := m.ctrl.Call(m, "GetObject", ctx, input)
|
||||||
|
ret0, _ := ret[0].(io.ReadCloser)
|
||||||
|
ret1, _ := ret[1].(error)
|
||||||
|
return ret0, ret1
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetObject indicates an expected call of GetObject.
|
||||||
|
func (mr *MockDfstoreMockRecorder) GetObject(ctx, input interface{}) *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetObject", reflect.TypeOf((*MockDfstore)(nil).GetObject), ctx, input)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetObjectRequestWithContext mocks base method.
|
||||||
|
func (m *MockDfstore) GetObjectRequestWithContext(ctx context.Context, input *dfstore.GetObjectInput) (*http.Request, error) {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
ret := m.ctrl.Call(m, "GetObjectRequestWithContext", ctx, input)
|
||||||
|
ret0, _ := ret[0].(*http.Request)
|
||||||
|
ret1, _ := ret[1].(error)
|
||||||
|
return ret0, ret1
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetObjectRequestWithContext indicates an expected call of GetObjectRequestWithContext.
|
||||||
|
func (mr *MockDfstoreMockRecorder) GetObjectRequestWithContext(ctx, input interface{}) *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetObjectRequestWithContext", reflect.TypeOf((*MockDfstore)(nil).GetObjectRequestWithContext), ctx, input)
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsObjectExist mocks base method.
|
||||||
|
func (m *MockDfstore) IsObjectExist(ctx context.Context, input *dfstore.IsObjectExistInput) (bool, error) {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
ret := m.ctrl.Call(m, "IsObjectExist", ctx, input)
|
||||||
|
ret0, _ := ret[0].(bool)
|
||||||
|
ret1, _ := ret[1].(error)
|
||||||
|
return ret0, ret1
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsObjectExist indicates an expected call of IsObjectExist.
|
||||||
|
func (mr *MockDfstoreMockRecorder) IsObjectExist(ctx, input interface{}) *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsObjectExist", reflect.TypeOf((*MockDfstore)(nil).IsObjectExist), ctx, input)
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsObjectExistRequestWithContext mocks base method.
|
||||||
|
func (m *MockDfstore) IsObjectExistRequestWithContext(ctx context.Context, input *dfstore.IsObjectExistInput) (*http.Request, error) {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
ret := m.ctrl.Call(m, "IsObjectExistRequestWithContext", ctx, input)
|
||||||
|
ret0, _ := ret[0].(*http.Request)
|
||||||
|
ret1, _ := ret[1].(error)
|
||||||
|
return ret0, ret1
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsObjectExistRequestWithContext indicates an expected call of IsObjectExistRequestWithContext.
|
||||||
|
func (mr *MockDfstoreMockRecorder) IsObjectExistRequestWithContext(ctx, input interface{}) *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsObjectExistRequestWithContext", reflect.TypeOf((*MockDfstore)(nil).IsObjectExistRequestWithContext), ctx, input)
|
||||||
|
}
|
||||||
|
|
@ -109,12 +109,13 @@ func (mr *MockObjectStorageMockRecorder) GetBucketMetadata(ctx, bucketName inter
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetObjectMetadata mocks base method.
|
// GetObjectMetadata mocks base method.
|
||||||
func (m *MockObjectStorage) GetObjectMetadata(ctx context.Context, bucketName, objectKey string) (*objectstorage.ObjectMetadata, error) {
|
func (m *MockObjectStorage) GetObjectMetadata(ctx context.Context, bucketName, objectKey string) (*objectstorage.ObjectMetadata, bool, error) {
|
||||||
m.ctrl.T.Helper()
|
m.ctrl.T.Helper()
|
||||||
ret := m.ctrl.Call(m, "GetObjectMetadata", ctx, bucketName, objectKey)
|
ret := m.ctrl.Call(m, "GetObjectMetadata", ctx, bucketName, objectKey)
|
||||||
ret0, _ := ret[0].(*objectstorage.ObjectMetadata)
|
ret0, _ := ret[0].(*objectstorage.ObjectMetadata)
|
||||||
ret1, _ := ret[1].(error)
|
ret1, _ := ret[1].(bool)
|
||||||
return ret0, ret1
|
ret2, _ := ret[2].(error)
|
||||||
|
return ret0, ret1, ret2
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetObjectMetadata indicates an expected call of GetObjectMetadata.
|
// GetObjectMetadata indicates an expected call of GetObjectMetadata.
|
||||||
|
|
@ -153,6 +154,21 @@ func (mr *MockObjectStorageMockRecorder) GetSignURL(ctx, bucketName, objectKey,
|
||||||
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSignURL", reflect.TypeOf((*MockObjectStorage)(nil).GetSignURL), ctx, bucketName, objectKey, method, expire)
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetSignURL", reflect.TypeOf((*MockObjectStorage)(nil).GetSignURL), ctx, bucketName, objectKey, method, expire)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// IsObjectExist mocks base method.
|
||||||
|
func (m *MockObjectStorage) IsObjectExist(ctx context.Context, bucketName, objectKey string) (bool, error) {
|
||||||
|
m.ctrl.T.Helper()
|
||||||
|
ret := m.ctrl.Call(m, "IsObjectExist", ctx, bucketName, objectKey)
|
||||||
|
ret0, _ := ret[0].(bool)
|
||||||
|
ret1, _ := ret[1].(error)
|
||||||
|
return ret0, ret1
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsObjectExist indicates an expected call of IsObjectExist.
|
||||||
|
func (mr *MockObjectStorageMockRecorder) IsObjectExist(ctx, bucketName, objectKey interface{}) *gomock.Call {
|
||||||
|
mr.mock.ctrl.T.Helper()
|
||||||
|
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsObjectExist", reflect.TypeOf((*MockObjectStorage)(nil).IsObjectExist), ctx, bucketName, objectKey)
|
||||||
|
}
|
||||||
|
|
||||||
// ListBucketMetadatas mocks base method.
|
// ListBucketMetadatas mocks base method.
|
||||||
func (m *MockObjectStorage) ListBucketMetadatas(ctx context.Context) ([]*objectstorage.BucketMetadata, error) {
|
func (m *MockObjectStorage) ListBucketMetadatas(ctx context.Context) ([]*objectstorage.BucketMetadata, error) {
|
||||||
m.ctrl.T.Helper()
|
m.ctrl.T.Helper()
|
||||||
|
|
|
||||||
|
|
@ -44,8 +44,8 @@ type ObjectMetadata struct {
|
||||||
// ContentType is Content-Type header.
|
// ContentType is Content-Type header.
|
||||||
ContentType string
|
ContentType string
|
||||||
|
|
||||||
// Etag is Etag header.
|
// ETag is ETag header.
|
||||||
Etag string
|
ETag string
|
||||||
|
|
||||||
// Digest is object digest.
|
// Digest is object digest.
|
||||||
Digest string
|
Digest string
|
||||||
|
|
|
||||||
|
|
@ -115,7 +115,7 @@ func (o *oss) GetObjectMetadata(ctx context.Context, bucketName, objectKey strin
|
||||||
ContentLanguage: header.Get(headers.ContentLanguage),
|
ContentLanguage: header.Get(headers.ContentLanguage),
|
||||||
ContentLength: contentLength,
|
ContentLength: contentLength,
|
||||||
ContentType: header.Get(headers.ContentType),
|
ContentType: header.Get(headers.ContentType),
|
||||||
Etag: header.Get(headers.ETag),
|
ETag: header.Get(headers.ETag),
|
||||||
Digest: header.Get(aliyunoss.HTTPHeaderOssMetaPrefix + MetaDigest),
|
Digest: header.Get(aliyunoss.HTTPHeaderOssMetaPrefix + MetaDigest),
|
||||||
}, true, nil
|
}, true, nil
|
||||||
}
|
}
|
||||||
|
|
@ -167,7 +167,7 @@ func (o *oss) ListObjectMetadatas(ctx context.Context, bucketName, prefix, marke
|
||||||
for _, object := range resp.Objects {
|
for _, object := range resp.Objects {
|
||||||
metadatas = append(metadatas, &ObjectMetadata{
|
metadatas = append(metadatas, &ObjectMetadata{
|
||||||
Key: object.Key,
|
Key: object.Key,
|
||||||
Etag: object.ETag,
|
ETag: object.ETag,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -112,7 +112,7 @@ func (s *s3) GetObjectMetadata(ctx context.Context, bucketName, objectKey string
|
||||||
ContentLanguage: aws.StringValue(resp.ContentLanguage),
|
ContentLanguage: aws.StringValue(resp.ContentLanguage),
|
||||||
ContentLength: aws.Int64Value(resp.ContentLength),
|
ContentLength: aws.Int64Value(resp.ContentLength),
|
||||||
ContentType: aws.StringValue(resp.ContentType),
|
ContentType: aws.StringValue(resp.ContentType),
|
||||||
Etag: aws.StringValue(resp.ETag),
|
ETag: aws.StringValue(resp.ETag),
|
||||||
Digest: aws.StringValue(resp.Metadata[MetaDigest]),
|
Digest: aws.StringValue(resp.Metadata[MetaDigest]),
|
||||||
}, true, nil
|
}, true, nil
|
||||||
}
|
}
|
||||||
|
|
@ -171,7 +171,7 @@ func (s *s3) ListObjectMetadatas(ctx context.Context, bucketName, prefix, marker
|
||||||
for _, object := range resp.Contents {
|
for _, object := range resp.Contents {
|
||||||
metadatas = append(metadatas, &ObjectMetadata{
|
metadatas = append(metadatas, &ObjectMetadata{
|
||||||
Key: aws.StringValue(object.Key),
|
Key: aws.StringValue(object.Key),
|
||||||
Etag: aws.StringValue(object.ETag),
|
ETag: aws.StringValue(object.ETag),
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,5 +1,5 @@
|
||||||
// Code generated by MockGen. DO NOT EDIT.
|
// Code generated by MockGen. DO NOT EDIT.
|
||||||
// Source: pkg/rpc/manager/client/client.go
|
// Source: client.go
|
||||||
|
|
||||||
// Package mocks is a generated GoMock package.
|
// Package mocks is a generated GoMock package.
|
||||||
package mocks
|
package mocks
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue