/* * Copyright 2022 The Dragonfly Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ //go:generate mockgen -destination mocks/objectstorage_mock.go -source objectstorage.go -package mocks package objectstorage import ( "bytes" "context" "errors" "fmt" "io" "math" "mime/multipart" "net" "net/http" "net/url" "os" "path/filepath" "strconv" "strings" "time" "github.com/gin-gonic/gin" "github.com/go-http-utils/headers" ginprometheus "github.com/mcuadros/go-gin-prometheus" "go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin" commonv1 "d7y.io/api/v2/pkg/apis/common/v1" "d7y.io/dragonfly/v2/client/config" "d7y.io/dragonfly/v2/client/daemon/peer" "d7y.io/dragonfly/v2/client/daemon/storage" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/digest" "d7y.io/dragonfly/v2/pkg/idgen" nethttp "d7y.io/dragonfly/v2/pkg/net/http" "d7y.io/dragonfly/v2/pkg/objectstorage" pkgstrings "d7y.io/dragonfly/v2/pkg/strings" ) const ( // AsyncWriteBack writes the object asynchronously to the backend. AsyncWriteBack = iota // WriteBack writes the object synchronously to the backend. WriteBack // Ephemeral only writes the object to the dfdaemon. // It is only provided for creating temporary objects between peers, // and users are not allowed to use this mode. Ephemeral ) const ( PrometheusSubsystemName = "dragonfly_dfdaemon_object_stroage" OtelServiceName = "dragonfly-dfdaemon-object-storage" ) const ( RouterGroupBuckets = "/buckets" ) var GinLogFileName = "gin-object-stroage.log" const ( // defaultSignExpireTime is default expire of sign url. defaultSignExpireTime = 5 * time.Minute ) // ObjectStorage is the interface used for object storage server. type ObjectStorage interface { // Started object storage server. Serve(lis net.Listener) error // Stop object storage server. Stop() error } // objectStorage provides object storage function. type objectStorage struct { *http.Server config *config.DaemonOption dynconfig config.Dynconfig objectStorageClient objectstorage.ObjectStorage peerTaskManager peer.TaskManager storageManager storage.Manager peerIDGenerator peer.IDGenerator } // New returns a new ObjectStorage instence. func New(cfg *config.DaemonOption, dynconfig config.Dynconfig, peerTaskManager peer.TaskManager, storageManager storage.Manager, logDir string) (ObjectStorage, error) { // Initialize object storage client. config, err := dynconfig.GetObjectStorage() if err != nil { return nil, err } objectStorageClient, err := objectstorage.New(config.Name, config.Region, config.Endpoint, config.AccessKey, config.SecretKey, objectstorage.WithS3ForcePathStyle(config.S3ForcePathStyle)) if err != nil { return nil, err } // Initialize object storage server. o := &objectStorage{ config: cfg, dynconfig: dynconfig, objectStorageClient: objectStorageClient, peerTaskManager: peerTaskManager, storageManager: storageManager, peerIDGenerator: peer.NewPeerIDGenerator(cfg.Host.AdvertiseIP.String()), } router := o.initRouter(cfg, logDir) o.Server = &http.Server{ Handler: router, } return o, nil } // Started object storage server. func (o *objectStorage) Serve(lis net.Listener) error { return o.Server.Serve(lis) } // Stop object storage server. func (o *objectStorage) Stop() error { return o.Server.Shutdown(context.Background()) } // Initialize router of gin. func (o *objectStorage) initRouter(cfg *config.DaemonOption, logDir string) *gin.Engine { // Set mode if !cfg.Verbose { gin.SetMode(gin.ReleaseMode) } // Logging to a file if !cfg.Console { gin.DisableConsoleColor() logDir := filepath.Join(logDir, "daemon") f, _ := os.Create(filepath.Join(logDir, GinLogFileName)) gin.DefaultWriter = io.MultiWriter(f) } r := gin.New() // Middleware r.Use(gin.Logger()) r.Use(gin.Recovery()) // Prometheus metrics p := ginprometheus.NewPrometheus(PrometheusSubsystemName) // Prometheus metrics need to reduce label, // refer to https://prometheus.io/docs/practices/instrumentation/#do-not-overuse-labels. p.ReqCntURLLabelMappingFn = func(c *gin.Context) string { if strings.HasPrefix(c.Request.URL.Path, RouterGroupBuckets) { return RouterGroupBuckets } return c.Request.URL.Path } p.Use(r) // Opentelemetry if cfg.Options.Telemetry.Jaeger != "" { r.Use(otelgin.Middleware(OtelServiceName)) } // Health Check. r.GET("/healthy", o.getHealth) // Buckets b := r.Group(RouterGroupBuckets) b.POST("/:id", o.createBucket) b.GET(":id/metadatas", o.getObjectMetadatas) b.HEAD(":id/objects/*object_key", o.headObject) b.GET(":id/objects/*object_key", o.getObject) b.DELETE(":id/objects/*object_key", o.destroyObject) b.PUT(":id/objects/*object_key", o.putObject) return r } // getHealth uses to check server health. func (o *objectStorage) getHealth(ctx *gin.Context) { ctx.JSON(http.StatusOK, http.StatusText(http.StatusOK)) } // headObject uses to head object. func (o *objectStorage) headObject(ctx *gin.Context) { var params ObjectParams if err := ctx.ShouldBindUri(¶ms); err != nil { ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()}) return } var ( bucketName = params.ID objectKey = strings.TrimPrefix(params.ObjectKey, string(os.PathSeparator)) ) meta, isExist, err := o.objectStorageClient.GetObjectMetadata(ctx, bucketName, objectKey) if err != nil { ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()}) return } if !isExist { ctx.JSON(http.StatusNotFound, gin.H{"errors": http.StatusText(http.StatusNotFound)}) return } ctx.Header(headers.ContentDisposition, meta.ContentDisposition) ctx.Header(headers.ContentEncoding, meta.ContentEncoding) ctx.Header(headers.ContentLanguage, meta.ContentLanguage) ctx.Header(headers.ContentLength, fmt.Sprint(meta.ContentLength)) ctx.Header(headers.ContentType, meta.ContentType) ctx.Header(headers.ETag, meta.ETag) ctx.Header(config.HeaderDragonflyObjectMetaDigest, meta.Digest) ctx.Header(config.HeaderDragonflyObjectMetaLastModifiedTime, meta.LastModifiedTime.Format(http.TimeFormat)) ctx.Header(config.HeaderDragonflyObjectMetaStorageClass, meta.StorageClass) ctx.Status(http.StatusOK) return } // getObject uses to download object data. func (o *objectStorage) getObject(ctx *gin.Context) { var params ObjectParams if err := ctx.ShouldBindUri(¶ms); err != nil { ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()}) return } var query GetObjectQuery if err := ctx.ShouldBindQuery(&query); err != nil { ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()}) return } var ( bucketName = params.ID objectKey = strings.TrimPrefix(params.ObjectKey, string(os.PathSeparator)) filter = query.Filter rg *nethttp.Range err error ) // Initialize filter field. urlMeta := &commonv1.UrlMeta{Filter: o.config.ObjectStorage.Filter} if filter != "" { urlMeta.Filter = filter } meta, isExist, err := o.objectStorageClient.GetObjectMetadata(ctx, bucketName, objectKey) if err != nil { ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()}) return } if !isExist { ctx.JSON(http.StatusNotFound, gin.H{"errors": http.StatusText(http.StatusNotFound)}) return } urlMeta.Digest = meta.Digest // Parse http range header. rangeHeader := ctx.GetHeader(headers.Range) if len(rangeHeader) > 0 { rangeValue, err := nethttp.ParseOneRange(rangeHeader, math.MaxInt64) if err != nil { ctx.JSON(http.StatusRequestedRangeNotSatisfiable, gin.H{"errors": err.Error()}) return } rg = &rangeValue // Range header in dragonfly is without "bytes=". urlMeta.Range = strings.TrimPrefix(rangeHeader, "bytes=") // When the request has a range header, // there is no need to calculate md5, set this value to empty. urlMeta.Digest = "" } signURL, err := o.objectStorageClient.GetSignURL(ctx, bucketName, objectKey, objectstorage.MethodGet, defaultSignExpireTime) if err != nil { ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()}) return } taskID := idgen.TaskIDV1(signURL, urlMeta) log := logger.WithTaskID(taskID) log.Infof("get object %s meta: %s %#v", objectKey, signURL, urlMeta) reader, attr, err := o.peerTaskManager.StartStreamTask(ctx, &peer.StreamTaskRequest{ URL: signURL, URLMeta: urlMeta, Range: rg, PeerID: o.peerIDGenerator.PeerID(), }) if err != nil { ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()}) return } defer reader.Close() var contentLength int64 = -1 if l, ok := attr[headers.ContentLength]; ok { if i, err := strconv.ParseInt(l, 10, 64); err == nil { contentLength = i } } log.Infof("object content length is %d and content type is %s", contentLength, attr[headers.ContentType]) ctx.DataFromReader(http.StatusOK, contentLength, attr[headers.ContentType], reader, nil) } // destroyObject uses to delete object data. func (o *objectStorage) destroyObject(ctx *gin.Context) { var params ObjectParams if err := ctx.ShouldBindUri(¶ms); err != nil { ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()}) return } var ( bucketName = params.ID objectKey = strings.TrimPrefix(params.ObjectKey, string(os.PathSeparator)) ) logger.Infof("destroy object %s in bucket %s", objectKey, bucketName) if err := o.objectStorageClient.DeleteObject(ctx, bucketName, objectKey); err != nil { ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()}) return } ctx.Status(http.StatusOK) return } // putObject uses to upload object data. func (o *objectStorage) putObject(ctx *gin.Context) { operation := ctx.Request.Header.Get(config.HeaderDragonflyObjectOperation) if operation == CopyOperation { o.copyObject(ctx) return } var params ObjectParams if err := ctx.ShouldBindUri(¶ms); err != nil { ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()}) return } var form PutObjectRequest if err := ctx.ShouldBind(&form); err != nil { ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()}) return } var ( bucketName = params.ID objectKey = strings.TrimPrefix(params.ObjectKey, string(os.PathSeparator)) mode = form.Mode filter = form.Filter maxReplicas = form.MaxReplicas fileHeader = form.File ) signURL, err := o.objectStorageClient.GetSignURL(ctx, bucketName, objectKey, objectstorage.MethodGet, defaultSignExpireTime) if err != nil { ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()}) return } // Initialize url meta. urlMeta := &commonv1.UrlMeta{Filter: o.config.ObjectStorage.Filter} dgst := o.md5FromFileHeader(fileHeader) urlMeta.Digest = dgst.String() if filter != "" { urlMeta.Filter = filter } // Initialize max replicas. if maxReplicas == 0 { maxReplicas = o.config.ObjectStorage.MaxReplicas } // Initialize task id and peer id. taskID := idgen.TaskIDV1(signURL, urlMeta) peerID := o.peerIDGenerator.PeerID() log := logger.WithTaskAndPeerID(taskID, peerID) log.Infof("upload object %s meta: %s %#v", objectKey, signURL, urlMeta) // Import object to local storage. log.Infof("import object %s to local storage", objectKey) if err := o.importObjectToLocalStorage(ctx, taskID, peerID, fileHeader); err != nil { log.Error(err) ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()}) return } // Announce peer information to scheduler. log.Info("announce peer to scheduler") if err := o.peerTaskManager.AnnouncePeerTask(ctx, storage.PeerTaskMetadata{ TaskID: taskID, PeerID: peerID, }, signURL, commonv1.TaskType_DfStore, urlMeta); err != nil { log.Error(err) ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()}) return } // Handle task for backend. switch mode { case Ephemeral: ctx.Status(http.StatusOK) return case WriteBack: // Import object to seed peer. go func() { if err := o.importObjectToSeedPeers(context.Background(), bucketName, objectKey, urlMeta.Filter, Ephemeral, fileHeader, maxReplicas, log); err != nil { log.Errorf("import object %s to seed peers failed: %s", objectKey, err) } }() // Import object to object storage. log.Infof("import object %s to bucket %s", objectKey, bucketName) if err := o.importObjectToBackend(ctx, bucketName, objectKey, dgst, fileHeader); err != nil { log.Error(err) ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()}) return } ctx.Status(http.StatusOK) return case AsyncWriteBack: // Import object to seed peer. go func() { if err := o.importObjectToSeedPeers(context.Background(), bucketName, objectKey, urlMeta.Filter, Ephemeral, fileHeader, maxReplicas, log); err != nil { log.Errorf("import object %s to seed peers failed: %s", objectKey, err) } }() // Import object to object storage. go func() { log.Infof("import object %s to bucket %s", objectKey, bucketName) if err := o.importObjectToBackend(context.Background(), bucketName, objectKey, dgst, fileHeader); err != nil { log.Errorf("import object %s to bucket %s failed: %s", objectKey, bucketName, err.Error()) return } }() ctx.Status(http.StatusOK) return } ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": fmt.Sprintf("unknow mode %d", mode)}) return } // createBucket uses to create bucket. func (o *objectStorage) createBucket(ctx *gin.Context) { var params BucketParams if err := ctx.ShouldBindUri(¶ms); err != nil { ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()}) return } bucketName := params.ID logger.Infof("create bucket %s ", bucketName) if err := o.objectStorageClient.CreateBucket(ctx, bucketName); err != nil { ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()}) return } ctx.Status(http.StatusOK) } // getObjectMetadatas uses to list the metadata of the objects. func (o *objectStorage) getObjectMetadatas(ctx *gin.Context) { var params BucketParams if err := ctx.ShouldBindUri(¶ms); err != nil { ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()}) return } var query GetObjectMetadatasQuery if err := ctx.ShouldBindQuery(&query); err != nil { ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()}) return } var ( bucketName = params.ID prefix = query.Prefix marker = query.Marker delimiter = query.Delimiter limit = query.Limit ) logger.Infof("get object metadatas in bucket %s", bucketName) metadatas, err := o.objectStorageClient.GetObjectMetadatas(ctx, bucketName, prefix, marker, delimiter, limit) if err != nil { ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()}) return } ctx.JSON(http.StatusOK, metadatas) } // copyObject uses to copy object. func (o *objectStorage) copyObject(ctx *gin.Context) { var params ObjectParams if err := ctx.ShouldBindUri(¶ms); err != nil { ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()}) return } var form CopyObjectRequest if err := ctx.ShouldBind(&form); err != nil { ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()}) return } var ( bucketName = params.ID destination = params.ObjectKey source = form.SourceObjectKey ) logger.Infof("copy object from %s to %s", source, destination) if err := o.objectStorageClient.CopyObject(ctx, bucketName, source, destination); err != nil { ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()}) return } ctx.Status(http.StatusOK) } // getAvailableSeedPeer uses to calculate md5 with file header. func (o *objectStorage) md5FromFileHeader(fileHeader *multipart.FileHeader) (dig *digest.Digest) { f, err := fileHeader.Open() if err != nil { return nil } defer func() { if err := f.Close(); err != nil { dig = nil } }() return digest.New(digest.AlgorithmMD5, digest.MD5FromReader(f)) } // importObjectToBackend uses to import object to backend. func (o *objectStorage) importObjectToBackend(ctx context.Context, bucketName, objectKey string, dgst *digest.Digest, fileHeader *multipart.FileHeader) (err error) { f, err := fileHeader.Open() if err != nil { return err } defer func() { if cerr := f.Close(); cerr != nil { err = errors.Join(err, cerr) } }() return o.objectStorageClient.PutObject(ctx, bucketName, objectKey, dgst.String(), f) } // importObjectToSeedPeers uses to import object to local storage. func (o *objectStorage) importObjectToLocalStorage(ctx context.Context, taskID, peerID string, fileHeader *multipart.FileHeader) (err error) { f, err := fileHeader.Open() if err != nil { return nil } defer func() { errClose := f.Close() if errClose != nil { err = errors.Join(err, errClose) } }() meta := storage.PeerTaskMetadata{ TaskID: taskID, PeerID: peerID, } // Register task. tsd, err := o.storageManager.RegisterTask(ctx, &storage.RegisterTaskRequest{ PeerTaskMetadata: meta, }) if err != nil { return err } // Import task data to dfdaemon. return o.peerTaskManager.GetPieceManager().Import(ctx, meta, tsd, fileHeader.Size, f) } // importObjectToSeedPeers uses to import object to available seed peers. func (o *objectStorage) importObjectToSeedPeers(ctx context.Context, bucketName, objectKey, filter string, mode int, fileHeader *multipart.FileHeader, maxReplicas int, log *logger.SugaredLoggerOnWith) error { schedulers, err := o.dynconfig.GetSchedulers() if err != nil { return err } var seedPeerHosts []string for _, scheduler := range schedulers { for _, seedPeer := range scheduler.SeedPeers { if o.config.Host.AdvertiseIP.String() != seedPeer.Ip && seedPeer.ObjectStoragePort > 0 { seedPeerHosts = append(seedPeerHosts, fmt.Sprintf("%s:%d", seedPeer.Ip, seedPeer.ObjectStoragePort)) } } } seedPeerHosts = pkgstrings.Unique(seedPeerHosts) var replicas int for _, seedPeerHost := range seedPeerHosts { log.Infof("import object %s to seed peer %s", objectKey, seedPeerHost) if err := o.importObjectToSeedPeer(ctx, seedPeerHost, bucketName, objectKey, filter, mode, fileHeader); err != nil { log.Errorf("import object %s to seed peer %s failed: %s", objectKey, seedPeerHost, err) continue } replicas++ if replicas >= maxReplicas { break } } log.Infof("import %d object %s to seed peers", replicas, objectKey) return nil } // importObjectToSeedPeer uses to import object to seed peer. func (o *objectStorage) importObjectToSeedPeer(ctx context.Context, seedPeerHost, bucketName, objectKey, filter string, mode int, fileHeader *multipart.FileHeader) (err error) { f, err := fileHeader.Open() if err != nil { return err } defer func() { if cerr := f.Close(); cerr != nil { err = errors.Join(err, cerr) } }() body := &bytes.Buffer{} writer := multipart.NewWriter(body) if err = writer.WriteField("mode", fmt.Sprint(mode)); err != nil { return err } if filter != "" { if err = writer.WriteField("filter", filter); err != nil { return err } } part, err := writer.CreateFormFile("file", fileHeader.Filename) if err != nil { return err } if _, err = io.Copy(part, f); err != nil { return err } if err = writer.Close(); err != nil { return err } u := url.URL{ Scheme: "http", Host: seedPeerHost, Path: filepath.Join("buckets", bucketName, "objects", objectKey), } req, err := http.NewRequestWithContext(ctx, http.MethodPut, u.String(), body) if err != nil { return err } req.Header.Add(headers.ContentType, writer.FormDataContentType()) resp, err := http.DefaultClient.Do(req) if err != nil { return err } defer resp.Body.Close() if resp.StatusCode/100 != 2 { return fmt.Errorf("bad response status %s", resp.Status) } return nil }