feat: import object to seed peer with max replicas (#1413)

* feat: create object storage with max replicas

Signed-off-by: Gaius <gaius.qi@gmail.com>

* feat: import object storage with max replicas

Signed-off-by: Gaius <gaius.qi@gmail.com>

* feat: add strings.Unique func

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2022-06-27 20:21:41 +08:00
parent 4ad950a8e3
commit ec2a72d872
No known key found for this signature in database
GPG Key ID: 8B4E5D1290FA2FFB
9 changed files with 73 additions and 23 deletions

View File

@ -139,6 +139,12 @@ func (p *DaemonOption) Validate() error {
return errors.Errorf("rate limit must be greater than %s", DefaultMinRate.String())
}
if p.ObjectStorage.Enable {
if p.ObjectStorage.MaxReplicas <= 0 {
return errors.New("max replicas must be greater than 0")
}
}
switch p.Download.DefaultPattern {
case PatternP2P, PatternSeedPeer, PatternSource:
default:
@ -370,6 +376,8 @@ type ObjectStorageOption struct {
// filtering unnecessary query params in the URL,
// it is separated by & character.
Filter string `mapstructure:"filter" yaml:"filter"`
// MaxReplicas is the maximum number of replicas of an object cache in seed peers.
MaxReplicas int `mapstructure:"maxReplicas" yaml:"maxReplicas"`
// ListenOption is object storage service listener.
ListenOption `yaml:",inline" mapstructure:",squash"`
}

View File

@ -119,8 +119,9 @@ var peerHostConfig = DaemonOption{
},
},
ObjectStorage: ObjectStorageOption{
Enable: false,
Filter: "Expires&Signature&ns",
Enable: false,
Filter: "Expires&Signature&ns",
MaxReplicas: 3,
ListenOption: ListenOption{
Security: SecurityOption{
Insecure: true,

View File

@ -118,8 +118,9 @@ var peerHostConfig = DaemonOption{
},
},
ObjectStorage: ObjectStorageOption{
Enable: false,
Filter: "Expires&Signature&ns",
Enable: false,
Filter: "Expires&Signature&ns",
MaxReplicas: 3,
ListenOption: ListenOption{
Security: SecurityOption{
Insecure: true,

View File

@ -327,8 +327,9 @@ func TestPeerHostOption_Load(t *testing.T) {
},
},
ObjectStorage: ObjectStorageOption{
Enable: true,
Filter: "Expires&Signature&ns",
Enable: true,
Filter: "Expires&Signature&ns",
MaxReplicas: 3,
ListenOption: ListenOption{
Security: SecurityOption{
Insecure: true,

View File

@ -73,6 +73,7 @@ upload:
objectStorage:
enable: true
filter: Expires&Signature&ns
maxReplicas: 3
security:
insecure: true
caCert: caCert

View File

@ -47,6 +47,7 @@ import (
"d7y.io/dragonfly/v2/pkg/idgen"
"d7y.io/dragonfly/v2/pkg/objectstorage"
"d7y.io/dragonfly/v2/pkg/rpc/base"
pkgstrings "d7y.io/dragonfly/v2/pkg/strings"
)
const (
@ -307,11 +308,12 @@ func (o *objectStorage) createObject(ctx *gin.Context) {
}
var (
bucketName = params.ID
objectKey = form.Key
mode = form.Mode
filter = form.Filter
fileHeader = form.File
bucketName = params.ID
objectKey = form.Key
mode = form.Mode
filter = form.Filter
maxReplicas = form.MaxReplicas
fileHeader = form.File
)
client, err := o.client()
@ -346,6 +348,11 @@ func (o *objectStorage) createObject(ctx *gin.Context) {
urlMeta.Filter = filter
}
// Initialize max replicas.
if maxReplicas == 0 {
maxReplicas = o.config.ObjectStorage.MaxReplicas
}
// Initialize task id and peer id.
taskID := idgen.TaskID(signURL, urlMeta)
peerID := o.peerIDGenerator.PeerID()
@ -380,9 +387,8 @@ func (o *objectStorage) createObject(ctx *gin.Context) {
case WriteBack:
// Import object to seed peer.
go func() {
log.Infof("import object %s to seed peer", objectKey)
if err := o.importObjectToSeedPeers(context.Background(), bucketName, objectKey, Ephemeral, fileHeader); err != nil {
log.Errorf("import object %s to seed peer failed: %s", objectKey, err)
if err := o.importObjectToSeedPeers(context.Background(), bucketName, objectKey, Ephemeral, fileHeader, maxReplicas, log); err != nil {
log.Errorf("import object %s to seed peers failed: %s", objectKey, err)
}
}()
@ -399,9 +405,8 @@ func (o *objectStorage) createObject(ctx *gin.Context) {
case AsyncWriteBack:
// Import object to seed peer.
go func() {
log.Infof("import object %s to seed peer", objectKey)
if err := o.importObjectToSeedPeers(context.Background(), bucketName, objectKey, Ephemeral, fileHeader); err != nil {
log.Errorf("import object %s to seed peer failed: %s", objectKey, err)
if err := o.importObjectToSeedPeers(context.Background(), bucketName, objectKey, Ephemeral, fileHeader, maxReplicas, log); err != nil {
log.Errorf("import object %s to seed peers failed: %s", objectKey, err)
}
}()
@ -477,7 +482,7 @@ func (o *objectStorage) importObjectToLocalStorage(ctx context.Context, taskID,
}
// importObjectToSeedPeers uses to import object to available seed peers.
func (o *objectStorage) importObjectToSeedPeers(ctx context.Context, bucketName, objectKey string, mode int, fileHeader *multipart.FileHeader) error {
func (o *objectStorage) importObjectToSeedPeers(ctx context.Context, bucketName, objectKey string, mode int, fileHeader *multipart.FileHeader, maxReplicas int, log *logger.SugaredLoggerOnWith) error {
schedulers, err := o.dynconfig.GetSchedulers()
if err != nil {
return err
@ -491,13 +496,23 @@ func (o *objectStorage) importObjectToSeedPeers(ctx context.Context, bucketName,
}
}
}
seedPeerHosts = pkgstrings.Unique(seedPeerHosts)
var replicas int
for _, seedPeerHost := range seedPeerHosts {
log.Infof("import object %s to seed peer %s", objectKey, seedPeerHost)
if err := o.importObjectToSeedPeer(ctx, seedPeerHost, bucketName, objectKey, mode, fileHeader); err != nil {
return err
log.Errorf("import object %s to seed peer %s failed: %s", objectKey, seedPeerHost, err)
continue
}
replicas++
if replicas >= maxReplicas {
break
}
}
log.Infof("import %d object %s to seed peers", replicas, objectKey)
return nil
}

View File

@ -28,10 +28,11 @@ type CreateObjectParams struct {
}
type CreateObjectRequset struct {
Key string `form:"key" binding:"required"`
Mode uint `form:"mode,default=0" binding:"omitempty,gte=0,lte=2"`
Filter string `form:"filter" binding:"omitempty"`
File *multipart.FileHeader `form:"file" binding:"required"`
Key string `form:"key" binding:"required"`
Mode uint `form:"mode,default=0" binding:"omitempty,gte=0,lte=2"`
Filter string `form:"filter" binding:"omitempty"`
MaxReplicas int `form:"maxReplicas" binding:"omitempty,gt=0,lte=100"`
File *multipart.FileHeader `form:"file" binding:"required"`
}
type GetObjectQuery struct {

View File

@ -35,3 +35,17 @@ func Contains(slice []string, ele string) bool {
return false
}
// Remove the duplicate elements in the string slice
func Unique(slice []string) []string {
keys := make(map[string]bool)
result := []string{}
for _, entry := range slice {
if _, ok := keys[entry]; !ok {
keys[entry] = true
result = append(result, entry)
}
}
return result
}

View File

@ -32,3 +32,11 @@ func TestContains(t *testing.T) {
assert.True(t, Contains([]string{"a", "B"}, "B"))
assert.False(t, Contains([]string{"a", "B"}, "b"))
}
func TestUnique(t *testing.T) {
assert.EqualValues(t, Unique([]string{"a", "B"}), []string{"a", "B"})
assert.EqualValues(t, Unique([]string{"a", "a", "B", "B"}), []string{"a", "B"})
assert.EqualValues(t, Unique([]string{"a", "B", "a", "B"}), []string{"a", "B"})
assert.EqualValues(t, Unique([]string{}), []string{})
assert.EqualValues(t, Unique([]string{}), []string{})
}