feat: provide support for JuiceFS objectStorage implementation (#2578)
This commit is contained in:
parent
cb41c0e454
commit
f1fbfdeb81
|
|
@ -31,4 +31,10 @@ const (
|
|||
HeaderDragonflyRegistry = "X-Dragonfly-Registry"
|
||||
// HeaderDragonflyObjectMetaDigest is used for digest of object storage.
|
||||
HeaderDragonflyObjectMetaDigest = "X-Dragonfly-Object-Meta-Digest"
|
||||
// HeaderDragonflyObjectMetaLastModifiedTime is used for last modified time of object storage.
|
||||
HeaderDragonflyObjectMetaLastModifiedTime = "X-Dragonfly-Object-Meta-Last-Modified-Time"
|
||||
// HeaderDragonflyObjectMetaStorageClass is used for storage class of object storage.
|
||||
HeaderDragonflyObjectMetaStorageClass = "X-Dragonfly-Object-Meta-Storage-Class"
|
||||
// HeaderDragonflyObjectOperation is used for object storage operation.
|
||||
HeaderDragonflyObjectOperation = "X-Dragonfly-Object-Operation"
|
||||
)
|
||||
|
|
|
|||
|
|
@ -172,8 +172,10 @@ func (o *objectStorage) initRouter(cfg *config.DaemonOption, logDir string) *gin
|
|||
|
||||
// Buckets
|
||||
b := r.Group(RouterGroupBuckets)
|
||||
b.PUT("/:id", o.createBucket)
|
||||
b.HEAD(":id/objects/*object_key", o.headObject)
|
||||
b.GET(":id/objects/*object_key", o.getObject)
|
||||
b.GET(":id/objects", o.listObjectMetadatas)
|
||||
b.DELETE(":id/objects/*object_key", o.destroyObject)
|
||||
b.PUT(":id/objects/*object_key", o.putObject)
|
||||
|
||||
|
|
@ -215,6 +217,8 @@ func (o *objectStorage) headObject(ctx *gin.Context) {
|
|||
return
|
||||
}
|
||||
|
||||
lastModifiedTime := meta.LastModifiedTime.Format(http.TimeFormat)
|
||||
|
||||
ctx.Header(headers.ContentDisposition, meta.ContentDisposition)
|
||||
ctx.Header(headers.ContentEncoding, meta.ContentEncoding)
|
||||
ctx.Header(headers.ContentLanguage, meta.ContentLanguage)
|
||||
|
|
@ -222,6 +226,8 @@ func (o *objectStorage) headObject(ctx *gin.Context) {
|
|||
ctx.Header(headers.ContentType, meta.ContentType)
|
||||
ctx.Header(headers.ETag, meta.ETag)
|
||||
ctx.Header(config.HeaderDragonflyObjectMetaDigest, meta.Digest)
|
||||
ctx.Header(config.HeaderDragonflyObjectMetaLastModifiedTime, lastModifiedTime)
|
||||
ctx.Header(config.HeaderDragonflyObjectMetaStorageClass, meta.StorageClass)
|
||||
|
||||
ctx.Status(http.StatusOK)
|
||||
return
|
||||
|
|
@ -356,13 +362,20 @@ func (o *objectStorage) destroyObject(ctx *gin.Context) {
|
|||
|
||||
// putObject uses to upload object data.
|
||||
func (o *objectStorage) putObject(ctx *gin.Context) {
|
||||
|
||||
operation := ctx.Request.Header.Get(config.HeaderDragonflyObjectOperation)
|
||||
if operation == CopyObject {
|
||||
o.copyObject(ctx)
|
||||
return
|
||||
}
|
||||
|
||||
var params ObjectParams
|
||||
if err := ctx.ShouldBindUri(¶ms); err != nil {
|
||||
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
var form PutObjectRequset
|
||||
var form PutObjectRequest
|
||||
if err := ctx.ShouldBind(&form); err != nil {
|
||||
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
|
||||
return
|
||||
|
|
@ -476,6 +489,103 @@ func (o *objectStorage) putObject(ctx *gin.Context) {
|
|||
return
|
||||
}
|
||||
|
||||
// createBucket uses to create bucket.
|
||||
func (o *objectStorage) createBucket(ctx *gin.Context) {
|
||||
var params BucketParams
|
||||
if err := ctx.ShouldBindUri(¶ms); err != nil {
|
||||
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
client, err := o.client()
|
||||
if err != nil {
|
||||
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
bucketName := params.ID
|
||||
|
||||
logger.Infof("create bucketName %s ", bucketName)
|
||||
if err := client.CreateBucket(ctx, bucketName); err != nil {
|
||||
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
ctx.Status(http.StatusOK)
|
||||
}
|
||||
|
||||
// listObjectMetadatas uses to list objects meta data.
|
||||
func (o *objectStorage) listObjectMetadatas(ctx *gin.Context) {
|
||||
var params BucketParams
|
||||
if err := ctx.ShouldBindUri(¶ms); err != nil {
|
||||
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
var query ListObjectMetadatasQuery
|
||||
if err := ctx.ShouldBindQuery(&query); err != nil {
|
||||
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
var (
|
||||
bucketName = params.ID
|
||||
prefix = query.Prefix
|
||||
marker = query.Marker
|
||||
delimiter = query.Delimiter
|
||||
limit = query.Limit
|
||||
)
|
||||
|
||||
client, err := o.client()
|
||||
if err != nil {
|
||||
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
metadataList, err := client.ListObjectMetadatas(ctx, bucketName, prefix, marker, delimiter, limit)
|
||||
if err != nil {
|
||||
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
ctx.JSON(http.StatusOK, metadataList)
|
||||
ctx.Status(http.StatusOK)
|
||||
}
|
||||
|
||||
// copyObject uses to copy object.
|
||||
func (o *objectStorage) copyObject(ctx *gin.Context) {
|
||||
var params ObjectParams
|
||||
if err := ctx.ShouldBindUri(¶ms); err != nil {
|
||||
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
var form CopyObjectRequest
|
||||
if err := ctx.ShouldBind(&form); err != nil {
|
||||
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
var (
|
||||
bucketName = params.ID
|
||||
destination = params.ObjectKey
|
||||
source = form.Source
|
||||
)
|
||||
|
||||
client, err := o.client()
|
||||
if err != nil {
|
||||
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
|
||||
return
|
||||
}
|
||||
err = client.CopyObject(ctx, bucketName, source, destination)
|
||||
if err != nil {
|
||||
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
|
||||
return
|
||||
}
|
||||
|
||||
ctx.Status(http.StatusOK)
|
||||
}
|
||||
|
||||
// getAvailableSeedPeer uses to calculate md5 with file header.
|
||||
func (o *objectStorage) md5FromFileHeader(fileHeader *multipart.FileHeader) *digest.Digest {
|
||||
f, err := fileHeader.Open()
|
||||
|
|
|
|||
|
|
@ -18,12 +18,21 @@ package objectstorage
|
|||
|
||||
import "mime/multipart"
|
||||
|
||||
const (
|
||||
PutObject = "put"
|
||||
CopyObject = "copy"
|
||||
)
|
||||
|
||||
type BucketParams struct {
|
||||
ID string `uri:"id" binding:"required"`
|
||||
}
|
||||
|
||||
type ObjectParams struct {
|
||||
ID string `uri:"id" binding:"required"`
|
||||
ObjectKey string `uri:"object_key" binding:"required"`
|
||||
}
|
||||
|
||||
type PutObjectRequset struct {
|
||||
type PutObjectRequest struct {
|
||||
Mode uint `form:"mode,default=0" binding:"omitempty,gte=0,lte=2"`
|
||||
Filter string `form:"filter" binding:"omitempty"`
|
||||
MaxReplicas int `form:"maxReplicas" binding:"omitempty,gt=0,lte=100"`
|
||||
|
|
@ -33,3 +42,21 @@ type PutObjectRequset struct {
|
|||
type GetObjectQuery struct {
|
||||
Filter string `form:"filter" binding:"omitempty"`
|
||||
}
|
||||
|
||||
type ListObjectMetadatasQuery struct {
|
||||
// A delimiter is a character used to group keys.
|
||||
Delimiter string `form:"delimiter" binding:"omitempty"`
|
||||
|
||||
// Marker indicates the starting object key for listing.
|
||||
Marker string `form:"marker" binding:"omitempty"`
|
||||
|
||||
// Sets the maximum number of keys returned in the response.
|
||||
Limit int64 `form:"limit" binding:"omitempty"`
|
||||
|
||||
// Limits the response to keys that begin with the specified prefix.
|
||||
Prefix string `form:"prefix" binding:"omitempty"`
|
||||
}
|
||||
|
||||
type CopyObjectRequest struct {
|
||||
Source string `form:"source" binding:"required"`
|
||||
}
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ package dfstore
|
|||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
|
|
@ -29,6 +30,7 @@ import (
|
|||
"net/url"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/go-http-utils/headers"
|
||||
|
||||
|
|
@ -39,6 +41,12 @@ import (
|
|||
|
||||
// Dfstore is the interface used for object storage.
|
||||
type Dfstore interface {
|
||||
// CreateBucketRequestWithContext returns *http.Request of create bucket.
|
||||
CreateBucketRequestWithContext(ctx context.Context, input *CreateBucketInput) (*http.Request, error)
|
||||
|
||||
// CreateBucket create bucket.
|
||||
CreateBucketWithContext(ctx context.Context, input *CreateBucketInput) error
|
||||
|
||||
// GetObjectMetadataRequestWithContext returns *http.Request of getting object metadata.
|
||||
GetObjectMetadataRequestWithContext(ctx context.Context, input *GetObjectMetadataInput) (*http.Request, error)
|
||||
|
||||
|
|
@ -51,12 +59,24 @@ type Dfstore interface {
|
|||
// GetObjectWithContext returns data of object.
|
||||
GetObjectWithContext(ctx context.Context, input *GetObjectInput) (io.ReadCloser, error)
|
||||
|
||||
// ListObjectMetadatasRequestWithContext returns *http.Request of getting object metadata list.
|
||||
ListObjectMetadatasRequestWithContext(ctx context.Context, input *ListObjectMetadatasInput) (*http.Request, error)
|
||||
|
||||
// ListObjectMetadatasWithContext returns list of object metadata.
|
||||
ListObjectMetadatasWithContext(ctx context.Context, input *ListObjectMetadatasInput) ([]*pkgobjectstorage.ObjectMetadata, error)
|
||||
|
||||
// PutObjectRequestWithContext returns *http.Request of putting object.
|
||||
PutObjectRequestWithContext(ctx context.Context, input *PutObjectInput) (*http.Request, error)
|
||||
|
||||
// PutObjectWithContext puts data of object.
|
||||
PutObjectWithContext(ctx context.Context, input *PutObjectInput) error
|
||||
|
||||
// CopyObjectRequestWithContext returns *http.Request of copying object.
|
||||
CopyObjectRequestWithContext(ctx context.Context, input *CopyObjectInput) (*http.Request, error)
|
||||
|
||||
// CopyObjectWithContext copy object from source to destination.
|
||||
CopyObjectWithContext(ctx context.Context, input *CopyObjectInput) error
|
||||
|
||||
// DeleteObjectRequestWithContext returns *http.Request of deleting object.
|
||||
DeleteObjectRequestWithContext(ctx context.Context, input *DeleteObjectInput) (*http.Request, error)
|
||||
|
||||
|
|
@ -165,6 +185,8 @@ func (dfs *dfstore) GetObjectMetadataWithContext(ctx context.Context, input *Get
|
|||
return nil, err
|
||||
}
|
||||
|
||||
lastModifiedTime, _ := time.Parse(http.TimeFormat, resp.Header.Get(config.HeaderDragonflyObjectMetaLastModifiedTime))
|
||||
|
||||
return &pkgobjectstorage.ObjectMetadata{
|
||||
ContentDisposition: resp.Header.Get(headers.ContentDisposition),
|
||||
ContentEncoding: resp.Header.Get(headers.ContentEncoding),
|
||||
|
|
@ -173,6 +195,8 @@ func (dfs *dfstore) GetObjectMetadataWithContext(ctx context.Context, input *Get
|
|||
ContentType: resp.Header.Get(headers.ContentType),
|
||||
ETag: resp.Header.Get(headers.ContentType),
|
||||
Digest: resp.Header.Get(config.HeaderDragonflyObjectMetaDigest),
|
||||
LastModifiedTime: lastModifiedTime,
|
||||
StorageClass: resp.Header.Get(config.HeaderDragonflyObjectMetaStorageClass),
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
|
@ -257,6 +281,100 @@ func (dfs *dfstore) GetObjectWithContext(ctx context.Context, input *GetObjectIn
|
|||
return resp.Body, nil
|
||||
}
|
||||
|
||||
// ListObjectMetadatasInput is used to construct request of getting object metadata list.
|
||||
type ListObjectMetadatasInput struct {
|
||||
// BucketName is the bucket name.
|
||||
BucketName string
|
||||
|
||||
// Prefix filters the objects by their key's prefix.
|
||||
Prefix string
|
||||
|
||||
// Marker is used for pagination, indicating the object key to start listing from.
|
||||
Marker string
|
||||
|
||||
// Delimiter is used to create a hierarchical structure, simulating directories in the listing results.
|
||||
Delimiter string
|
||||
|
||||
// Limit specifies the maximum number of objects to be returned in a single listing request.
|
||||
Limit int64
|
||||
}
|
||||
|
||||
// Validate validates ListObjectMetadatasInput fields.
|
||||
func (i *ListObjectMetadatasInput) Validate() error {
|
||||
if i.BucketName == "" {
|
||||
return errors.New("invalid BucketName")
|
||||
}
|
||||
|
||||
if i.Limit == 0 {
|
||||
return errors.New("invalid Limit")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// ListObjectMetadatasRequestWithContext returns *http.Request of getting object metadata list.
|
||||
func (dfs *dfstore) ListObjectMetadatasRequestWithContext(ctx context.Context, input *ListObjectMetadatasInput) (*http.Request, error) {
|
||||
if err := input.Validate(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
u, err := url.Parse(dfs.endpoint)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
u.Path = filepath.Join("buckets", input.BucketName, "objects")
|
||||
|
||||
query := u.Query()
|
||||
if input.Prefix != "" {
|
||||
query.Set("prefix", input.Prefix)
|
||||
}
|
||||
|
||||
if input.Marker != "" {
|
||||
query.Set("marker", input.Marker)
|
||||
}
|
||||
|
||||
if input.Delimiter != "" {
|
||||
query.Set("delimiter", input.Delimiter)
|
||||
}
|
||||
|
||||
query.Set("limit", fmt.Sprint(input.Limit))
|
||||
|
||||
u.RawQuery = query.Encode()
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, u.String(), nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return req, nil
|
||||
}
|
||||
|
||||
// ListObjectMetadatasWithContext returns *http.Request of getting object.
|
||||
func (dfs *dfstore) ListObjectMetadatasWithContext(ctx context.Context, input *ListObjectMetadatasInput) ([]*pkgobjectstorage.ObjectMetadata, error) {
|
||||
req, err := dfs.ListObjectMetadatasRequestWithContext(ctx, input)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp, err := dfs.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode/100 != 2 {
|
||||
return nil, fmt.Errorf("bad response status %s", resp.Status)
|
||||
}
|
||||
|
||||
var metadataList []*pkgobjectstorage.ObjectMetadata
|
||||
if err := json.NewDecoder(resp.Body).Decode(&metadataList); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return metadataList, nil
|
||||
}
|
||||
|
||||
// PutObjectInput is used to construct request of putting object.
|
||||
type PutObjectInput struct {
|
||||
// BucketName is bucket name.
|
||||
|
|
@ -379,6 +497,146 @@ func (dfs *dfstore) PutObjectWithContext(ctx context.Context, input *PutObjectIn
|
|||
return nil
|
||||
}
|
||||
|
||||
// CopyObjectInput is used to construct request of copying object.
|
||||
type CopyObjectInput struct {
|
||||
// BucketName is bucket name.
|
||||
BucketName string
|
||||
|
||||
// SrcObjectKey is the key of object to be copied.
|
||||
SrcObjectKey string `form:"file" binding:"required"`
|
||||
|
||||
// DestObjectKey is the object key of the destination.
|
||||
DestObjectKey string `form:"file" binding:"required"`
|
||||
}
|
||||
|
||||
// Validate validates CopyObjectInput fields.
|
||||
func (i *CopyObjectInput) Validate() error {
|
||||
if i.BucketName == "" {
|
||||
return errors.New("invalid BucketName")
|
||||
}
|
||||
|
||||
if i.SrcObjectKey == "" {
|
||||
return errors.New("invalid Source")
|
||||
}
|
||||
|
||||
if i.DestObjectKey == "" {
|
||||
return errors.New("invalid Destination")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// CopyObjectWithContext copy object from source to destination.
|
||||
func (dfs *dfstore) CopyObjectWithContext(ctx context.Context, input *CopyObjectInput) error {
|
||||
req, err := dfs.CopyObjectRequestWithContext(ctx, input)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode/100 != 2 {
|
||||
return fmt.Errorf("bad response status %s", resp.Status)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// CopyObjectRequestWithContext returns *http.Request of copying object.
|
||||
func (dfs *dfstore) CopyObjectRequestWithContext(ctx context.Context, input *CopyObjectInput) (*http.Request, error) {
|
||||
if err := input.Validate(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
body := &bytes.Buffer{}
|
||||
writer := multipart.NewWriter(body)
|
||||
|
||||
if err := writer.WriteField("source", input.SrcObjectKey); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := writer.Close(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
u, err := url.Parse(dfs.endpoint)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
u.Path = filepath.Join("buckets", input.BucketName, "objects", input.DestObjectKey)
|
||||
|
||||
query := u.Query()
|
||||
|
||||
u.RawQuery = query.Encode()
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPut, u.String(), body)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
req.Header.Add(headers.ContentType, writer.FormDataContentType())
|
||||
req.Header.Add(config.HeaderDragonflyObjectOperation, fmt.Sprint(objectstorage.CopyObject))
|
||||
return req, nil
|
||||
}
|
||||
|
||||
// CreateBucketInput is used to construct request of creating bucket.
|
||||
type CreateBucketInput struct {
|
||||
// BucketName is bucket name.
|
||||
BucketName string
|
||||
}
|
||||
|
||||
// Validate validates CreateBucketInput fields.
|
||||
func (i *CreateBucketInput) Validate() error {
|
||||
if i.BucketName == "" {
|
||||
return errors.New("invalid BucketName")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// CreateBucketWithContext creates bucket.
|
||||
func (dfs *dfstore) CreateBucketWithContext(ctx context.Context, input *CreateBucketInput) error {
|
||||
req, err := dfs.CreateBucketRequestWithContext(ctx, input)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
resp, err := http.DefaultClient.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode/100 != 2 {
|
||||
return fmt.Errorf("bad response status %s", resp.Status)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// CreateBucketRequestWithContext returns *http.Request of creating bucket.
|
||||
func (dfs *dfstore) CreateBucketRequestWithContext(ctx context.Context, input *CreateBucketInput) (*http.Request, error) {
|
||||
if err := input.Validate(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
u, err := url.Parse(dfs.endpoint)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
u.Path = filepath.Join("buckets", input.BucketName)
|
||||
|
||||
query := u.Query()
|
||||
|
||||
u.RawQuery = query.Encode()
|
||||
|
||||
return http.NewRequestWithContext(ctx, http.MethodPut, u.String(), nil)
|
||||
}
|
||||
|
||||
// DeleteObjectInput is used to construct request of deleting object.
|
||||
type DeleteObjectInput struct {
|
||||
// BucketName is bucket name.
|
||||
|
|
|
|||
|
|
@ -50,6 +50,12 @@ type ObjectMetadata struct {
|
|||
|
||||
// Digest is object digest.
|
||||
Digest string
|
||||
|
||||
// LastModifiedTime is last modified time.
|
||||
LastModifiedTime time.Time
|
||||
|
||||
// StorageClass is object storage class.
|
||||
StorageClass string
|
||||
}
|
||||
|
||||
// BucketMetadata provides metadata of bucket.
|
||||
|
|
@ -91,11 +97,14 @@ type ObjectStorage interface {
|
|||
DeleteObject(ctx context.Context, bucketName, objectKey string) error
|
||||
|
||||
// ListObjectMetadatas returns metadata of objects.
|
||||
ListObjectMetadatas(ctx context.Context, bucketName, prefix, marker string, limit int64) ([]*ObjectMetadata, error)
|
||||
ListObjectMetadatas(ctx context.Context, bucketName, prefix, marker, delimiter string, limit int64) ([]*ObjectMetadata, error)
|
||||
|
||||
// IsObjectExist returns whether the object exists.
|
||||
IsObjectExist(ctx context.Context, bucketName, objectKey string) (bool, error)
|
||||
|
||||
// CopyObject copy object from source to destination
|
||||
CopyObject(ctx context.Context, bucketName, srcObjectKey, destObjectKey string) error
|
||||
|
||||
// GetSignURL returns sign url of object.
|
||||
GetSignURL(ctx context.Context, bucketName, objectKey string, method Method, expire time.Duration) (string, error)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -26,6 +26,8 @@ import (
|
|||
huaweiobs "github.com/huaweicloud/huaweicloud-sdk-go-obs/obs"
|
||||
)
|
||||
|
||||
const storageClassStandardIA = "STANDARD_IA"
|
||||
|
||||
type obs struct {
|
||||
// OBS client.
|
||||
client *huaweiobs.ObsClient
|
||||
|
|
@ -114,6 +116,8 @@ func (o *obs) GetObjectMetadata(ctx context.Context, bucketName, objectKey strin
|
|||
ContentType: metadata.ContentType,
|
||||
ETag: metadata.ETag,
|
||||
Digest: metadata.Metadata[MetaDigest],
|
||||
LastModifiedTime: metadata.LastModified,
|
||||
StorageClass: o.getDefaultStorageClassIfEmpty(metadata.StorageClass),
|
||||
}, true, nil
|
||||
}
|
||||
|
||||
|
|
@ -157,11 +161,12 @@ func (o *obs) DeleteObject(ctx context.Context, bucketName, objectKey string) er
|
|||
}
|
||||
|
||||
// ListObjectMetadatas returns metadata of objects.
|
||||
func (o *obs) ListObjectMetadatas(ctx context.Context, bucketName, prefix, marker string, limit int64) ([]*ObjectMetadata, error) {
|
||||
func (o *obs) ListObjectMetadatas(ctx context.Context, bucketName, prefix, marker, delimiter string, limit int64) ([]*ObjectMetadata, error) {
|
||||
resp, err := o.client.ListObjects(&huaweiobs.ListObjectsInput{
|
||||
ListObjsInput: huaweiobs.ListObjsInput{
|
||||
Prefix: prefix,
|
||||
MaxKeys: int(limit),
|
||||
Prefix: prefix,
|
||||
MaxKeys: int(limit),
|
||||
Delimiter: delimiter,
|
||||
},
|
||||
Bucket: bucketName,
|
||||
Marker: marker,
|
||||
|
|
@ -173,14 +178,32 @@ func (o *obs) ListObjectMetadatas(ctx context.Context, bucketName, prefix, marke
|
|||
var metadatas []*ObjectMetadata
|
||||
for _, object := range resp.Contents {
|
||||
metadatas = append(metadatas, &ObjectMetadata{
|
||||
Key: object.Key,
|
||||
ETag: object.ETag,
|
||||
Key: object.Key,
|
||||
ETag: object.ETag,
|
||||
LastModifiedTime: object.LastModified,
|
||||
StorageClass: o.getDefaultStorageClassIfEmpty(object.StorageClass),
|
||||
})
|
||||
}
|
||||
|
||||
return metadatas, nil
|
||||
}
|
||||
|
||||
// CopyObject copy object from source to destination.
|
||||
func (o *obs) CopyObject(ctx context.Context, bucketName, srcObjectKey, destObjectKey string) error {
|
||||
params := &huaweiobs.CopyObjectInput{
|
||||
ObjectOperationInput: huaweiobs.ObjectOperationInput{
|
||||
Bucket: bucketName,
|
||||
Key: srcObjectKey,
|
||||
},
|
||||
CopySourceBucket: bucketName,
|
||||
CopySourceKey: destObjectKey,
|
||||
}
|
||||
|
||||
_, err := o.client.CopyObject(params)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// IsObjectExist returns whether the object exists.
|
||||
func (o *obs) IsObjectExist(ctx context.Context, bucketName, objectKey string) (bool, error) {
|
||||
_, isExist, err := o.GetObjectMetadata(ctx, bucketName, objectKey)
|
||||
|
|
@ -225,3 +248,17 @@ func (o *obs) GetSignURL(ctx context.Context, bucketName, objectKey string, meth
|
|||
|
||||
return resp.SignedUrl, nil
|
||||
}
|
||||
|
||||
// getDefaultStorageClassIfEmpty returns the default storage class if the input is empty.
|
||||
func (o *obs) getDefaultStorageClassIfEmpty(storageClass huaweiobs.StorageClassType) string {
|
||||
var sc string
|
||||
switch storageClass {
|
||||
case "":
|
||||
sc = string(huaweiobs.StorageClassStandard)
|
||||
case huaweiobs.StorageClassWarm:
|
||||
sc = storageClassStandardIA
|
||||
default:
|
||||
sc = string(storageClass)
|
||||
}
|
||||
return sc
|
||||
}
|
||||
|
|
|
|||
|
|
@ -29,6 +29,9 @@ import (
|
|||
"github.com/go-http-utils/headers"
|
||||
)
|
||||
|
||||
// oss limit maxKeys to 1000.
|
||||
const MaxLimit = 1000
|
||||
|
||||
type oss struct {
|
||||
// OSS client.
|
||||
client *aliyunoss.Client
|
||||
|
|
@ -109,6 +112,9 @@ func (o *oss) GetObjectMetadata(ctx context.Context, bucketName, objectKey strin
|
|||
return nil, false, err
|
||||
}
|
||||
|
||||
// RFC 1123 format
|
||||
lastModifiedTime, _ := time.Parse(http.TimeFormat, header.Get(aliyunoss.HTTPHeaderLastModified))
|
||||
|
||||
return &ObjectMetadata{
|
||||
Key: objectKey,
|
||||
ContentDisposition: header.Get(headers.ContentDisposition),
|
||||
|
|
@ -118,6 +124,8 @@ func (o *oss) GetObjectMetadata(ctx context.Context, bucketName, objectKey strin
|
|||
ContentType: header.Get(headers.ContentType),
|
||||
ETag: header.Get(headers.ETag),
|
||||
Digest: header.Get(aliyunoss.HTTPHeaderOssMetaPrefix + MetaDigest),
|
||||
LastModifiedTime: lastModifiedTime,
|
||||
StorageClass: o.getDefaultStorageClassIfEmpty(header.Get(aliyunoss.HTTPHeaderOssStorageClass)),
|
||||
}, true, nil
|
||||
}
|
||||
|
||||
|
|
@ -153,13 +161,17 @@ func (o *oss) DeleteObject(ctx context.Context, bucketName, objectKey string) er
|
|||
}
|
||||
|
||||
// ListObjectMetadatas returns metadata of objects.
|
||||
func (o *oss) ListObjectMetadatas(ctx context.Context, bucketName, prefix, marker string, limit int64) ([]*ObjectMetadata, error) {
|
||||
func (o *oss) ListObjectMetadatas(ctx context.Context, bucketName, prefix, marker, delimiter string, limit int64) ([]*ObjectMetadata, error) {
|
||||
bucket, err := o.client.Bucket(bucketName)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
resp, err := bucket.ListObjects(aliyunoss.Prefix(prefix), aliyunoss.Marker(marker), aliyunoss.MaxKeys(int(limit)))
|
||||
if limit > MaxLimit {
|
||||
limit = MaxLimit
|
||||
}
|
||||
|
||||
resp, err := bucket.ListObjects(aliyunoss.Prefix(prefix), aliyunoss.Marker(marker), aliyunoss.Delimiter(delimiter), aliyunoss.MaxKeys(int(limit)))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
|
@ -167,8 +179,11 @@ func (o *oss) ListObjectMetadatas(ctx context.Context, bucketName, prefix, marke
|
|||
var metadatas []*ObjectMetadata
|
||||
for _, object := range resp.Objects {
|
||||
metadatas = append(metadatas, &ObjectMetadata{
|
||||
Key: object.Key,
|
||||
ETag: object.ETag,
|
||||
Key: object.Key,
|
||||
ETag: object.ETag,
|
||||
ContentLength: object.Size,
|
||||
LastModifiedTime: object.LastModified,
|
||||
StorageClass: o.getDefaultStorageClassIfEmpty(object.StorageClass),
|
||||
})
|
||||
}
|
||||
|
||||
|
|
@ -190,6 +205,16 @@ func (o *oss) IsBucketExist(ctx context.Context, bucketName string) (bool, error
|
|||
return o.client.IsBucketExist(bucketName)
|
||||
}
|
||||
|
||||
// CopyObject copy object from source to destination.
|
||||
func (o *oss) CopyObject(ctx context.Context, bucketName, srcObjectKey, destObjectKey string) error {
|
||||
bucket, err := o.client.Bucket(bucketName)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, err = bucket.CopyObject(srcObjectKey, destObjectKey)
|
||||
return err
|
||||
}
|
||||
|
||||
// GetSignURL returns sign url of object.
|
||||
func (o *oss) GetSignURL(ctx context.Context, bucketName, objectKey string, method Method, expire time.Duration) (string, error) {
|
||||
var ossHTTPMethod aliyunoss.HTTPMethod
|
||||
|
|
@ -217,3 +242,11 @@ func (o *oss) GetSignURL(ctx context.Context, bucketName, objectKey string, meth
|
|||
|
||||
return bucket.SignURL(objectKey, ossHTTPMethod, int64(expire.Seconds()))
|
||||
}
|
||||
|
||||
// getDefaultStorageClassIfEmpty returns the default storage class if the input is empty.
|
||||
func (o *oss) getDefaultStorageClassIfEmpty(storageClass string) string {
|
||||
if storageClass == "" {
|
||||
storageClass = string(aliyunoss.StorageStandard)
|
||||
}
|
||||
return storageClass
|
||||
}
|
||||
|
|
|
|||
|
|
@ -35,6 +35,8 @@ type s3 struct {
|
|||
client *awss3.S3
|
||||
}
|
||||
|
||||
const StandardStorageClass = "STANDARD"
|
||||
|
||||
// New s3 instance.
|
||||
func newS3(region, endpoint, accessKey, secretKey string, s3ForcePathStyle bool) (ObjectStorage, error) {
|
||||
cfg := aws.NewConfig().WithCredentials(credentials.NewStaticCredentials(accessKey, secretKey, ""))
|
||||
|
|
@ -114,6 +116,8 @@ func (s *s3) GetObjectMetadata(ctx context.Context, bucketName, objectKey string
|
|||
ContentType: aws.StringValue(resp.ContentType),
|
||||
ETag: aws.StringValue(resp.ETag),
|
||||
Digest: aws.StringValue(resp.Metadata[MetaDigest]),
|
||||
LastModifiedTime: aws.TimeValue(resp.LastModified),
|
||||
StorageClass: aws.StringValue(s.getDefaultStorageClassIfEmpty(resp.StorageClass)),
|
||||
}, true, nil
|
||||
}
|
||||
|
||||
|
|
@ -156,12 +160,13 @@ func (s *s3) DeleteObject(ctx context.Context, bucketName, objectKey string) err
|
|||
}
|
||||
|
||||
// DeleteObject deletes data of object.
|
||||
func (s *s3) ListObjectMetadatas(ctx context.Context, bucketName, prefix, marker string, limit int64) ([]*ObjectMetadata, error) {
|
||||
func (s *s3) ListObjectMetadatas(ctx context.Context, bucketName, prefix, marker, delimiter string, limit int64) ([]*ObjectMetadata, error) {
|
||||
resp, err := s.client.ListObjectsWithContext(ctx, &awss3.ListObjectsInput{
|
||||
Bucket: aws.String(bucketName),
|
||||
Prefix: aws.String(prefix),
|
||||
Marker: aws.String(marker),
|
||||
MaxKeys: aws.Int64(limit),
|
||||
Bucket: aws.String(bucketName),
|
||||
Prefix: aws.String(prefix),
|
||||
Marker: aws.String(marker),
|
||||
MaxKeys: aws.Int64(limit),
|
||||
Delimiter: aws.String(delimiter),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
@ -170,8 +175,11 @@ func (s *s3) ListObjectMetadatas(ctx context.Context, bucketName, prefix, marker
|
|||
var metadatas []*ObjectMetadata
|
||||
for _, object := range resp.Contents {
|
||||
metadatas = append(metadatas, &ObjectMetadata{
|
||||
Key: aws.StringValue(object.Key),
|
||||
ETag: aws.StringValue(object.ETag),
|
||||
Key: aws.StringValue(object.Key),
|
||||
ContentLength: aws.Int64Value(object.Size),
|
||||
ETag: aws.StringValue(object.ETag),
|
||||
LastModifiedTime: aws.TimeValue(object.LastModified),
|
||||
StorageClass: aws.StringValue(s.getDefaultStorageClassIfEmpty(object.StorageClass)),
|
||||
})
|
||||
}
|
||||
|
||||
|
|
@ -208,6 +216,18 @@ func (s *s3) IsBucketExist(ctx context.Context, bucketName string) (bool, error)
|
|||
return true, nil
|
||||
}
|
||||
|
||||
// CopyObject copy object from source to destination.
|
||||
func (s *s3) CopyObject(ctx context.Context, bucketName, srcObjectKey, destObjectKey string) error {
|
||||
srcObjectKey = bucketName + "/" + srcObjectKey
|
||||
params := &awss3.CopyObjectInput{
|
||||
Bucket: &bucketName,
|
||||
Key: &destObjectKey,
|
||||
CopySource: &srcObjectKey,
|
||||
}
|
||||
_, err := s.client.CopyObject(params)
|
||||
return err
|
||||
}
|
||||
|
||||
// GetSignURL returns sign url of object.
|
||||
func (s *s3) GetSignURL(ctx context.Context, bucketName, objectKey string, method Method, expire time.Duration) (string, error) {
|
||||
var req *request.Request
|
||||
|
|
@ -242,3 +262,12 @@ func (s *s3) GetSignURL(ctx context.Context, bucketName, objectKey string, metho
|
|||
|
||||
return req.Presign(expire)
|
||||
}
|
||||
|
||||
// getDefaultStorageClassIfEmpty returns the default storage class if the input is empty.
|
||||
func (s *s3) getDefaultStorageClassIfEmpty(storageClass *string) *string {
|
||||
if storageClass == nil || *storageClass == "" {
|
||||
defaultStorageClass := StandardStorageClass
|
||||
return &defaultStorageClass
|
||||
}
|
||||
return storageClass
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue