From f1fbfdeb815cc185a40ab9dd93edb483668030dc Mon Sep 17 00:00:00 2001 From: tan ding <34300181+XDTD@users.noreply.github.com> Date: Mon, 31 Jul 2023 10:44:21 +0800 Subject: [PATCH] feat: provide support for JuiceFS objectStorage implementation (#2578) --- client/config/headers.go | 6 + client/daemon/objectstorage/objectstorage.go | 112 +++++++- client/daemon/objectstorage/types.go | 29 ++- client/dfstore/dfstore.go | 258 +++++++++++++++++++ pkg/objectstorage/objectstorage.go | 11 +- pkg/objectstorage/obs.go | 47 +++- pkg/objectstorage/oss.go | 41 ++- pkg/objectstorage/s3.go | 43 +++- 8 files changed, 528 insertions(+), 19 deletions(-) diff --git a/client/config/headers.go b/client/config/headers.go index c529c6910..2f3fff6e5 100644 --- a/client/config/headers.go +++ b/client/config/headers.go @@ -31,4 +31,10 @@ const ( HeaderDragonflyRegistry = "X-Dragonfly-Registry" // HeaderDragonflyObjectMetaDigest is used for digest of object storage. HeaderDragonflyObjectMetaDigest = "X-Dragonfly-Object-Meta-Digest" + // HeaderDragonflyObjectMetaLastModifiedTime is used for last modified time of object storage. + HeaderDragonflyObjectMetaLastModifiedTime = "X-Dragonfly-Object-Meta-Last-Modified-Time" + // HeaderDragonflyObjectMetaStorageClass is used for storage class of object storage. + HeaderDragonflyObjectMetaStorageClass = "X-Dragonfly-Object-Meta-Storage-Class" + // HeaderDragonflyObjectOperation is used for object storage operation. + HeaderDragonflyObjectOperation = "X-Dragonfly-Object-Operation" ) diff --git a/client/daemon/objectstorage/objectstorage.go b/client/daemon/objectstorage/objectstorage.go index 28b8b708b..c7f588fa2 100644 --- a/client/daemon/objectstorage/objectstorage.go +++ b/client/daemon/objectstorage/objectstorage.go @@ -172,8 +172,10 @@ func (o *objectStorage) initRouter(cfg *config.DaemonOption, logDir string) *gin // Buckets b := r.Group(RouterGroupBuckets) + b.PUT("/:id", o.createBucket) b.HEAD(":id/objects/*object_key", o.headObject) b.GET(":id/objects/*object_key", o.getObject) + b.GET(":id/objects", o.listObjectMetadatas) b.DELETE(":id/objects/*object_key", o.destroyObject) b.PUT(":id/objects/*object_key", o.putObject) @@ -215,6 +217,8 @@ func (o *objectStorage) headObject(ctx *gin.Context) { return } + lastModifiedTime := meta.LastModifiedTime.Format(http.TimeFormat) + ctx.Header(headers.ContentDisposition, meta.ContentDisposition) ctx.Header(headers.ContentEncoding, meta.ContentEncoding) ctx.Header(headers.ContentLanguage, meta.ContentLanguage) @@ -222,6 +226,8 @@ func (o *objectStorage) headObject(ctx *gin.Context) { ctx.Header(headers.ContentType, meta.ContentType) ctx.Header(headers.ETag, meta.ETag) ctx.Header(config.HeaderDragonflyObjectMetaDigest, meta.Digest) + ctx.Header(config.HeaderDragonflyObjectMetaLastModifiedTime, lastModifiedTime) + ctx.Header(config.HeaderDragonflyObjectMetaStorageClass, meta.StorageClass) ctx.Status(http.StatusOK) return @@ -356,13 +362,20 @@ func (o *objectStorage) destroyObject(ctx *gin.Context) { // putObject uses to upload object data. func (o *objectStorage) putObject(ctx *gin.Context) { + + operation := ctx.Request.Header.Get(config.HeaderDragonflyObjectOperation) + if operation == CopyObject { + o.copyObject(ctx) + return + } + var params ObjectParams if err := ctx.ShouldBindUri(¶ms); err != nil { ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()}) return } - var form PutObjectRequset + var form PutObjectRequest if err := ctx.ShouldBind(&form); err != nil { ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()}) return @@ -476,6 +489,103 @@ func (o *objectStorage) putObject(ctx *gin.Context) { return } +// createBucket uses to create bucket. +func (o *objectStorage) createBucket(ctx *gin.Context) { + var params BucketParams + if err := ctx.ShouldBindUri(¶ms); err != nil { + ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()}) + return + } + + client, err := o.client() + if err != nil { + ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()}) + return + } + + bucketName := params.ID + + logger.Infof("create bucketName %s ", bucketName) + if err := client.CreateBucket(ctx, bucketName); err != nil { + ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()}) + return + } + + ctx.Status(http.StatusOK) +} + +// listObjectMetadatas uses to list objects meta data. +func (o *objectStorage) listObjectMetadatas(ctx *gin.Context) { + var params BucketParams + if err := ctx.ShouldBindUri(¶ms); err != nil { + ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()}) + return + } + + var query ListObjectMetadatasQuery + if err := ctx.ShouldBindQuery(&query); err != nil { + ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()}) + return + } + + var ( + bucketName = params.ID + prefix = query.Prefix + marker = query.Marker + delimiter = query.Delimiter + limit = query.Limit + ) + + client, err := o.client() + if err != nil { + ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()}) + return + } + + metadataList, err := client.ListObjectMetadatas(ctx, bucketName, prefix, marker, delimiter, limit) + if err != nil { + ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()}) + return + } + + ctx.JSON(http.StatusOK, metadataList) + ctx.Status(http.StatusOK) +} + +// copyObject uses to copy object. +func (o *objectStorage) copyObject(ctx *gin.Context) { + var params ObjectParams + if err := ctx.ShouldBindUri(¶ms); err != nil { + ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()}) + return + } + + var form CopyObjectRequest + if err := ctx.ShouldBind(&form); err != nil { + ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()}) + return + } + + var ( + bucketName = params.ID + destination = params.ObjectKey + source = form.Source + ) + + client, err := o.client() + if err != nil { + ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()}) + return + } + err = client.CopyObject(ctx, bucketName, source, destination) + if err != nil { + ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()}) + return + } + + ctx.Status(http.StatusOK) +} + // getAvailableSeedPeer uses to calculate md5 with file header. func (o *objectStorage) md5FromFileHeader(fileHeader *multipart.FileHeader) *digest.Digest { f, err := fileHeader.Open() diff --git a/client/daemon/objectstorage/types.go b/client/daemon/objectstorage/types.go index 6ed6a6b03..db2788712 100644 --- a/client/daemon/objectstorage/types.go +++ b/client/daemon/objectstorage/types.go @@ -18,12 +18,21 @@ package objectstorage import "mime/multipart" +const ( + PutObject = "put" + CopyObject = "copy" +) + +type BucketParams struct { + ID string `uri:"id" binding:"required"` +} + type ObjectParams struct { ID string `uri:"id" binding:"required"` ObjectKey string `uri:"object_key" binding:"required"` } -type PutObjectRequset struct { +type PutObjectRequest struct { Mode uint `form:"mode,default=0" binding:"omitempty,gte=0,lte=2"` Filter string `form:"filter" binding:"omitempty"` MaxReplicas int `form:"maxReplicas" binding:"omitempty,gt=0,lte=100"` @@ -33,3 +42,21 @@ type PutObjectRequset struct { type GetObjectQuery struct { Filter string `form:"filter" binding:"omitempty"` } + +type ListObjectMetadatasQuery struct { + // A delimiter is a character used to group keys. + Delimiter string `form:"delimiter" binding:"omitempty"` + + // Marker indicates the starting object key for listing. + Marker string `form:"marker" binding:"omitempty"` + + // Sets the maximum number of keys returned in the response. + Limit int64 `form:"limit" binding:"omitempty"` + + // Limits the response to keys that begin with the specified prefix. + Prefix string `form:"prefix" binding:"omitempty"` +} + +type CopyObjectRequest struct { + Source string `form:"source" binding:"required"` +} diff --git a/client/dfstore/dfstore.go b/client/dfstore/dfstore.go index 907c6c2ba..a2bfe8adb 100644 --- a/client/dfstore/dfstore.go +++ b/client/dfstore/dfstore.go @@ -21,6 +21,7 @@ package dfstore import ( "bytes" "context" + "encoding/json" "errors" "fmt" "io" @@ -29,6 +30,7 @@ import ( "net/url" "path/filepath" "strconv" + "time" "github.com/go-http-utils/headers" @@ -39,6 +41,12 @@ import ( // Dfstore is the interface used for object storage. type Dfstore interface { + // CreateBucketRequestWithContext returns *http.Request of create bucket. + CreateBucketRequestWithContext(ctx context.Context, input *CreateBucketInput) (*http.Request, error) + + // CreateBucket create bucket. + CreateBucketWithContext(ctx context.Context, input *CreateBucketInput) error + // GetObjectMetadataRequestWithContext returns *http.Request of getting object metadata. GetObjectMetadataRequestWithContext(ctx context.Context, input *GetObjectMetadataInput) (*http.Request, error) @@ -51,12 +59,24 @@ type Dfstore interface { // GetObjectWithContext returns data of object. GetObjectWithContext(ctx context.Context, input *GetObjectInput) (io.ReadCloser, error) + // ListObjectMetadatasRequestWithContext returns *http.Request of getting object metadata list. + ListObjectMetadatasRequestWithContext(ctx context.Context, input *ListObjectMetadatasInput) (*http.Request, error) + + // ListObjectMetadatasWithContext returns list of object metadata. + ListObjectMetadatasWithContext(ctx context.Context, input *ListObjectMetadatasInput) ([]*pkgobjectstorage.ObjectMetadata, error) + // PutObjectRequestWithContext returns *http.Request of putting object. PutObjectRequestWithContext(ctx context.Context, input *PutObjectInput) (*http.Request, error) // PutObjectWithContext puts data of object. PutObjectWithContext(ctx context.Context, input *PutObjectInput) error + // CopyObjectRequestWithContext returns *http.Request of copying object. + CopyObjectRequestWithContext(ctx context.Context, input *CopyObjectInput) (*http.Request, error) + + // CopyObjectWithContext copy object from source to destination. + CopyObjectWithContext(ctx context.Context, input *CopyObjectInput) error + // DeleteObjectRequestWithContext returns *http.Request of deleting object. DeleteObjectRequestWithContext(ctx context.Context, input *DeleteObjectInput) (*http.Request, error) @@ -165,6 +185,8 @@ func (dfs *dfstore) GetObjectMetadataWithContext(ctx context.Context, input *Get return nil, err } + lastModifiedTime, _ := time.Parse(http.TimeFormat, resp.Header.Get(config.HeaderDragonflyObjectMetaLastModifiedTime)) + return &pkgobjectstorage.ObjectMetadata{ ContentDisposition: resp.Header.Get(headers.ContentDisposition), ContentEncoding: resp.Header.Get(headers.ContentEncoding), @@ -173,6 +195,8 @@ func (dfs *dfstore) GetObjectMetadataWithContext(ctx context.Context, input *Get ContentType: resp.Header.Get(headers.ContentType), ETag: resp.Header.Get(headers.ContentType), Digest: resp.Header.Get(config.HeaderDragonflyObjectMetaDigest), + LastModifiedTime: lastModifiedTime, + StorageClass: resp.Header.Get(config.HeaderDragonflyObjectMetaStorageClass), }, nil } @@ -257,6 +281,100 @@ func (dfs *dfstore) GetObjectWithContext(ctx context.Context, input *GetObjectIn return resp.Body, nil } +// ListObjectMetadatasInput is used to construct request of getting object metadata list. +type ListObjectMetadatasInput struct { + // BucketName is the bucket name. + BucketName string + + // Prefix filters the objects by their key's prefix. + Prefix string + + // Marker is used for pagination, indicating the object key to start listing from. + Marker string + + // Delimiter is used to create a hierarchical structure, simulating directories in the listing results. + Delimiter string + + // Limit specifies the maximum number of objects to be returned in a single listing request. + Limit int64 +} + +// Validate validates ListObjectMetadatasInput fields. +func (i *ListObjectMetadatasInput) Validate() error { + if i.BucketName == "" { + return errors.New("invalid BucketName") + } + + if i.Limit == 0 { + return errors.New("invalid Limit") + } + + return nil +} + +// ListObjectMetadatasRequestWithContext returns *http.Request of getting object metadata list. +func (dfs *dfstore) ListObjectMetadatasRequestWithContext(ctx context.Context, input *ListObjectMetadatasInput) (*http.Request, error) { + if err := input.Validate(); err != nil { + return nil, err + } + + u, err := url.Parse(dfs.endpoint) + if err != nil { + return nil, err + } + + u.Path = filepath.Join("buckets", input.BucketName, "objects") + + query := u.Query() + if input.Prefix != "" { + query.Set("prefix", input.Prefix) + } + + if input.Marker != "" { + query.Set("marker", input.Marker) + } + + if input.Delimiter != "" { + query.Set("delimiter", input.Delimiter) + } + + query.Set("limit", fmt.Sprint(input.Limit)) + + u.RawQuery = query.Encode() + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil) + if err != nil { + return nil, err + } + + return req, nil +} + +// ListObjectMetadatasWithContext returns *http.Request of getting object. +func (dfs *dfstore) ListObjectMetadatasWithContext(ctx context.Context, input *ListObjectMetadatasInput) ([]*pkgobjectstorage.ObjectMetadata, error) { + req, err := dfs.ListObjectMetadatasRequestWithContext(ctx, input) + if err != nil { + return nil, err + } + + resp, err := dfs.httpClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode/100 != 2 { + return nil, fmt.Errorf("bad response status %s", resp.Status) + } + + var metadataList []*pkgobjectstorage.ObjectMetadata + if err := json.NewDecoder(resp.Body).Decode(&metadataList); err != nil { + return nil, err + } + + return metadataList, nil +} + // PutObjectInput is used to construct request of putting object. type PutObjectInput struct { // BucketName is bucket name. @@ -379,6 +497,146 @@ func (dfs *dfstore) PutObjectWithContext(ctx context.Context, input *PutObjectIn return nil } +// CopyObjectInput is used to construct request of copying object. +type CopyObjectInput struct { + // BucketName is bucket name. + BucketName string + + // SrcObjectKey is the key of object to be copied. + SrcObjectKey string `form:"file" binding:"required"` + + // DestObjectKey is the object key of the destination. + DestObjectKey string `form:"file" binding:"required"` +} + +// Validate validates CopyObjectInput fields. +func (i *CopyObjectInput) Validate() error { + if i.BucketName == "" { + return errors.New("invalid BucketName") + } + + if i.SrcObjectKey == "" { + return errors.New("invalid Source") + } + + if i.DestObjectKey == "" { + return errors.New("invalid Destination") + } + + return nil +} + +// CopyObjectWithContext copy object from source to destination. +func (dfs *dfstore) CopyObjectWithContext(ctx context.Context, input *CopyObjectInput) error { + req, err := dfs.CopyObjectRequestWithContext(ctx, input) + if err != nil { + return err + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode/100 != 2 { + return fmt.Errorf("bad response status %s", resp.Status) + } + + return nil +} + +// CopyObjectRequestWithContext returns *http.Request of copying object. +func (dfs *dfstore) CopyObjectRequestWithContext(ctx context.Context, input *CopyObjectInput) (*http.Request, error) { + if err := input.Validate(); err != nil { + return nil, err + } + + body := &bytes.Buffer{} + writer := multipart.NewWriter(body) + + if err := writer.WriteField("source", input.SrcObjectKey); err != nil { + return nil, err + } + + if err := writer.Close(); err != nil { + return nil, err + } + + u, err := url.Parse(dfs.endpoint) + if err != nil { + return nil, err + } + + u.Path = filepath.Join("buckets", input.BucketName, "objects", input.DestObjectKey) + + query := u.Query() + + u.RawQuery = query.Encode() + + req, err := http.NewRequestWithContext(ctx, http.MethodPut, u.String(), body) + if err != nil { + return nil, err + } + req.Header.Add(headers.ContentType, writer.FormDataContentType()) + req.Header.Add(config.HeaderDragonflyObjectOperation, fmt.Sprint(objectstorage.CopyObject)) + return req, nil +} + +// CreateBucketInput is used to construct request of creating bucket. +type CreateBucketInput struct { + // BucketName is bucket name. + BucketName string +} + +// Validate validates CreateBucketInput fields. +func (i *CreateBucketInput) Validate() error { + if i.BucketName == "" { + return errors.New("invalid BucketName") + } + + return nil +} + +// CreateBucketWithContext creates bucket. +func (dfs *dfstore) CreateBucketWithContext(ctx context.Context, input *CreateBucketInput) error { + req, err := dfs.CreateBucketRequestWithContext(ctx, input) + if err != nil { + return err + } + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode/100 != 2 { + return fmt.Errorf("bad response status %s", resp.Status) + } + return err +} + +// CreateBucketRequestWithContext returns *http.Request of creating bucket. +func (dfs *dfstore) CreateBucketRequestWithContext(ctx context.Context, input *CreateBucketInput) (*http.Request, error) { + if err := input.Validate(); err != nil { + return nil, err + } + + u, err := url.Parse(dfs.endpoint) + if err != nil { + return nil, err + } + + u.Path = filepath.Join("buckets", input.BucketName) + + query := u.Query() + + u.RawQuery = query.Encode() + + return http.NewRequestWithContext(ctx, http.MethodPut, u.String(), nil) +} + // DeleteObjectInput is used to construct request of deleting object. type DeleteObjectInput struct { // BucketName is bucket name. diff --git a/pkg/objectstorage/objectstorage.go b/pkg/objectstorage/objectstorage.go index 09ba23644..1be865ca2 100644 --- a/pkg/objectstorage/objectstorage.go +++ b/pkg/objectstorage/objectstorage.go @@ -50,6 +50,12 @@ type ObjectMetadata struct { // Digest is object digest. Digest string + + // LastModifiedTime is last modified time. + LastModifiedTime time.Time + + // StorageClass is object storage class. + StorageClass string } // BucketMetadata provides metadata of bucket. @@ -91,11 +97,14 @@ type ObjectStorage interface { DeleteObject(ctx context.Context, bucketName, objectKey string) error // ListObjectMetadatas returns metadata of objects. - ListObjectMetadatas(ctx context.Context, bucketName, prefix, marker string, limit int64) ([]*ObjectMetadata, error) + ListObjectMetadatas(ctx context.Context, bucketName, prefix, marker, delimiter string, limit int64) ([]*ObjectMetadata, error) // IsObjectExist returns whether the object exists. IsObjectExist(ctx context.Context, bucketName, objectKey string) (bool, error) + // CopyObject copy object from source to destination + CopyObject(ctx context.Context, bucketName, srcObjectKey, destObjectKey string) error + // GetSignURL returns sign url of object. GetSignURL(ctx context.Context, bucketName, objectKey string, method Method, expire time.Duration) (string, error) } diff --git a/pkg/objectstorage/obs.go b/pkg/objectstorage/obs.go index 4b6508c2f..42ad075c0 100644 --- a/pkg/objectstorage/obs.go +++ b/pkg/objectstorage/obs.go @@ -26,6 +26,8 @@ import ( huaweiobs "github.com/huaweicloud/huaweicloud-sdk-go-obs/obs" ) +const storageClassStandardIA = "STANDARD_IA" + type obs struct { // OBS client. client *huaweiobs.ObsClient @@ -114,6 +116,8 @@ func (o *obs) GetObjectMetadata(ctx context.Context, bucketName, objectKey strin ContentType: metadata.ContentType, ETag: metadata.ETag, Digest: metadata.Metadata[MetaDigest], + LastModifiedTime: metadata.LastModified, + StorageClass: o.getDefaultStorageClassIfEmpty(metadata.StorageClass), }, true, nil } @@ -157,11 +161,12 @@ func (o *obs) DeleteObject(ctx context.Context, bucketName, objectKey string) er } // ListObjectMetadatas returns metadata of objects. -func (o *obs) ListObjectMetadatas(ctx context.Context, bucketName, prefix, marker string, limit int64) ([]*ObjectMetadata, error) { +func (o *obs) ListObjectMetadatas(ctx context.Context, bucketName, prefix, marker, delimiter string, limit int64) ([]*ObjectMetadata, error) { resp, err := o.client.ListObjects(&huaweiobs.ListObjectsInput{ ListObjsInput: huaweiobs.ListObjsInput{ - Prefix: prefix, - MaxKeys: int(limit), + Prefix: prefix, + MaxKeys: int(limit), + Delimiter: delimiter, }, Bucket: bucketName, Marker: marker, @@ -173,14 +178,32 @@ func (o *obs) ListObjectMetadatas(ctx context.Context, bucketName, prefix, marke var metadatas []*ObjectMetadata for _, object := range resp.Contents { metadatas = append(metadatas, &ObjectMetadata{ - Key: object.Key, - ETag: object.ETag, + Key: object.Key, + ETag: object.ETag, + LastModifiedTime: object.LastModified, + StorageClass: o.getDefaultStorageClassIfEmpty(object.StorageClass), }) } return metadatas, nil } +// CopyObject copy object from source to destination. +func (o *obs) CopyObject(ctx context.Context, bucketName, srcObjectKey, destObjectKey string) error { + params := &huaweiobs.CopyObjectInput{ + ObjectOperationInput: huaweiobs.ObjectOperationInput{ + Bucket: bucketName, + Key: srcObjectKey, + }, + CopySourceBucket: bucketName, + CopySourceKey: destObjectKey, + } + + _, err := o.client.CopyObject(params) + + return err +} + // IsObjectExist returns whether the object exists. func (o *obs) IsObjectExist(ctx context.Context, bucketName, objectKey string) (bool, error) { _, isExist, err := o.GetObjectMetadata(ctx, bucketName, objectKey) @@ -225,3 +248,17 @@ func (o *obs) GetSignURL(ctx context.Context, bucketName, objectKey string, meth return resp.SignedUrl, nil } + +// getDefaultStorageClassIfEmpty returns the default storage class if the input is empty. +func (o *obs) getDefaultStorageClassIfEmpty(storageClass huaweiobs.StorageClassType) string { + var sc string + switch storageClass { + case "": + sc = string(huaweiobs.StorageClassStandard) + case huaweiobs.StorageClassWarm: + sc = storageClassStandardIA + default: + sc = string(storageClass) + } + return sc +} diff --git a/pkg/objectstorage/oss.go b/pkg/objectstorage/oss.go index 31d356872..c93647e58 100644 --- a/pkg/objectstorage/oss.go +++ b/pkg/objectstorage/oss.go @@ -29,6 +29,9 @@ import ( "github.com/go-http-utils/headers" ) +// oss limit maxKeys to 1000. +const MaxLimit = 1000 + type oss struct { // OSS client. client *aliyunoss.Client @@ -109,6 +112,9 @@ func (o *oss) GetObjectMetadata(ctx context.Context, bucketName, objectKey strin return nil, false, err } + // RFC 1123 format + lastModifiedTime, _ := time.Parse(http.TimeFormat, header.Get(aliyunoss.HTTPHeaderLastModified)) + return &ObjectMetadata{ Key: objectKey, ContentDisposition: header.Get(headers.ContentDisposition), @@ -118,6 +124,8 @@ func (o *oss) GetObjectMetadata(ctx context.Context, bucketName, objectKey strin ContentType: header.Get(headers.ContentType), ETag: header.Get(headers.ETag), Digest: header.Get(aliyunoss.HTTPHeaderOssMetaPrefix + MetaDigest), + LastModifiedTime: lastModifiedTime, + StorageClass: o.getDefaultStorageClassIfEmpty(header.Get(aliyunoss.HTTPHeaderOssStorageClass)), }, true, nil } @@ -153,13 +161,17 @@ func (o *oss) DeleteObject(ctx context.Context, bucketName, objectKey string) er } // ListObjectMetadatas returns metadata of objects. -func (o *oss) ListObjectMetadatas(ctx context.Context, bucketName, prefix, marker string, limit int64) ([]*ObjectMetadata, error) { +func (o *oss) ListObjectMetadatas(ctx context.Context, bucketName, prefix, marker, delimiter string, limit int64) ([]*ObjectMetadata, error) { bucket, err := o.client.Bucket(bucketName) if err != nil { return nil, err } - resp, err := bucket.ListObjects(aliyunoss.Prefix(prefix), aliyunoss.Marker(marker), aliyunoss.MaxKeys(int(limit))) + if limit > MaxLimit { + limit = MaxLimit + } + + resp, err := bucket.ListObjects(aliyunoss.Prefix(prefix), aliyunoss.Marker(marker), aliyunoss.Delimiter(delimiter), aliyunoss.MaxKeys(int(limit))) if err != nil { return nil, err } @@ -167,8 +179,11 @@ func (o *oss) ListObjectMetadatas(ctx context.Context, bucketName, prefix, marke var metadatas []*ObjectMetadata for _, object := range resp.Objects { metadatas = append(metadatas, &ObjectMetadata{ - Key: object.Key, - ETag: object.ETag, + Key: object.Key, + ETag: object.ETag, + ContentLength: object.Size, + LastModifiedTime: object.LastModified, + StorageClass: o.getDefaultStorageClassIfEmpty(object.StorageClass), }) } @@ -190,6 +205,16 @@ func (o *oss) IsBucketExist(ctx context.Context, bucketName string) (bool, error return o.client.IsBucketExist(bucketName) } +// CopyObject copy object from source to destination. +func (o *oss) CopyObject(ctx context.Context, bucketName, srcObjectKey, destObjectKey string) error { + bucket, err := o.client.Bucket(bucketName) + if err != nil { + return err + } + _, err = bucket.CopyObject(srcObjectKey, destObjectKey) + return err +} + // GetSignURL returns sign url of object. func (o *oss) GetSignURL(ctx context.Context, bucketName, objectKey string, method Method, expire time.Duration) (string, error) { var ossHTTPMethod aliyunoss.HTTPMethod @@ -217,3 +242,11 @@ func (o *oss) GetSignURL(ctx context.Context, bucketName, objectKey string, meth return bucket.SignURL(objectKey, ossHTTPMethod, int64(expire.Seconds())) } + +// getDefaultStorageClassIfEmpty returns the default storage class if the input is empty. +func (o *oss) getDefaultStorageClassIfEmpty(storageClass string) string { + if storageClass == "" { + storageClass = string(aliyunoss.StorageStandard) + } + return storageClass +} diff --git a/pkg/objectstorage/s3.go b/pkg/objectstorage/s3.go index 2baba9f25..5f84fd204 100644 --- a/pkg/objectstorage/s3.go +++ b/pkg/objectstorage/s3.go @@ -35,6 +35,8 @@ type s3 struct { client *awss3.S3 } +const StandardStorageClass = "STANDARD" + // New s3 instance. func newS3(region, endpoint, accessKey, secretKey string, s3ForcePathStyle bool) (ObjectStorage, error) { cfg := aws.NewConfig().WithCredentials(credentials.NewStaticCredentials(accessKey, secretKey, "")) @@ -114,6 +116,8 @@ func (s *s3) GetObjectMetadata(ctx context.Context, bucketName, objectKey string ContentType: aws.StringValue(resp.ContentType), ETag: aws.StringValue(resp.ETag), Digest: aws.StringValue(resp.Metadata[MetaDigest]), + LastModifiedTime: aws.TimeValue(resp.LastModified), + StorageClass: aws.StringValue(s.getDefaultStorageClassIfEmpty(resp.StorageClass)), }, true, nil } @@ -156,12 +160,13 @@ func (s *s3) DeleteObject(ctx context.Context, bucketName, objectKey string) err } // DeleteObject deletes data of object. -func (s *s3) ListObjectMetadatas(ctx context.Context, bucketName, prefix, marker string, limit int64) ([]*ObjectMetadata, error) { +func (s *s3) ListObjectMetadatas(ctx context.Context, bucketName, prefix, marker, delimiter string, limit int64) ([]*ObjectMetadata, error) { resp, err := s.client.ListObjectsWithContext(ctx, &awss3.ListObjectsInput{ - Bucket: aws.String(bucketName), - Prefix: aws.String(prefix), - Marker: aws.String(marker), - MaxKeys: aws.Int64(limit), + Bucket: aws.String(bucketName), + Prefix: aws.String(prefix), + Marker: aws.String(marker), + MaxKeys: aws.Int64(limit), + Delimiter: aws.String(delimiter), }) if err != nil { return nil, err @@ -170,8 +175,11 @@ func (s *s3) ListObjectMetadatas(ctx context.Context, bucketName, prefix, marker var metadatas []*ObjectMetadata for _, object := range resp.Contents { metadatas = append(metadatas, &ObjectMetadata{ - Key: aws.StringValue(object.Key), - ETag: aws.StringValue(object.ETag), + Key: aws.StringValue(object.Key), + ContentLength: aws.Int64Value(object.Size), + ETag: aws.StringValue(object.ETag), + LastModifiedTime: aws.TimeValue(object.LastModified), + StorageClass: aws.StringValue(s.getDefaultStorageClassIfEmpty(object.StorageClass)), }) } @@ -208,6 +216,18 @@ func (s *s3) IsBucketExist(ctx context.Context, bucketName string) (bool, error) return true, nil } +// CopyObject copy object from source to destination. +func (s *s3) CopyObject(ctx context.Context, bucketName, srcObjectKey, destObjectKey string) error { + srcObjectKey = bucketName + "/" + srcObjectKey + params := &awss3.CopyObjectInput{ + Bucket: &bucketName, + Key: &destObjectKey, + CopySource: &srcObjectKey, + } + _, err := s.client.CopyObject(params) + return err +} + // GetSignURL returns sign url of object. func (s *s3) GetSignURL(ctx context.Context, bucketName, objectKey string, method Method, expire time.Duration) (string, error) { var req *request.Request @@ -242,3 +262,12 @@ func (s *s3) GetSignURL(ctx context.Context, bucketName, objectKey string, metho return req.Presign(expire) } + +// getDefaultStorageClassIfEmpty returns the default storage class if the input is empty. +func (s *s3) getDefaultStorageClassIfEmpty(storageClass *string) *string { + if storageClass == nil || *storageClass == "" { + defaultStorageClass := StandardStorageClass + return &defaultStorageClass + } + return storageClass +}