Fix issues in the objectstorage based on JuiceFS use cases. (#2648)

Signed-off-by: XDTD <1355582364@qq.com>
This commit is contained in:
tan ding 2023-08-30 11:20:00 +08:00 committed by GitHub
parent 0f82cbfb33
commit c29edd28fd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 109 additions and 31 deletions

View File

@ -188,7 +188,7 @@ func (o *objectStorage) initRouter(cfg *config.DaemonOption, logDir string) *gin
// Buckets // Buckets
b := r.Group(RouterGroupBuckets) b := r.Group(RouterGroupBuckets)
b.POST("/:id", o.createBucket) b.POST(":id", o.createBucket)
b.GET(":id/metadatas", o.getObjectMetadatas) b.GET(":id/metadatas", o.getObjectMetadatas)
b.HEAD(":id/objects/*object_key", o.headObject) b.HEAD(":id/objects/*object_key", o.headObject)
b.GET(":id/objects/*object_key", o.getObject) b.GET(":id/objects/*object_key", o.getObject)
@ -545,7 +545,7 @@ func (o *objectStorage) copyObject(ctx *gin.Context) {
var ( var (
bucketName = params.ID bucketName = params.ID
destination = params.ObjectKey destination = strings.TrimPrefix(params.ObjectKey, string(os.PathSeparator))
source = form.SourceObjectKey source = form.SourceObjectKey
) )

View File

@ -30,6 +30,7 @@ import (
"net/url" "net/url"
"path/filepath" "path/filepath"
"strconv" "strconv"
"strings"
"time" "time"
"github.com/go-http-utils/headers" "github.com/go-http-utils/headers"
@ -63,7 +64,7 @@ type Dfstore interface {
GetObjectMetadatasRequestWithContext(ctx context.Context, input *GetObjectMetadatasInput) (*http.Request, error) GetObjectMetadatasRequestWithContext(ctx context.Context, input *GetObjectMetadatasInput) (*http.Request, error)
// GetObjectMetadatasWithContext returns list of object metadatas. // GetObjectMetadatasWithContext returns list of object metadatas.
GetObjectMetadatasWithContext(ctx context.Context, input *GetObjectMetadatasInput) ([]*pkgobjectstorage.ObjectMetadata, error) GetObjectMetadatasWithContext(ctx context.Context, input *GetObjectMetadatasInput) (*pkgobjectstorage.ObjectMetadatas, error)
// PutObjectRequestWithContext returns *http.Request of putting object. // PutObjectRequestWithContext returns *http.Request of putting object.
PutObjectRequestWithContext(ctx context.Context, input *PutObjectInput) (*http.Request, error) PutObjectRequestWithContext(ctx context.Context, input *PutObjectInput) (*http.Request, error)
@ -133,7 +134,6 @@ type GetObjectMetadataInput struct {
func (i *GetObjectMetadataInput) Validate() error { func (i *GetObjectMetadataInput) Validate() error {
if i.BucketName == "" { if i.BucketName == "" {
return errors.New("invalid BucketName") return errors.New("invalid BucketName")
} }
if i.ObjectKey == "" { if i.ObjectKey == "" {
@ -155,6 +155,11 @@ func (dfs *dfstore) GetObjectMetadataRequestWithContext(ctx context.Context, inp
} }
u.Path = filepath.Join("buckets", input.BucketName, "objects", input.ObjectKey) u.Path = filepath.Join("buckets", input.BucketName, "objects", input.ObjectKey)
if strings.HasSuffix(input.ObjectKey, "/") {
u.Path += "/"
}
req, err := http.NewRequestWithContext(ctx, http.MethodHead, u.String(), nil) req, err := http.NewRequestWithContext(ctx, http.MethodHead, u.String(), nil)
if err != nil { if err != nil {
return nil, err return nil, err
@ -227,6 +232,10 @@ func (i *GetObjectMetadatasInput) Validate() error {
return errors.New("invalid BucketName") return errors.New("invalid BucketName")
} }
if i.Limit < 0 || i.Limit > pkgobjectstorage.DefaultGetObjectMetadatasLimit {
return errors.New("invalid limit")
}
return nil return nil
} }
@ -241,7 +250,7 @@ func (dfs *dfstore) GetObjectMetadatasRequestWithContext(ctx context.Context, in
return nil, err return nil, err
} }
u.Path = filepath.Join("buckets", input.BucketName, "objects") u.Path = filepath.Join("buckets", input.BucketName, "metadatas")
query := u.Query() query := u.Query()
if input.Prefix != "" { if input.Prefix != "" {
@ -271,7 +280,7 @@ func (dfs *dfstore) GetObjectMetadatasRequestWithContext(ctx context.Context, in
} }
// GetObjectMetadatasWithContext returns *http.Request of getting object metadatas. // GetObjectMetadatasWithContext returns *http.Request of getting object metadatas.
func (dfs *dfstore) GetObjectMetadatasWithContext(ctx context.Context, input *GetObjectMetadatasInput) ([]*pkgobjectstorage.ObjectMetadata, error) { func (dfs *dfstore) GetObjectMetadatasWithContext(ctx context.Context, input *GetObjectMetadatasInput) (*pkgobjectstorage.ObjectMetadatas, error) {
req, err := dfs.GetObjectMetadatasRequestWithContext(ctx, input) req, err := dfs.GetObjectMetadatasRequestWithContext(ctx, input)
if err != nil { if err != nil {
return nil, err return nil, err
@ -287,12 +296,12 @@ func (dfs *dfstore) GetObjectMetadatasWithContext(ctx context.Context, input *Ge
return nil, fmt.Errorf("bad response status %s", resp.Status) return nil, fmt.Errorf("bad response status %s", resp.Status)
} }
var metadatas []*pkgobjectstorage.ObjectMetadata var metadatas pkgobjectstorage.ObjectMetadatas
if err := json.NewDecoder(resp.Body).Decode(&metadatas); err != nil { if err := json.NewDecoder(resp.Body).Decode(&metadatas); err != nil {
return nil, err return nil, err
} }
return metadatas, nil return &metadatas, nil
} }
// GetObjectInput is used to construct request of getting object. // GetObjectInput is used to construct request of getting object.
@ -316,7 +325,6 @@ type GetObjectInput struct {
func (i *GetObjectInput) Validate() error { func (i *GetObjectInput) Validate() error {
if i.BucketName == "" { if i.BucketName == "" {
return errors.New("invalid BucketName") return errors.New("invalid BucketName")
} }
if i.ObjectKey == "" { if i.ObjectKey == "" {
@ -339,6 +347,10 @@ func (dfs *dfstore) GetObjectRequestWithContext(ctx context.Context, input *GetO
u.Path = filepath.Join("buckets", input.BucketName, "objects", input.ObjectKey) u.Path = filepath.Join("buckets", input.BucketName, "objects", input.ObjectKey)
if strings.HasSuffix(input.ObjectKey, "/") {
u.Path += "/"
}
query := u.Query() query := u.Query()
if input.Filter != "" { if input.Filter != "" {
query.Set("filter", input.Filter) query.Set("filter", input.Filter)
@ -405,7 +417,6 @@ type PutObjectInput struct {
func (i *PutObjectInput) Validate() error { func (i *PutObjectInput) Validate() error {
if i.BucketName == "" { if i.BucketName == "" {
return errors.New("invalid BucketName") return errors.New("invalid BucketName")
} }
if i.ObjectKey == "" { if i.ObjectKey == "" {
@ -469,6 +480,10 @@ func (dfs *dfstore) PutObjectRequestWithContext(ctx context.Context, input *PutO
u.Path = filepath.Join("buckets", input.BucketName, "objects", input.ObjectKey) u.Path = filepath.Join("buckets", input.BucketName, "objects", input.ObjectKey)
if strings.HasSuffix(input.ObjectKey, "/") {
u.Path += "/"
}
req, err := http.NewRequestWithContext(ctx, http.MethodPut, u.String(), body) req, err := http.NewRequestWithContext(ctx, http.MethodPut, u.String(), body)
if err != nil { if err != nil {
return nil, err return nil, err
@ -637,7 +652,7 @@ func (dfs *dfstore) CreateBucketRequestWithContext(ctx context.Context, input *C
u.RawQuery = query.Encode() u.RawQuery = query.Encode()
return http.NewRequestWithContext(ctx, http.MethodPut, u.String(), nil) return http.NewRequestWithContext(ctx, http.MethodPost, u.String(), nil)
} }
// DeleteObjectInput is used to construct request of deleting object. // DeleteObjectInput is used to construct request of deleting object.
@ -653,7 +668,6 @@ type DeleteObjectInput struct {
func (i *DeleteObjectInput) Validate() error { func (i *DeleteObjectInput) Validate() error {
if i.BucketName == "" { if i.BucketName == "" {
return errors.New("invalid BucketName") return errors.New("invalid BucketName")
} }
if i.ObjectKey == "" { if i.ObjectKey == "" {
@ -675,6 +689,11 @@ func (dfs *dfstore) DeleteObjectRequestWithContext(ctx context.Context, input *D
} }
u.Path = filepath.Join("buckets", input.BucketName, "objects", input.ObjectKey) u.Path = filepath.Join("buckets", input.BucketName, "objects", input.ObjectKey)
if strings.HasSuffix(input.ObjectKey, "/") {
u.Path += "/"
}
return http.NewRequestWithContext(ctx, http.MethodDelete, u.String(), nil) return http.NewRequestWithContext(ctx, http.MethodDelete, u.String(), nil)
} }
@ -711,7 +730,6 @@ type IsObjectExistInput struct {
func (i *IsObjectExistInput) Validate() error { func (i *IsObjectExistInput) Validate() error {
if i.BucketName == "" { if i.BucketName == "" {
return errors.New("invalid BucketName") return errors.New("invalid BucketName")
} }
if i.ObjectKey == "" { if i.ObjectKey == "" {
@ -733,6 +751,11 @@ func (dfs *dfstore) IsObjectExistRequestWithContext(ctx context.Context, input *
} }
u.Path = filepath.Join("buckets", input.BucketName, "objects", input.ObjectKey) u.Path = filepath.Join("buckets", input.BucketName, "objects", input.ObjectKey)
if strings.HasSuffix(input.ObjectKey, "/") {
u.Path += "/"
}
return http.NewRequestWithContext(ctx, http.MethodHead, u.String(), nil) return http.NewRequestWithContext(ctx, http.MethodHead, u.String(), nil)
} }

View File

@ -20,8 +20,10 @@ package objectstorage
import ( import (
"context" "context"
"crypto/tls"
"fmt" "fmt"
"io" "io"
"net/http"
"time" "time"
) )
@ -58,6 +60,15 @@ type ObjectMetadata struct {
StorageClass string StorageClass string
} }
// ObjectMetadatas provides metadatas of object.
type ObjectMetadatas struct {
// CommonPrefixes are similar prefixes in object storage.
CommonPrefixes []string `json:"CommonPrefixes"`
// Metadatas are metadata of objects.
Metadatas []*ObjectMetadata `json:"Metadatas"`
}
// BucketMetadata provides metadata of bucket. // BucketMetadata provides metadata of bucket.
type BucketMetadata struct { type BucketMetadata struct {
// Name is bucket name. // Name is bucket name.
@ -88,7 +99,7 @@ type ObjectStorage interface {
GetObjectMetadata(ctx context.Context, bucketName, objectKey string) (*ObjectMetadata, bool, error) GetObjectMetadata(ctx context.Context, bucketName, objectKey string) (*ObjectMetadata, bool, error)
// GetObjectMetadatas returns the metadata of the objects. // GetObjectMetadatas returns the metadata of the objects.
GetObjectMetadatas(ctx context.Context, bucketName, prefix, marker, delimiter string, limit int64) ([]*ObjectMetadata, error) GetObjectMetadatas(ctx context.Context, bucketName, prefix, marker, delimiter string, limit int64) (*ObjectMetadatas, error)
// GetOject returns data of object. // GetOject returns data of object.
GetOject(ctx context.Context, bucketName, objectKey string) (io.ReadCloser, error) GetOject(ctx context.Context, bucketName, objectKey string) (io.ReadCloser, error)
@ -128,6 +139,9 @@ type objectStorage struct {
// secretKey is access key secret. // secretKey is access key secret.
s3ForcePathStyle bool s3ForcePathStyle bool
// httpClient is http client.
httpClient *http.Client
} }
// Option is a functional option for configuring the objectStorage. // Option is a functional option for configuring the objectStorage.
@ -140,6 +154,13 @@ func WithS3ForcePathStyle(s3ForcePathStyle bool) Option {
} }
} }
// WithHTTPClient set the http client for objectStorage.
func WithHTTPClient(client *http.Client) Option {
return func(o *objectStorage) {
o.httpClient = client
}
}
// New object storage interface. // New object storage interface.
func New(name, region, endpoint, accessKey, secretKey string, options ...Option) (ObjectStorage, error) { func New(name, region, endpoint, accessKey, secretKey string, options ...Option) (ObjectStorage, error) {
o := &objectStorage{ o := &objectStorage{
@ -149,6 +170,20 @@ func New(name, region, endpoint, accessKey, secretKey string, options ...Option)
accessKey: accessKey, accessKey: accessKey,
secretKey: secretKey, secretKey: secretKey,
s3ForcePathStyle: true, s3ForcePathStyle: true,
httpClient: &http.Client{
Transport: &http.Transport{
Proxy: http.ProxyFromEnvironment,
TLSHandshakeTimeout: time.Second * 20,
ResponseHeaderTimeout: time.Second * 30,
IdleConnTimeout: time.Second * 300,
MaxIdleConnsPerHost: 500,
ReadBufferSize: 32 << 10,
WriteBufferSize: 32 << 10,
DisableCompression: true,
TLSClientConfig: &tls.Config{},
},
Timeout: time.Hour,
},
} }
for _, opt := range options { for _, opt := range options {
@ -157,11 +192,11 @@ func New(name, region, endpoint, accessKey, secretKey string, options ...Option)
switch o.name { switch o.name {
case ServiceNameS3: case ServiceNameS3:
return newS3(o.region, o.endpoint, o.accessKey, o.secretKey, o.s3ForcePathStyle) return newS3(o.region, o.endpoint, o.accessKey, o.secretKey, o.s3ForcePathStyle, o.httpClient)
case ServiceNameOSS: case ServiceNameOSS:
return newOSS(o.region, o.endpoint, o.accessKey, o.secretKey) return newOSS(o.region, o.endpoint, o.accessKey, o.secretKey, o.httpClient)
case ServiceNameOBS: case ServiceNameOBS:
return newOBS(o.region, o.endpoint, o.accessKey, o.secretKey) return newOBS(o.region, o.endpoint, o.accessKey, o.secretKey, o.httpClient)
} }
return nil, fmt.Errorf("unknow service name %s", name) return nil, fmt.Errorf("unknow service name %s", name)

View File

@ -20,6 +20,7 @@ import (
"context" "context"
"fmt" "fmt"
"io" "io"
"net/http"
"strings" "strings"
"time" "time"
@ -32,8 +33,8 @@ type obs struct {
} }
// New oss instance. // New oss instance.
func newOBS(region, endpoint, accessKey, secretKey string) (ObjectStorage, error) { func newOBS(region, endpoint, accessKey, secretKey string, httpClient *http.Client) (ObjectStorage, error) {
client, err := huaweiobs.New(accessKey, secretKey, endpoint) client, err := huaweiobs.New(accessKey, secretKey, endpoint, huaweiobs.WithHttpClient(httpClient))
if err != nil { if err != nil {
return nil, fmt.Errorf("new obs client failed: %s", err) return nil, fmt.Errorf("new obs client failed: %s", err)
} }
@ -120,7 +121,7 @@ func (o *obs) GetObjectMetadata(ctx context.Context, bucketName, objectKey strin
} }
// GetObjectMetadatas returns the metadatas of the objects. // GetObjectMetadatas returns the metadatas of the objects.
func (o *obs) GetObjectMetadatas(ctx context.Context, bucketName, prefix, marker, delimiter string, limit int64) ([]*ObjectMetadata, error) { func (o *obs) GetObjectMetadatas(ctx context.Context, bucketName, prefix, marker, delimiter string, limit int64) (*ObjectMetadatas, error) {
if limit == 0 { if limit == 0 {
limit = DefaultGetObjectMetadatasLimit limit = DefaultGetObjectMetadatasLimit
} }
@ -148,7 +149,10 @@ func (o *obs) GetObjectMetadatas(ctx context.Context, bucketName, prefix, marker
}) })
} }
return metadatas, nil return &ObjectMetadatas{
Metadatas: metadatas,
CommonPrefixes: resp.CommonPrefixes,
}, nil
} }
// GetOject returns data of object. // GetOject returns data of object.

View File

@ -35,8 +35,8 @@ type oss struct {
} }
// New oss instance. // New oss instance.
func newOSS(region, endpoint, accessKey, secretKey string) (ObjectStorage, error) { func newOSS(region, endpoint, accessKey, secretKey string, httpClient *http.Client) (ObjectStorage, error) {
client, err := aliyunoss.New(endpoint, accessKey, secretKey, aliyunoss.Region(region)) client, err := aliyunoss.New(endpoint, accessKey, secretKey, aliyunoss.Region(region), aliyunoss.HTTPClient(httpClient))
if err != nil { if err != nil {
return nil, fmt.Errorf("new oss client failed: %s", err) return nil, fmt.Errorf("new oss client failed: %s", err)
} }
@ -96,7 +96,7 @@ func (o *oss) GetObjectMetadata(ctx context.Context, bucketName, objectKey strin
header, err := bucket.GetObjectDetailedMeta(objectKey) header, err := bucket.GetObjectDetailedMeta(objectKey)
if err != nil { if err != nil {
var serr *aliyunoss.ServiceError var serr aliyunoss.ServiceError
if errors.As(err, &serr) && serr.StatusCode == http.StatusNotFound { if errors.As(err, &serr) && serr.StatusCode == http.StatusNotFound {
return nil, false, nil return nil, false, nil
} }
@ -129,7 +129,7 @@ func (o *oss) GetObjectMetadata(ctx context.Context, bucketName, objectKey strin
} }
// GetObjectMetadatas returns the metadatas of the objects. // GetObjectMetadatas returns the metadatas of the objects.
func (o *oss) GetObjectMetadatas(ctx context.Context, bucketName, prefix, marker, delimiter string, limit int64) ([]*ObjectMetadata, error) { func (o *oss) GetObjectMetadatas(ctx context.Context, bucketName, prefix, marker, delimiter string, limit int64) (*ObjectMetadatas, error) {
bucket, err := o.client.Bucket(bucketName) bucket, err := o.client.Bucket(bucketName)
if err != nil { if err != nil {
return nil, err return nil, err
@ -155,7 +155,10 @@ func (o *oss) GetObjectMetadatas(ctx context.Context, bucketName, prefix, marker
}) })
} }
return metadatas, nil return &ObjectMetadatas{
Metadatas: metadatas,
CommonPrefixes: resp.CommonPrefixes,
}, nil
} }
// GetOject returns data of object. // GetOject returns data of object.

View File

@ -20,6 +20,8 @@ import (
"context" "context"
"fmt" "fmt"
"io" "io"
"net/http"
"net/url"
"path" "path"
"time" "time"
@ -37,8 +39,8 @@ type s3 struct {
} }
// New s3 instance. // New s3 instance.
func newS3(region, endpoint, accessKey, secretKey string, s3ForcePathStyle bool) (ObjectStorage, error) { func newS3(region, endpoint, accessKey, secretKey string, s3ForcePathStyle bool, httpClient *http.Client) (ObjectStorage, error) {
cfg := aws.NewConfig().WithCredentials(credentials.NewStaticCredentials(accessKey, secretKey, "")) cfg := aws.NewConfig().WithCredentials(credentials.NewStaticCredentials(accessKey, secretKey, "")).WithHTTPClient(httpClient)
s, err := session.NewSession(cfg) s, err := session.NewSession(cfg)
if err != nil { if err != nil {
return nil, fmt.Errorf("new aws session failed: %s", err) return nil, fmt.Errorf("new aws session failed: %s", err)
@ -121,7 +123,7 @@ func (s *s3) GetObjectMetadata(ctx context.Context, bucketName, objectKey string
} }
// GetObjectMetadatas returns the metadatas of the objects. // GetObjectMetadatas returns the metadatas of the objects.
func (s *s3) GetObjectMetadatas(ctx context.Context, bucketName, prefix, marker, delimiter string, limit int64) ([]*ObjectMetadata, error) { func (s *s3) GetObjectMetadatas(ctx context.Context, bucketName, prefix, marker, delimiter string, limit int64) (*ObjectMetadatas, error) {
if limit == 0 { if limit == 0 {
limit = DefaultGetObjectMetadatasLimit limit = DefaultGetObjectMetadatasLimit
} }
@ -148,7 +150,19 @@ func (s *s3) GetObjectMetadatas(ctx context.Context, bucketName, prefix, marker,
}) })
} }
return metadatas, nil commonPrefixes := make([]string, len(resp.CommonPrefixes))
for _, commonPrefix := range resp.CommonPrefixes {
prefix, err := url.QueryUnescape(*commonPrefix.Prefix)
if err != nil {
return nil, fmt.Errorf("failed to decode commonPrefixes %s, error: %s", *commonPrefix.Prefix, err.Error())
}
commonPrefixes = append(commonPrefixes, prefix)
}
return &ObjectMetadatas{
Metadatas: metadatas,
CommonPrefixes: commonPrefixes,
}, nil
} }
// GetOject returns data of object. // GetOject returns data of object.
@ -193,7 +207,6 @@ func (s *s3) DeleteObject(ctx context.Context, bucketName, objectKey string) err
func (s *s3) IsObjectExist(ctx context.Context, bucketName, objectKey string) (bool, error) { func (s *s3) IsObjectExist(ctx context.Context, bucketName, objectKey string) (bool, error) {
_, isExist, err := s.GetObjectMetadata(ctx, bucketName, objectKey) _, isExist, err := s.GetObjectMetadata(ctx, bucketName, objectKey)
if err != nil { if err != nil {
return false, err return false, err
} }