From ec2a72d8726a91152aa26f07516ecdbfb3a85c50 Mon Sep 17 00:00:00 2001 From: Gaius Date: Mon, 27 Jun 2022 20:21:41 +0800 Subject: [PATCH] feat: import object to seed peer with max replicas (#1413) * feat: create object storage with max replicas Signed-off-by: Gaius * feat: import object storage with max replicas Signed-off-by: Gaius * feat: add strings.Unique func Signed-off-by: Gaius --- client/config/peerhost.go | 8 ++++ client/config/peerhost_darwin.go | 5 ++- client/config/peerhost_linux.go | 5 ++- client/config/peerhost_test.go | 5 ++- client/config/testdata/config/daemon.yaml | 1 + client/daemon/objectstorage/objectstorage.go | 41 +++++++++++++------- client/daemon/objectstorage/types.go | 9 +++-- pkg/strings/strings.go | 14 +++++++ pkg/strings/strings_test.go | 8 ++++ 9 files changed, 73 insertions(+), 23 deletions(-) diff --git a/client/config/peerhost.go b/client/config/peerhost.go index b2c21f8b7..a1046b980 100644 --- a/client/config/peerhost.go +++ b/client/config/peerhost.go @@ -139,6 +139,12 @@ func (p *DaemonOption) Validate() error { return errors.Errorf("rate limit must be greater than %s", DefaultMinRate.String()) } + if p.ObjectStorage.Enable { + if p.ObjectStorage.MaxReplicas <= 0 { + return errors.New("max replicas must be greater than 0") + } + } + switch p.Download.DefaultPattern { case PatternP2P, PatternSeedPeer, PatternSource: default: @@ -370,6 +376,8 @@ type ObjectStorageOption struct { // filtering unnecessary query params in the URL, // it is separated by & character. Filter string `mapstructure:"filter" yaml:"filter"` + // MaxReplicas is the maximum number of replicas of an object cache in seed peers. + MaxReplicas int `mapstructure:"maxReplicas" yaml:"maxReplicas"` // ListenOption is object storage service listener. ListenOption `yaml:",inline" mapstructure:",squash"` } diff --git a/client/config/peerhost_darwin.go b/client/config/peerhost_darwin.go index 38f73ff8f..216cbd4c5 100644 --- a/client/config/peerhost_darwin.go +++ b/client/config/peerhost_darwin.go @@ -119,8 +119,9 @@ var peerHostConfig = DaemonOption{ }, }, ObjectStorage: ObjectStorageOption{ - Enable: false, - Filter: "Expires&Signature&ns", + Enable: false, + Filter: "Expires&Signature&ns", + MaxReplicas: 3, ListenOption: ListenOption{ Security: SecurityOption{ Insecure: true, diff --git a/client/config/peerhost_linux.go b/client/config/peerhost_linux.go index dbca09616..cd8f2e1a0 100644 --- a/client/config/peerhost_linux.go +++ b/client/config/peerhost_linux.go @@ -118,8 +118,9 @@ var peerHostConfig = DaemonOption{ }, }, ObjectStorage: ObjectStorageOption{ - Enable: false, - Filter: "Expires&Signature&ns", + Enable: false, + Filter: "Expires&Signature&ns", + MaxReplicas: 3, ListenOption: ListenOption{ Security: SecurityOption{ Insecure: true, diff --git a/client/config/peerhost_test.go b/client/config/peerhost_test.go index b1b68dd5e..f8d2b6546 100644 --- a/client/config/peerhost_test.go +++ b/client/config/peerhost_test.go @@ -327,8 +327,9 @@ func TestPeerHostOption_Load(t *testing.T) { }, }, ObjectStorage: ObjectStorageOption{ - Enable: true, - Filter: "Expires&Signature&ns", + Enable: true, + Filter: "Expires&Signature&ns", + MaxReplicas: 3, ListenOption: ListenOption{ Security: SecurityOption{ Insecure: true, diff --git a/client/config/testdata/config/daemon.yaml b/client/config/testdata/config/daemon.yaml index aa5afd8f6..cbd9f16c2 100644 --- a/client/config/testdata/config/daemon.yaml +++ b/client/config/testdata/config/daemon.yaml @@ -73,6 +73,7 @@ upload: objectStorage: enable: true filter: Expires&Signature&ns + maxReplicas: 3 security: insecure: true caCert: caCert diff --git a/client/daemon/objectstorage/objectstorage.go b/client/daemon/objectstorage/objectstorage.go index 3dc432d3a..e5aff3ee2 100644 --- a/client/daemon/objectstorage/objectstorage.go +++ b/client/daemon/objectstorage/objectstorage.go @@ -47,6 +47,7 @@ import ( "d7y.io/dragonfly/v2/pkg/idgen" "d7y.io/dragonfly/v2/pkg/objectstorage" "d7y.io/dragonfly/v2/pkg/rpc/base" + pkgstrings "d7y.io/dragonfly/v2/pkg/strings" ) const ( @@ -307,11 +308,12 @@ func (o *objectStorage) createObject(ctx *gin.Context) { } var ( - bucketName = params.ID - objectKey = form.Key - mode = form.Mode - filter = form.Filter - fileHeader = form.File + bucketName = params.ID + objectKey = form.Key + mode = form.Mode + filter = form.Filter + maxReplicas = form.MaxReplicas + fileHeader = form.File ) client, err := o.client() @@ -346,6 +348,11 @@ func (o *objectStorage) createObject(ctx *gin.Context) { urlMeta.Filter = filter } + // Initialize max replicas. + if maxReplicas == 0 { + maxReplicas = o.config.ObjectStorage.MaxReplicas + } + // Initialize task id and peer id. taskID := idgen.TaskID(signURL, urlMeta) peerID := o.peerIDGenerator.PeerID() @@ -380,9 +387,8 @@ func (o *objectStorage) createObject(ctx *gin.Context) { case WriteBack: // Import object to seed peer. go func() { - log.Infof("import object %s to seed peer", objectKey) - if err := o.importObjectToSeedPeers(context.Background(), bucketName, objectKey, Ephemeral, fileHeader); err != nil { - log.Errorf("import object %s to seed peer failed: %s", objectKey, err) + if err := o.importObjectToSeedPeers(context.Background(), bucketName, objectKey, Ephemeral, fileHeader, maxReplicas, log); err != nil { + log.Errorf("import object %s to seed peers failed: %s", objectKey, err) } }() @@ -399,9 +405,8 @@ func (o *objectStorage) createObject(ctx *gin.Context) { case AsyncWriteBack: // Import object to seed peer. go func() { - log.Infof("import object %s to seed peer", objectKey) - if err := o.importObjectToSeedPeers(context.Background(), bucketName, objectKey, Ephemeral, fileHeader); err != nil { - log.Errorf("import object %s to seed peer failed: %s", objectKey, err) + if err := o.importObjectToSeedPeers(context.Background(), bucketName, objectKey, Ephemeral, fileHeader, maxReplicas, log); err != nil { + log.Errorf("import object %s to seed peers failed: %s", objectKey, err) } }() @@ -477,7 +482,7 @@ func (o *objectStorage) importObjectToLocalStorage(ctx context.Context, taskID, } // importObjectToSeedPeers uses to import object to available seed peers. -func (o *objectStorage) importObjectToSeedPeers(ctx context.Context, bucketName, objectKey string, mode int, fileHeader *multipart.FileHeader) error { +func (o *objectStorage) importObjectToSeedPeers(ctx context.Context, bucketName, objectKey string, mode int, fileHeader *multipart.FileHeader, maxReplicas int, log *logger.SugaredLoggerOnWith) error { schedulers, err := o.dynconfig.GetSchedulers() if err != nil { return err @@ -491,13 +496,23 @@ func (o *objectStorage) importObjectToSeedPeers(ctx context.Context, bucketName, } } } + seedPeerHosts = pkgstrings.Unique(seedPeerHosts) + var replicas int for _, seedPeerHost := range seedPeerHosts { + log.Infof("import object %s to seed peer %s", objectKey, seedPeerHost) if err := o.importObjectToSeedPeer(ctx, seedPeerHost, bucketName, objectKey, mode, fileHeader); err != nil { - return err + log.Errorf("import object %s to seed peer %s failed: %s", objectKey, seedPeerHost, err) + continue + } + + replicas++ + if replicas >= maxReplicas { + break } } + log.Infof("import %d object %s to seed peers", replicas, objectKey) return nil } diff --git a/client/daemon/objectstorage/types.go b/client/daemon/objectstorage/types.go index ef5df3657..7a05b71e0 100644 --- a/client/daemon/objectstorage/types.go +++ b/client/daemon/objectstorage/types.go @@ -28,10 +28,11 @@ type CreateObjectParams struct { } type CreateObjectRequset struct { - Key string `form:"key" binding:"required"` - Mode uint `form:"mode,default=0" binding:"omitempty,gte=0,lte=2"` - Filter string `form:"filter" binding:"omitempty"` - File *multipart.FileHeader `form:"file" binding:"required"` + Key string `form:"key" binding:"required"` + Mode uint `form:"mode,default=0" binding:"omitempty,gte=0,lte=2"` + Filter string `form:"filter" binding:"omitempty"` + MaxReplicas int `form:"maxReplicas" binding:"omitempty,gt=0,lte=100"` + File *multipart.FileHeader `form:"file" binding:"required"` } type GetObjectQuery struct { diff --git a/pkg/strings/strings.go b/pkg/strings/strings.go index a7ae13b84..884f198ef 100644 --- a/pkg/strings/strings.go +++ b/pkg/strings/strings.go @@ -35,3 +35,17 @@ func Contains(slice []string, ele string) bool { return false } + +// Remove the duplicate elements in the string slice +func Unique(slice []string) []string { + keys := make(map[string]bool) + result := []string{} + for _, entry := range slice { + if _, ok := keys[entry]; !ok { + keys[entry] = true + result = append(result, entry) + } + } + + return result +} diff --git a/pkg/strings/strings_test.go b/pkg/strings/strings_test.go index 0d1edadce..6b59036fa 100644 --- a/pkg/strings/strings_test.go +++ b/pkg/strings/strings_test.go @@ -32,3 +32,11 @@ func TestContains(t *testing.T) { assert.True(t, Contains([]string{"a", "B"}, "B")) assert.False(t, Contains([]string{"a", "B"}, "b")) } + +func TestUnique(t *testing.T) { + assert.EqualValues(t, Unique([]string{"a", "B"}), []string{"a", "B"}) + assert.EqualValues(t, Unique([]string{"a", "a", "B", "B"}), []string{"a", "B"}) + assert.EqualValues(t, Unique([]string{"a", "B", "a", "B"}), []string{"a", "B"}) + assert.EqualValues(t, Unique([]string{}), []string{}) + assert.EqualValues(t, Unique([]string{}), []string{}) +}