From c10b6da49821604059e23b54267ceddf1ea8633e Mon Sep 17 00:00:00 2001 From: Eryu Guan <45746212+eryugey@users.noreply.github.com> Date: Wed, 22 Feb 2023 10:36:23 +0800 Subject: [PATCH] feat: support reload scheduler addresses for local Dynconfig (#2091) Dynconfig only supports refresh new configs from manager, but local dynconfig is unable to reload scheduler addresses when local config file is updated. Now we introduce new Dynconfig method 'SetConfig()', which sets the given DaemonOption to struct dynconfigLocal, and register it as a watcher of WatchConfig(), which will reload config file periodically. So that dynconfigLocal.GetResolveSchedulerAddrs() will get new scheduler addresses. Signed-off-by: Eryu Guan --- client/config/dynconfig.go | 6 ++++ client/config/dynconfig_local.go | 10 ++++++ client/config/dynconfig_local_test.go | 45 ++++++++++++++++++++++++--- client/config/dynconfig_manager.go | 10 ++++++ client/daemon/daemon.go | 4 +++ 5 files changed, 70 insertions(+), 5 deletions(-) diff --git a/client/config/dynconfig.go b/client/config/dynconfig.go index e0e4750ad..5d44285f9 100644 --- a/client/config/dynconfig.go +++ b/client/config/dynconfig.go @@ -61,9 +61,15 @@ type Dynconfig interface { // Get the dynamic config. Get() (*DynconfigData, error) + // Get the dynamic config source type. + GetSourceType() SourceType + // Refresh refreshes dynconfig in cache. Refresh() error + // SetConfig updates DaemonOption in dynconfig. + SetConfig(*DaemonOption) + // Register allows an instance to register itself to listen/observe events. Register(Observer) diff --git a/client/config/dynconfig_local.go b/client/config/dynconfig_local.go index f80e482ef..b1697d79d 100644 --- a/client/config/dynconfig_local.go +++ b/client/config/dynconfig_local.go @@ -100,11 +100,21 @@ func (d *dynconfigLocal) Get() (*DynconfigData, error) { return nil, ErrUnimplemented } +// Get the dynamic config source type. +func (d *dynconfigLocal) GetSourceType() SourceType { + return LocalSourceType +} + // Refresh refreshes dynconfig in cache. func (d *dynconfigLocal) Refresh() error { return nil } +// SetConfig updates DaemonOption in dynconfig. +func (d *dynconfigLocal) SetConfig(newcfg *DaemonOption) { + d.config = newcfg +} + // Register allows an instance to register itself to listen/observe events. func (d *dynconfigLocal) Register(l Observer) { d.observers[l] = struct{}{} diff --git a/client/config/dynconfig_local_test.go b/client/config/dynconfig_local_test.go index 47ee6e5db..e081879e2 100644 --- a/client/config/dynconfig_local_test.go +++ b/client/config/dynconfig_local_test.go @@ -36,7 +36,8 @@ func TestDynconfigGetResolveSchedulerAddrs_LocalSourceType(t *testing.T) { tests := []struct { name string config *DaemonOption - expect func(t *testing.T, dynconfig Dynconfig, config *DaemonOption) + newcfg *DaemonOption + expect func(t *testing.T, dynconfig Dynconfig, config, newcfg *DaemonOption) }{ { name: "get scheduler addrs", @@ -49,7 +50,8 @@ func TestDynconfigGetResolveSchedulerAddrs_LocalSourceType(t *testing.T) { }, }, }, - expect: func(t *testing.T, dynconfig Dynconfig, config *DaemonOption) { + newcfg: nil, + expect: func(t *testing.T, dynconfig Dynconfig, config, _newcfg *DaemonOption) { assert := assert.New(t) result, err := dynconfig.GetResolveSchedulerAddrs() assert.NoError(err) @@ -67,7 +69,8 @@ func TestDynconfigGetResolveSchedulerAddrs_LocalSourceType(t *testing.T) { }, }, }, - expect: func(t *testing.T, dynconfig Dynconfig, config *DaemonOption) { + newcfg: nil, + expect: func(t *testing.T, dynconfig Dynconfig, config, _newcfg *DaemonOption) { assert := assert.New(t) _, err := dynconfig.GetResolveSchedulerAddrs() assert.EqualError(err, "can not found available scheduler addresses") @@ -87,13 +90,45 @@ func TestDynconfigGetResolveSchedulerAddrs_LocalSourceType(t *testing.T) { }, }, }, - expect: func(t *testing.T, dynconfig Dynconfig, config *DaemonOption) { + newcfg: nil, + expect: func(t *testing.T, dynconfig Dynconfig, config, _newcfg *DaemonOption) { assert := assert.New(t) result, err := dynconfig.GetResolveSchedulerAddrs() assert.NoError(err) assert.EqualValues(result, []resolver.Address{{ServerName: "127.0.0.1", Addr: "127.0.0.1:3000"}}) }, }, + { + name: "get scheduler addrs after update config", + config: &DaemonOption{ + Scheduler: SchedulerOption{ + NetAddrs: []dfnet.NetAddr{ + { + Addr: "127.0.0.1:3003", + }, + }, + }, + }, + newcfg: &DaemonOption{ + Scheduler: SchedulerOption{ + NetAddrs: []dfnet.NetAddr{ + { + Addr: "127.0.0.1:3000", + }, + }, + }, + }, + expect: func(t *testing.T, dynconfig Dynconfig, config, newcfg *DaemonOption) { + assert := assert.New(t) + _, err := dynconfig.GetResolveSchedulerAddrs() + assert.EqualError(err, "can not found available scheduler addresses") + + dynconfig.SetConfig(newcfg) + result, err := dynconfig.GetResolveSchedulerAddrs() + assert.NoError(err) + assert.EqualValues(result, []resolver.Address{{ServerName: "127.0.0.1", Addr: "127.0.0.1:3000"}}) + }, + }, } for _, tc := range tests { @@ -103,7 +138,7 @@ func TestDynconfigGetResolveSchedulerAddrs_LocalSourceType(t *testing.T) { t.Fatal(err) } - tc.expect(t, dynconfig, tc.config) + tc.expect(t, dynconfig, tc.config, tc.newcfg) }) } } diff --git a/client/config/dynconfig_manager.go b/client/config/dynconfig_manager.go index 9ffc03051..d5d803cfc 100644 --- a/client/config/dynconfig_manager.go +++ b/client/config/dynconfig_manager.go @@ -167,6 +167,11 @@ func (d *dynconfigManager) GetObjectStorage() (*managerv1.ObjectStorage, error) return data.ObjectStorage, nil } +// Get the dynamic config source type. +func (d *dynconfigManager) GetSourceType() SourceType { + return ManagerSourceType +} + // Refresh refreshes dynconfig in cache. func (d *dynconfigManager) Refresh() error { if err := d.Dynconfig.Refresh(); err != nil { @@ -180,6 +185,11 @@ func (d *dynconfigManager) Refresh() error { return nil } +// SetConfig updates DaemonOption in dynconfig. This is only useful for local dynconfig. +func (d *dynconfigManager) SetConfig(_cfg *DaemonOption) { + return +} + // Register allows an instance to register itself to listen/observe events. func (d *dynconfigManager) Register(l Observer) { d.observers[l] = struct{}{} diff --git a/client/daemon/daemon.go b/client/daemon/daemon.go index dbdac935e..6eff75adf 100644 --- a/client/daemon/daemon.go +++ b/client/daemon/daemon.go @@ -761,6 +761,10 @@ func (cd *clientDaemon) Serve() error { }() } + if cd.dynconfig.GetSourceType() == config.LocalSourceType { + watchers = append(watchers, cd.dynconfig.SetConfig) + } + if len(watchers) > 0 && interval > 0 { go func() { dependency.WatchConfig(interval, func() any {