diff --git a/client/daemon/objectstorage/objectstorage.go b/client/daemon/objectstorage/objectstorage.go index 2678c4436..31125be12 100644 --- a/client/daemon/objectstorage/objectstorage.go +++ b/client/daemon/objectstorage/objectstorage.go @@ -188,7 +188,7 @@ func (o *objectStorage) initRouter(cfg *config.DaemonOption, logDir string) *gin // Buckets b := r.Group(RouterGroupBuckets) - b.POST("/:id", o.createBucket) + b.POST(":id", o.createBucket) b.GET(":id/metadatas", o.getObjectMetadatas) b.HEAD(":id/objects/*object_key", o.headObject) b.GET(":id/objects/*object_key", o.getObject) @@ -545,7 +545,7 @@ func (o *objectStorage) copyObject(ctx *gin.Context) { var ( bucketName = params.ID - destination = params.ObjectKey + destination = strings.TrimPrefix(params.ObjectKey, string(os.PathSeparator)) source = form.SourceObjectKey ) diff --git a/client/dfstore/dfstore.go b/client/dfstore/dfstore.go index 1df87d276..cb8ab98c3 100644 --- a/client/dfstore/dfstore.go +++ b/client/dfstore/dfstore.go @@ -30,6 +30,7 @@ import ( "net/url" "path/filepath" "strconv" + "strings" "time" "github.com/go-http-utils/headers" @@ -63,7 +64,7 @@ type Dfstore interface { GetObjectMetadatasRequestWithContext(ctx context.Context, input *GetObjectMetadatasInput) (*http.Request, error) // GetObjectMetadatasWithContext returns list of object metadatas. - GetObjectMetadatasWithContext(ctx context.Context, input *GetObjectMetadatasInput) ([]*pkgobjectstorage.ObjectMetadata, error) + GetObjectMetadatasWithContext(ctx context.Context, input *GetObjectMetadatasInput) (*pkgobjectstorage.ObjectMetadatas, error) // PutObjectRequestWithContext returns *http.Request of putting object. PutObjectRequestWithContext(ctx context.Context, input *PutObjectInput) (*http.Request, error) @@ -133,7 +134,6 @@ type GetObjectMetadataInput struct { func (i *GetObjectMetadataInput) Validate() error { if i.BucketName == "" { return errors.New("invalid BucketName") - } if i.ObjectKey == "" { @@ -155,6 +155,11 @@ func (dfs *dfstore) GetObjectMetadataRequestWithContext(ctx context.Context, inp } u.Path = filepath.Join("buckets", input.BucketName, "objects", input.ObjectKey) + + if strings.HasSuffix(input.ObjectKey, "/") { + u.Path += "/" + } + req, err := http.NewRequestWithContext(ctx, http.MethodHead, u.String(), nil) if err != nil { return nil, err @@ -227,6 +232,10 @@ func (i *GetObjectMetadatasInput) Validate() error { return errors.New("invalid BucketName") } + if i.Limit < 0 || i.Limit > pkgobjectstorage.DefaultGetObjectMetadatasLimit { + return errors.New("invalid limit") + } + return nil } @@ -241,7 +250,7 @@ func (dfs *dfstore) GetObjectMetadatasRequestWithContext(ctx context.Context, in return nil, err } - u.Path = filepath.Join("buckets", input.BucketName, "objects") + u.Path = filepath.Join("buckets", input.BucketName, "metadatas") query := u.Query() if input.Prefix != "" { @@ -271,7 +280,7 @@ func (dfs *dfstore) GetObjectMetadatasRequestWithContext(ctx context.Context, in } // GetObjectMetadatasWithContext returns *http.Request of getting object metadatas. -func (dfs *dfstore) GetObjectMetadatasWithContext(ctx context.Context, input *GetObjectMetadatasInput) ([]*pkgobjectstorage.ObjectMetadata, error) { +func (dfs *dfstore) GetObjectMetadatasWithContext(ctx context.Context, input *GetObjectMetadatasInput) (*pkgobjectstorage.ObjectMetadatas, error) { req, err := dfs.GetObjectMetadatasRequestWithContext(ctx, input) if err != nil { return nil, err @@ -287,12 +296,12 @@ func (dfs *dfstore) GetObjectMetadatasWithContext(ctx context.Context, input *Ge return nil, fmt.Errorf("bad response status %s", resp.Status) } - var metadatas []*pkgobjectstorage.ObjectMetadata + var metadatas pkgobjectstorage.ObjectMetadatas if err := json.NewDecoder(resp.Body).Decode(&metadatas); err != nil { return nil, err } - return metadatas, nil + return &metadatas, nil } // GetObjectInput is used to construct request of getting object. @@ -316,7 +325,6 @@ type GetObjectInput struct { func (i *GetObjectInput) Validate() error { if i.BucketName == "" { return errors.New("invalid BucketName") - } if i.ObjectKey == "" { @@ -339,6 +347,10 @@ func (dfs *dfstore) GetObjectRequestWithContext(ctx context.Context, input *GetO u.Path = filepath.Join("buckets", input.BucketName, "objects", input.ObjectKey) + if strings.HasSuffix(input.ObjectKey, "/") { + u.Path += "/" + } + query := u.Query() if input.Filter != "" { query.Set("filter", input.Filter) @@ -405,7 +417,6 @@ type PutObjectInput struct { func (i *PutObjectInput) Validate() error { if i.BucketName == "" { return errors.New("invalid BucketName") - } if i.ObjectKey == "" { @@ -469,6 +480,10 @@ func (dfs *dfstore) PutObjectRequestWithContext(ctx context.Context, input *PutO u.Path = filepath.Join("buckets", input.BucketName, "objects", input.ObjectKey) + if strings.HasSuffix(input.ObjectKey, "/") { + u.Path += "/" + } + req, err := http.NewRequestWithContext(ctx, http.MethodPut, u.String(), body) if err != nil { return nil, err @@ -637,7 +652,7 @@ func (dfs *dfstore) CreateBucketRequestWithContext(ctx context.Context, input *C u.RawQuery = query.Encode() - return http.NewRequestWithContext(ctx, http.MethodPut, u.String(), nil) + return http.NewRequestWithContext(ctx, http.MethodPost, u.String(), nil) } // DeleteObjectInput is used to construct request of deleting object. @@ -653,7 +668,6 @@ type DeleteObjectInput struct { func (i *DeleteObjectInput) Validate() error { if i.BucketName == "" { return errors.New("invalid BucketName") - } if i.ObjectKey == "" { @@ -675,6 +689,11 @@ func (dfs *dfstore) DeleteObjectRequestWithContext(ctx context.Context, input *D } u.Path = filepath.Join("buckets", input.BucketName, "objects", input.ObjectKey) + + if strings.HasSuffix(input.ObjectKey, "/") { + u.Path += "/" + } + return http.NewRequestWithContext(ctx, http.MethodDelete, u.String(), nil) } @@ -711,7 +730,6 @@ type IsObjectExistInput struct { func (i *IsObjectExistInput) Validate() error { if i.BucketName == "" { return errors.New("invalid BucketName") - } if i.ObjectKey == "" { @@ -733,6 +751,11 @@ func (dfs *dfstore) IsObjectExistRequestWithContext(ctx context.Context, input * } u.Path = filepath.Join("buckets", input.BucketName, "objects", input.ObjectKey) + + if strings.HasSuffix(input.ObjectKey, "/") { + u.Path += "/" + } + return http.NewRequestWithContext(ctx, http.MethodHead, u.String(), nil) } diff --git a/pkg/objectstorage/objectstorage.go b/pkg/objectstorage/objectstorage.go index 38068c3ce..189c474f0 100644 --- a/pkg/objectstorage/objectstorage.go +++ b/pkg/objectstorage/objectstorage.go @@ -20,8 +20,10 @@ package objectstorage import ( "context" + "crypto/tls" "fmt" "io" + "net/http" "time" ) @@ -58,6 +60,15 @@ type ObjectMetadata struct { StorageClass string } +// ObjectMetadatas provides metadatas of object. +type ObjectMetadatas struct { + // CommonPrefixes are similar prefixes in object storage. + CommonPrefixes []string `json:"CommonPrefixes"` + + // Metadatas are metadata of objects. + Metadatas []*ObjectMetadata `json:"Metadatas"` +} + // BucketMetadata provides metadata of bucket. type BucketMetadata struct { // Name is bucket name. @@ -88,7 +99,7 @@ type ObjectStorage interface { GetObjectMetadata(ctx context.Context, bucketName, objectKey string) (*ObjectMetadata, bool, error) // GetObjectMetadatas returns the metadata of the objects. - GetObjectMetadatas(ctx context.Context, bucketName, prefix, marker, delimiter string, limit int64) ([]*ObjectMetadata, error) + GetObjectMetadatas(ctx context.Context, bucketName, prefix, marker, delimiter string, limit int64) (*ObjectMetadatas, error) // GetOject returns data of object. GetOject(ctx context.Context, bucketName, objectKey string) (io.ReadCloser, error) @@ -128,6 +139,9 @@ type objectStorage struct { // secretKey is access key secret. s3ForcePathStyle bool + + // httpClient is http client. + httpClient *http.Client } // Option is a functional option for configuring the objectStorage. @@ -140,6 +154,13 @@ func WithS3ForcePathStyle(s3ForcePathStyle bool) Option { } } +// WithHTTPClient set the http client for objectStorage. +func WithHTTPClient(client *http.Client) Option { + return func(o *objectStorage) { + o.httpClient = client + } +} + // New object storage interface. func New(name, region, endpoint, accessKey, secretKey string, options ...Option) (ObjectStorage, error) { o := &objectStorage{ @@ -149,6 +170,20 @@ func New(name, region, endpoint, accessKey, secretKey string, options ...Option) accessKey: accessKey, secretKey: secretKey, s3ForcePathStyle: true, + httpClient: &http.Client{ + Transport: &http.Transport{ + Proxy: http.ProxyFromEnvironment, + TLSHandshakeTimeout: time.Second * 20, + ResponseHeaderTimeout: time.Second * 30, + IdleConnTimeout: time.Second * 300, + MaxIdleConnsPerHost: 500, + ReadBufferSize: 32 << 10, + WriteBufferSize: 32 << 10, + DisableCompression: true, + TLSClientConfig: &tls.Config{}, + }, + Timeout: time.Hour, + }, } for _, opt := range options { @@ -157,11 +192,11 @@ func New(name, region, endpoint, accessKey, secretKey string, options ...Option) switch o.name { case ServiceNameS3: - return newS3(o.region, o.endpoint, o.accessKey, o.secretKey, o.s3ForcePathStyle) + return newS3(o.region, o.endpoint, o.accessKey, o.secretKey, o.s3ForcePathStyle, o.httpClient) case ServiceNameOSS: - return newOSS(o.region, o.endpoint, o.accessKey, o.secretKey) + return newOSS(o.region, o.endpoint, o.accessKey, o.secretKey, o.httpClient) case ServiceNameOBS: - return newOBS(o.region, o.endpoint, o.accessKey, o.secretKey) + return newOBS(o.region, o.endpoint, o.accessKey, o.secretKey, o.httpClient) } return nil, fmt.Errorf("unknow service name %s", name) diff --git a/pkg/objectstorage/obs.go b/pkg/objectstorage/obs.go index 70796f337..3e1da3fa3 100644 --- a/pkg/objectstorage/obs.go +++ b/pkg/objectstorage/obs.go @@ -20,6 +20,7 @@ import ( "context" "fmt" "io" + "net/http" "strings" "time" @@ -32,8 +33,8 @@ type obs struct { } // New oss instance. -func newOBS(region, endpoint, accessKey, secretKey string) (ObjectStorage, error) { - client, err := huaweiobs.New(accessKey, secretKey, endpoint) +func newOBS(region, endpoint, accessKey, secretKey string, httpClient *http.Client) (ObjectStorage, error) { + client, err := huaweiobs.New(accessKey, secretKey, endpoint, huaweiobs.WithHttpClient(httpClient)) if err != nil { return nil, fmt.Errorf("new obs client failed: %s", err) } @@ -120,7 +121,7 @@ func (o *obs) GetObjectMetadata(ctx context.Context, bucketName, objectKey strin } // GetObjectMetadatas returns the metadatas of the objects. -func (o *obs) GetObjectMetadatas(ctx context.Context, bucketName, prefix, marker, delimiter string, limit int64) ([]*ObjectMetadata, error) { +func (o *obs) GetObjectMetadatas(ctx context.Context, bucketName, prefix, marker, delimiter string, limit int64) (*ObjectMetadatas, error) { if limit == 0 { limit = DefaultGetObjectMetadatasLimit } @@ -148,7 +149,10 @@ func (o *obs) GetObjectMetadatas(ctx context.Context, bucketName, prefix, marker }) } - return metadatas, nil + return &ObjectMetadatas{ + Metadatas: metadatas, + CommonPrefixes: resp.CommonPrefixes, + }, nil } // GetOject returns data of object. diff --git a/pkg/objectstorage/oss.go b/pkg/objectstorage/oss.go index 922639d24..2caa48ab8 100644 --- a/pkg/objectstorage/oss.go +++ b/pkg/objectstorage/oss.go @@ -35,8 +35,8 @@ type oss struct { } // New oss instance. -func newOSS(region, endpoint, accessKey, secretKey string) (ObjectStorage, error) { - client, err := aliyunoss.New(endpoint, accessKey, secretKey, aliyunoss.Region(region)) +func newOSS(region, endpoint, accessKey, secretKey string, httpClient *http.Client) (ObjectStorage, error) { + client, err := aliyunoss.New(endpoint, accessKey, secretKey, aliyunoss.Region(region), aliyunoss.HTTPClient(httpClient)) if err != nil { return nil, fmt.Errorf("new oss client failed: %s", err) } @@ -96,7 +96,7 @@ func (o *oss) GetObjectMetadata(ctx context.Context, bucketName, objectKey strin header, err := bucket.GetObjectDetailedMeta(objectKey) if err != nil { - var serr *aliyunoss.ServiceError + var serr aliyunoss.ServiceError if errors.As(err, &serr) && serr.StatusCode == http.StatusNotFound { return nil, false, nil } @@ -129,7 +129,7 @@ func (o *oss) GetObjectMetadata(ctx context.Context, bucketName, objectKey strin } // GetObjectMetadatas returns the metadatas of the objects. -func (o *oss) GetObjectMetadatas(ctx context.Context, bucketName, prefix, marker, delimiter string, limit int64) ([]*ObjectMetadata, error) { +func (o *oss) GetObjectMetadatas(ctx context.Context, bucketName, prefix, marker, delimiter string, limit int64) (*ObjectMetadatas, error) { bucket, err := o.client.Bucket(bucketName) if err != nil { return nil, err @@ -155,7 +155,10 @@ func (o *oss) GetObjectMetadatas(ctx context.Context, bucketName, prefix, marker }) } - return metadatas, nil + return &ObjectMetadatas{ + Metadatas: metadatas, + CommonPrefixes: resp.CommonPrefixes, + }, nil } // GetOject returns data of object. diff --git a/pkg/objectstorage/s3.go b/pkg/objectstorage/s3.go index 4ccbe5be0..41df250b1 100644 --- a/pkg/objectstorage/s3.go +++ b/pkg/objectstorage/s3.go @@ -20,6 +20,8 @@ import ( "context" "fmt" "io" + "net/http" + "net/url" "path" "time" @@ -37,8 +39,8 @@ type s3 struct { } // New s3 instance. -func newS3(region, endpoint, accessKey, secretKey string, s3ForcePathStyle bool) (ObjectStorage, error) { - cfg := aws.NewConfig().WithCredentials(credentials.NewStaticCredentials(accessKey, secretKey, "")) +func newS3(region, endpoint, accessKey, secretKey string, s3ForcePathStyle bool, httpClient *http.Client) (ObjectStorage, error) { + cfg := aws.NewConfig().WithCredentials(credentials.NewStaticCredentials(accessKey, secretKey, "")).WithHTTPClient(httpClient) s, err := session.NewSession(cfg) if err != nil { return nil, fmt.Errorf("new aws session failed: %s", err) @@ -121,7 +123,7 @@ func (s *s3) GetObjectMetadata(ctx context.Context, bucketName, objectKey string } // GetObjectMetadatas returns the metadatas of the objects. -func (s *s3) GetObjectMetadatas(ctx context.Context, bucketName, prefix, marker, delimiter string, limit int64) ([]*ObjectMetadata, error) { +func (s *s3) GetObjectMetadatas(ctx context.Context, bucketName, prefix, marker, delimiter string, limit int64) (*ObjectMetadatas, error) { if limit == 0 { limit = DefaultGetObjectMetadatasLimit } @@ -148,7 +150,19 @@ func (s *s3) GetObjectMetadatas(ctx context.Context, bucketName, prefix, marker, }) } - return metadatas, nil + commonPrefixes := make([]string, len(resp.CommonPrefixes)) + for _, commonPrefix := range resp.CommonPrefixes { + prefix, err := url.QueryUnescape(*commonPrefix.Prefix) + if err != nil { + return nil, fmt.Errorf("failed to decode commonPrefixes %s, error: %s", *commonPrefix.Prefix, err.Error()) + } + commonPrefixes = append(commonPrefixes, prefix) + } + + return &ObjectMetadatas{ + Metadatas: metadatas, + CommonPrefixes: commonPrefixes, + }, nil } // GetOject returns data of object. @@ -193,7 +207,6 @@ func (s *s3) DeleteObject(ctx context.Context, bucketName, objectKey string) err func (s *s3) IsObjectExist(ctx context.Context, bucketName, objectKey string) (bool, error) { _, isExist, err := s.GetObjectMetadata(ctx, bucketName, objectKey) if err != nil { - return false, err }