From 3af4cc7a0f8a8c953c8735c23697c2e1c926e23f Mon Sep 17 00:00:00 2001 From: Gaius Date: Thu, 24 Nov 2022 14:21:22 +0800 Subject: [PATCH] feat: scheduler supports storage config (#1864) Signed-off-by: Gaius --- deploy/helm-charts | 2 +- scheduler/config/config.go | 7 +- scheduler/config/constants.go | 11 ++ scheduler/scheduler.go | 7 +- scheduler/storage/storage.go | 51 +----- scheduler/storage/storage_test.go | 261 ++++++++++++------------------ 6 files changed, 127 insertions(+), 212 deletions(-) diff --git a/deploy/helm-charts b/deploy/helm-charts index b824a28e7..91870893b 160000 --- a/deploy/helm-charts +++ b/deploy/helm-charts @@ -1 +1 @@ -Subproject commit b824a28e704776f0675409ac5dc997bb436acae5 +Subproject commit 91870893b4df549f0666bb35e280285be571c27c diff --git a/scheduler/config/config.go b/scheduler/config/config.go index 1b2ee5363..66957bde7 100644 --- a/scheduler/config/config.go +++ b/scheduler/config/config.go @@ -27,7 +27,6 @@ import ( "d7y.io/dragonfly/v2/pkg/net/ip" "d7y.io/dragonfly/v2/pkg/rpc" "d7y.io/dragonfly/v2/pkg/types" - "d7y.io/dragonfly/v2/scheduler/storage" ) type Config struct { @@ -335,9 +334,9 @@ func New() *Config { }, }, Storage: StorageConfig{ - MaxSize: storage.DefaultMaxSize, - MaxBackups: storage.DefaultMaxBackups, - BufferSize: storage.DefaultBufferSize, + MaxSize: DefaultStorageMaxSize, + MaxBackups: DefaultStorageMaxBackups, + BufferSize: DefaultStorageBufferSize, }, Metrics: MetricsConfig{ Enable: false, diff --git a/scheduler/config/constants.go b/scheduler/config/constants.go index d0f49f7e6..2d4de0dd2 100644 --- a/scheduler/config/constants.go +++ b/scheduler/config/constants.go @@ -121,3 +121,14 @@ var ( // DefaultNetworkEnableIPv6 is default value of enableIPv6. DefaultNetworkEnableIPv6 = false ) + +const ( + // DefaultStorageMaxSize is the default maximum size of record file. + DefaultStorageMaxSize = 100 + + // DefaultStorageMaxBackups is the default maximum count of backup. + DefaultStorageMaxBackups = 10 + + // DefaultStorageBufferSize is the default size of buffer container. + DefaultStorageBufferSize = 100 +) diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 6c2851383..a1245d3b2 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -176,7 +176,12 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err scheduler := scheduler.New(&cfg.Scheduler, dynconfig, d.PluginDir()) // Initialize Storage. - storage, err := storage.New(d.DataDir()) + storage, err := storage.New( + d.DataDir(), + cfg.Storage.MaxSize, + cfg.Storage.MaxBackups, + cfg.Storage.BufferSize, + ) if err != nil { return nil, err } diff --git a/scheduler/storage/storage.go b/scheduler/storage/storage.go index b847361cb..80b780d84 100644 --- a/scheduler/storage/storage.go +++ b/scheduler/storage/storage.go @@ -37,17 +37,6 @@ import ( pkgio "d7y.io/dragonfly/v2/pkg/io" ) -const ( - // DefaultMaxSize is the default maximum size of record file. - DefaultMaxSize = 100 - - // DefaultMaxBackups is the default maximum count of backup. - DefaultMaxBackups = 10 - - // DefaultBufferSize is the default size of buffer container. - DefaultBufferSize = 100 -) - const ( // RecordFilePrefix is prefix of record file name. RecordFilePrefix = "record" @@ -108,48 +97,18 @@ type storage struct { mu *sync.RWMutex } -// Option is a functional option for configuring the Storage. -type Option func(s *storage) - -// WithMaxSize sets the maximum size in megabytes of storage file. -func WithMaxSize(maxSize int) Option { - return func(s *storage) { - s.maxSize = int64(maxSize * megabyte) - } -} - -// WithMaxBackups sets the maximum number of storage files to retain. -func WithMaxBackups(maxBackups int) Option { - return func(s *storage) { - s.maxBackups = maxBackups - } -} - -// WithCacheSize sets the size of buffer container, -// if the buffer is full, write all the records in the buffer to the file. -func WithBufferSize(bufferSize int) Option { - return func(s *storage) { - s.bufferSize = bufferSize - s.buffer = make([]Record, 0, bufferSize) - } -} - // New returns a new Storage instence. -func New(baseDir string, options ...Option) (Storage, error) { +func New(baseDir string, maxSize, maxBackups, bufferSize int) (Storage, error) { s := &storage{ baseDir: baseDir, filename: filepath.Join(baseDir, fmt.Sprintf("%s.%s", RecordFilePrefix, RecordFileExt)), - maxSize: DefaultMaxSize * megabyte, - maxBackups: DefaultMaxBackups, - buffer: make([]Record, 0, DefaultBufferSize), - bufferSize: DefaultBufferSize, + maxSize: int64(maxSize * megabyte), + maxBackups: maxBackups, + buffer: make([]Record, 0, bufferSize), + bufferSize: bufferSize, mu: &sync.RWMutex{}, } - for _, opt := range options { - opt(s) - } - file, err := os.OpenFile(s.filename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600) if err != nil { return nil, err diff --git a/scheduler/storage/storage_test.go b/scheduler/storage/storage_test.go index 8a0009c56..7bf8b478d 100644 --- a/scheduler/storage/storage_test.go +++ b/scheduler/storage/storage_test.go @@ -29,6 +29,8 @@ import ( "github.com/gocarina/gocsv" "github.com/stretchr/testify/assert" + + "d7y.io/dragonfly/v2/scheduler/config" ) var ( @@ -146,21 +148,19 @@ func TestStorage_New(t *testing.T) { tests := []struct { name string baseDir string - options []Option expect func(t *testing.T, s Storage, err error) }{ { name: "new storage", baseDir: os.TempDir(), - options: []Option{}, expect: func(t *testing.T, s Storage, err error) { assert := assert.New(t) assert.NoError(err) assert.Equal(reflect.TypeOf(s).Elem().Name(), "storage") - assert.Equal(s.(*storage).maxSize, int64(DefaultMaxSize*megabyte)) - assert.Equal(s.(*storage).maxBackups, DefaultMaxBackups) - assert.Equal(s.(*storage).bufferSize, DefaultBufferSize) - assert.Equal(cap(s.(*storage).buffer), DefaultBufferSize) + assert.Equal(s.(*storage).maxSize, int64(config.DefaultStorageMaxSize*megabyte)) + assert.Equal(s.(*storage).maxBackups, config.DefaultStorageMaxBackups) + assert.Equal(s.(*storage).bufferSize, config.DefaultStorageBufferSize) + assert.Equal(cap(s.(*storage).buffer), config.DefaultStorageBufferSize) assert.Equal(len(s.(*storage).buffer), 0) assert.Equal(s.(*storage).count, int64(0)) @@ -169,68 +169,9 @@ func TestStorage_New(t *testing.T) { } }, }, - { - name: "new storage with maxSize", - baseDir: os.TempDir(), - options: []Option{WithMaxSize(1)}, - expect: func(t *testing.T, s Storage, err error) { - assert := assert.New(t) - assert.NoError(err) - assert.Equal(reflect.TypeOf(s).Elem().Name(), "storage") - assert.Equal(s.(*storage).maxSize, int64(1*megabyte)) - assert.Equal(s.(*storage).maxBackups, DefaultMaxBackups) - assert.Equal(s.(*storage).bufferSize, DefaultBufferSize) - assert.Equal(cap(s.(*storage).buffer), DefaultBufferSize) - assert.Equal(len(s.(*storage).buffer), 0) - - if err := s.Clear(); err != nil { - t.Fatal(err) - } - }, - }, - { - name: "new storage with maxBackups", - baseDir: os.TempDir(), - options: []Option{WithMaxBackups(1)}, - expect: func(t *testing.T, s Storage, err error) { - assert := assert.New(t) - assert.NoError(err) - assert.Equal(reflect.TypeOf(s).Elem().Name(), "storage") - assert.Equal(s.(*storage).maxSize, int64(DefaultMaxSize*megabyte)) - assert.Equal(s.(*storage).maxBackups, 1) - assert.Equal(s.(*storage).bufferSize, DefaultBufferSize) - assert.Equal(cap(s.(*storage).buffer), DefaultBufferSize) - assert.Equal(len(s.(*storage).buffer), 0) - - if err := s.Clear(); err != nil { - t.Fatal(err) - } - }, - }, - { - name: "new storage with bufferSize", - baseDir: os.TempDir(), - options: []Option{WithBufferSize(1)}, - expect: func(t *testing.T, s Storage, err error) { - assert := assert.New(t) - assert.NoError(err) - assert.Equal(reflect.TypeOf(s).Elem().Name(), "storage") - assert.Equal(s.(*storage).maxSize, int64(DefaultMaxSize*megabyte)) - assert.Equal(s.(*storage).maxBackups, DefaultMaxBackups) - assert.Equal(s.(*storage).bufferSize, 1) - assert.Equal(cap(s.(*storage).buffer), 1) - assert.Equal(len(s.(*storage).buffer), 0) - assert.Equal(len(s.(*storage).buffer), 0) - - if err := s.Clear(); err != nil { - t.Fatal(err) - } - }, - }, { name: "new storage failed", baseDir: "/foo", - options: []Option{WithMaxBackups(100)}, expect: func(t *testing.T, s Storage, err error) { assert := assert.New(t) assert.Error(err) @@ -240,7 +181,7 @@ func TestStorage_New(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - s, err := New(tc.baseDir, tc.options...) + s, err := New(tc.baseDir, config.DefaultStorageMaxSize, config.DefaultStorageMaxBackups, config.DefaultStorageBufferSize) tc.expect(t, s, err) }) } @@ -248,17 +189,17 @@ func TestStorage_New(t *testing.T) { func TestStorage_Create(t *testing.T) { tests := []struct { - name string - baseDir string - options []Option - mock func(s Storage) - expect func(t *testing.T, s Storage, baseDir string) + name string + baseDir string + bufferSize int + mock func(s Storage) + expect func(t *testing.T, s Storage, baseDir string) }{ { - name: "create record", - baseDir: os.TempDir(), - options: []Option{WithBufferSize(1)}, - mock: func(s Storage) {}, + name: "create record", + baseDir: os.TempDir(), + bufferSize: 1, + mock: func(s Storage) {}, expect: func(t *testing.T, s Storage, baseDir string) { assert := assert.New(t) err := s.Create(Record{}) @@ -267,9 +208,9 @@ func TestStorage_Create(t *testing.T) { }, }, { - name: "create record without buffer", - baseDir: os.TempDir(), - options: []Option{WithBufferSize(0)}, + name: "create record without buffer", + baseDir: os.TempDir(), + bufferSize: 0, mock: func(s Storage) { }, expect: func(t *testing.T, s Storage, baseDir string) { @@ -280,9 +221,9 @@ func TestStorage_Create(t *testing.T) { }, }, { - name: "write record to file", - baseDir: os.TempDir(), - options: []Option{WithBufferSize(1)}, + name: "write record to file", + baseDir: os.TempDir(), + bufferSize: 1, mock: func(s Storage) { }, expect: func(t *testing.T, s Storage, baseDir string) { @@ -295,9 +236,9 @@ func TestStorage_Create(t *testing.T) { }, }, { - name: "open file failed", - baseDir: os.TempDir(), - options: []Option{WithBufferSize(0)}, + name: "open file failed", + baseDir: os.TempDir(), + bufferSize: 0, mock: func(s Storage) { s.(*storage).baseDir = "foo" }, @@ -312,7 +253,7 @@ func TestStorage_Create(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - s, err := New(tc.baseDir, tc.options...) + s, err := New(tc.baseDir, config.DefaultStorageMaxSize, config.DefaultStorageMaxBackups, tc.bufferSize) if err != nil { t.Fatal(err) } @@ -328,18 +269,18 @@ func TestStorage_Create(t *testing.T) { func TestStorage_List(t *testing.T) { tests := []struct { - name string - baseDir string - options []Option - record Record - mock func(t *testing.T, s Storage, baseDir string, record Record) - expect func(t *testing.T, s Storage, baseDir string, record Record) + name string + baseDir string + bufferSize int + record Record + mock func(t *testing.T, s Storage, baseDir string, record Record) + expect func(t *testing.T, s Storage, baseDir string, record Record) }{ { - name: "empty csv file given", - baseDir: os.TempDir(), - options: []Option{}, - mock: func(t *testing.T, s Storage, baseDir string, record Record) {}, + name: "empty csv file given", + baseDir: os.TempDir(), + bufferSize: config.DefaultStorageBufferSize, + mock: func(t *testing.T, s Storage, baseDir string, record Record) {}, expect: func(t *testing.T, s Storage, baseDir string, record Record) { assert := assert.New(t) _, err := s.List() @@ -347,9 +288,9 @@ func TestStorage_List(t *testing.T) { }, }, { - name: "get file infos failed", - baseDir: os.TempDir(), - options: []Option{}, + name: "get file infos failed", + baseDir: os.TempDir(), + bufferSize: config.DefaultStorageBufferSize, mock: func(t *testing.T, s Storage, baseDir string, record Record) { s.(*storage).baseDir = "bae" }, @@ -361,9 +302,9 @@ func TestStorage_List(t *testing.T) { }, }, { - name: "open file failed", - baseDir: os.TempDir(), - options: []Option{}, + name: "open file failed", + baseDir: os.TempDir(), + bufferSize: config.DefaultStorageBufferSize, mock: func(t *testing.T, s Storage, baseDir string, record Record) { file, err := os.OpenFile(filepath.Join(baseDir, "record-test.csv"), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0300) if err != nil { @@ -378,10 +319,10 @@ func TestStorage_List(t *testing.T) { }, }, { - name: "list records of a file", - baseDir: os.TempDir(), - options: []Option{WithBufferSize(1)}, - record: mockRecord, + name: "list records of a file", + baseDir: os.TempDir(), + bufferSize: 1, + record: mockRecord, mock: func(t *testing.T, s Storage, baseDir string, record Record) { if err := s.Create(record); err != nil { t.Fatal(err) @@ -402,10 +343,10 @@ func TestStorage_List(t *testing.T) { }, }, { - name: "list records of multi files", - baseDir: os.TempDir(), - options: []Option{WithBufferSize(1)}, - record: Record{}, + name: "list records of multi files", + baseDir: os.TempDir(), + bufferSize: 1, + record: Record{}, mock: func(t *testing.T, s Storage, baseDir string, record Record) { file, err := os.OpenFile(filepath.Join(baseDir, "record-test.csv"), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600) if err != nil { @@ -438,7 +379,7 @@ func TestStorage_List(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - s, err := New(tc.baseDir, tc.options...) + s, err := New(tc.baseDir, config.DefaultStorageMaxSize, config.DefaultStorageMaxBackups, tc.bufferSize) if err != nil { t.Fatal(err) } @@ -454,18 +395,18 @@ func TestStorage_List(t *testing.T) { func TestStorage_Open(t *testing.T) { tests := []struct { - name string - baseDir string - options []Option - record Record - mock func(t *testing.T, s Storage, baseDir string, record Record) - expect func(t *testing.T, s Storage, baseDir string, record Record) + name string + baseDir string + bufferSize int + record Record + mock func(t *testing.T, s Storage, baseDir string, record Record) + expect func(t *testing.T, s Storage, baseDir string, record Record) }{ { - name: "open storage withempty csv file given", - baseDir: os.TempDir(), - options: []Option{}, - mock: func(t *testing.T, s Storage, baseDir string, record Record) {}, + name: "open storage withempty csv file given", + baseDir: os.TempDir(), + bufferSize: config.DefaultStorageBufferSize, + mock: func(t *testing.T, s Storage, baseDir string, record Record) {}, expect: func(t *testing.T, s Storage, baseDir string, record Record) { assert := assert.New(t) _, err := s.Open() @@ -473,9 +414,9 @@ func TestStorage_Open(t *testing.T) { }, }, { - name: "open file infos failed", - baseDir: os.TempDir(), - options: []Option{}, + name: "open file infos failed", + baseDir: os.TempDir(), + bufferSize: config.DefaultStorageBufferSize, mock: func(t *testing.T, s Storage, baseDir string, record Record) { s.(*storage).baseDir = "bas" }, @@ -487,10 +428,10 @@ func TestStorage_Open(t *testing.T) { }, }, { - name: "open storage with records of a file", - baseDir: os.TempDir(), - options: []Option{WithBufferSize(1)}, - record: mockRecord, + name: "open storage with records of a file", + baseDir: os.TempDir(), + bufferSize: 1, + record: mockRecord, mock: func(t *testing.T, s Storage, baseDir string, record Record) { if err := s.Create(record); err != nil { t.Fatal(err) @@ -516,10 +457,10 @@ func TestStorage_Open(t *testing.T) { }, }, { - name: "open storage with records of multi files", - baseDir: os.TempDir(), - options: []Option{WithBufferSize(1)}, - record: Record{}, + name: "open storage with records of multi files", + baseDir: os.TempDir(), + bufferSize: 1, + record: Record{}, mock: func(t *testing.T, s Storage, baseDir string, record Record) { file, err := os.OpenFile(filepath.Join(baseDir, "record-test.csv"), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600) if err != nil { @@ -556,7 +497,7 @@ func TestStorage_Open(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - s, err := New(tc.baseDir, tc.options...) + s, err := New(tc.baseDir, config.DefaultStorageMaxSize, config.DefaultStorageMaxBackups, tc.bufferSize) if err != nil { t.Fatal(err) } @@ -574,14 +515,12 @@ func TestStorage_Clear(t *testing.T) { tests := []struct { name string baseDir string - options []Option mock func(s Storage) expect func(t *testing.T, s Storage, baseDir string) }{ { name: "clear file", baseDir: os.TempDir(), - options: []Option{}, mock: func(s Storage) {}, expect: func(t *testing.T, s Storage, baseDir string) { assert := assert.New(t) @@ -602,7 +541,6 @@ func TestStorage_Clear(t *testing.T) { { name: "open file failed", baseDir: os.TempDir(), - options: []Option{}, mock: func(s Storage) { s.(*storage).baseDir = "baz" }, @@ -618,7 +556,7 @@ func TestStorage_Clear(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - s, err := New(tc.baseDir, tc.options...) + s, err := New(tc.baseDir, config.DefaultStorageMaxSize, config.DefaultStorageMaxBackups, config.DefaultStorageBufferSize) if err != nil { t.Fatal(err) } @@ -633,14 +571,12 @@ func TestStorage_create(t *testing.T) { tests := []struct { name string baseDir string - options []Option mock func(s Storage) expect func(t *testing.T, s Storage, baseDir string) }{ { name: "create record", baseDir: os.TempDir(), - options: []Option{}, mock: func(s Storage) {}, expect: func(t *testing.T, s Storage, baseDir string) { assert := assert.New(t) @@ -651,7 +587,6 @@ func TestStorage_create(t *testing.T) { { name: "open file failed", baseDir: os.TempDir(), - options: []Option{}, mock: func(s Storage) { s.(*storage).baseDir = "foo" }, @@ -666,7 +601,7 @@ func TestStorage_create(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - s, err := New(tc.baseDir, tc.options...) + s, err := New(tc.baseDir, config.DefaultStorageMaxSize, config.DefaultStorageMaxBackups, config.DefaultStorageBufferSize) if err != nil { t.Fatal(err) } @@ -682,16 +617,21 @@ func TestStorage_create(t *testing.T) { func TestStorage_openFile(t *testing.T) { tests := []struct { - name string - baseDir string - options []Option - mock func(t *testing.T, s Storage) - expect func(t *testing.T, s Storage, baseDir string) + name string + baseDir string + maxSize int + maxBackups int + + bufferSize int + mock func(t *testing.T, s Storage) + expect func(t *testing.T, s Storage, baseDir string) }{ { - name: "open file failed", - baseDir: os.TempDir(), - options: []Option{}, + name: "open file failed", + baseDir: os.TempDir(), + maxSize: config.DefaultStorageMaxSize, + maxBackups: config.DefaultStorageMaxBackups, + bufferSize: config.DefaultStorageBufferSize, mock: func(t *testing.T, s Storage) { s.(*storage).baseDir = "bat" }, @@ -703,9 +643,11 @@ func TestStorage_openFile(t *testing.T) { }, }, { - name: "open new record file", - baseDir: os.TempDir(), - options: []Option{WithMaxSize(0), WithBufferSize(1)}, + name: "open new record file", + baseDir: os.TempDir(), + maxSize: 0, + maxBackups: config.DefaultStorageMaxBackups, + bufferSize: 1, mock: func(t *testing.T, s Storage) { if err := s.Create(Record{ID: "1"}); err != nil { t.Fatal(err) @@ -724,9 +666,11 @@ func TestStorage_openFile(t *testing.T) { }, }, { - name: "remove record file", - baseDir: os.TempDir(), - options: []Option{WithMaxSize(0), WithMaxBackups(1), WithBufferSize(1)}, + name: "remove record file", + baseDir: os.TempDir(), + maxSize: 0, + maxBackups: 1, + bufferSize: 1, mock: func(t *testing.T, s Storage) { if err := s.Create(Record{ID: "1"}); err != nil { t.Fatal(err) @@ -744,7 +688,7 @@ func TestStorage_openFile(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - s, err := New(tc.baseDir, tc.options...) + s, err := New(tc.baseDir, tc.maxSize, tc.maxBackups, tc.bufferSize) if err != nil { t.Fatal(err) } @@ -760,7 +704,7 @@ func TestStorage_openFile(t *testing.T) { func TestStorage_backupFilename(t *testing.T) { baseDir := os.TempDir() - s, err := New(baseDir) + s, err := New(baseDir, config.DefaultStorageMaxSize, config.DefaultStorageMaxBackups, config.DefaultStorageBufferSize) if err != nil { t.Fatal(err) } @@ -779,14 +723,12 @@ func TestStorage_backups(t *testing.T) { tests := []struct { name string baseDir string - options []Option mock func(t *testing.T, s Storage) expect func(t *testing.T, s Storage, baseDir string) }{ { name: "open file failed", baseDir: os.TempDir(), - options: []Option{}, mock: func(t *testing.T, s Storage) { s.(*storage).baseDir = "bar" }, @@ -803,7 +745,6 @@ func TestStorage_backups(t *testing.T) { { name: "not found record file", baseDir: os.TempDir(), - options: []Option{}, mock: func(t *testing.T, s Storage) {}, expect: func(t *testing.T, s Storage, baseDir string) { assert := assert.New(t) @@ -819,7 +760,7 @@ func TestStorage_backups(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - s, err := New(tc.baseDir, tc.options...) + s, err := New(tc.baseDir, config.DefaultStorageMaxSize, config.DefaultStorageMaxBackups, config.DefaultStorageBufferSize) if err != nil { t.Fatal(err) }