feat: support mutli manager addrs (#846)

* feat: support mutli manager addrs (#844)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2021-12-01 16:09:13 +08:00
parent cd0811a964
commit 999097e4ed
No known key found for this signature in database
GPG Key ID: 8B4E5D1290FA2FFB
10 changed files with 45 additions and 21 deletions

View File

@ -75,10 +75,10 @@ type dynconfig struct {
done chan bool done chan bool
} }
func NewDynconfig(managerClient internaldynconfig.ManagerClient, expire time.Duration) (Dynconfig, error) { func NewDynconfig(rawManagerClient managerclient.Client, hostOption HostOption, expire time.Duration) (Dynconfig, error) {
client, err := internaldynconfig.New( client, err := internaldynconfig.New(
internaldynconfig.ManagerSourceType, internaldynconfig.ManagerSourceType,
internaldynconfig.WithManagerClient(managerClient), internaldynconfig.WithManagerClient(newManagerClient(rawManagerClient, hostOption)),
internaldynconfig.WithExpireTime(expire), internaldynconfig.WithExpireTime(expire),
internaldynconfig.WithCachePath(cachePath), internaldynconfig.WithCachePath(cachePath),
) )
@ -172,7 +172,7 @@ type managerClient struct {
} }
// New the manager client used by dynconfig // New the manager client used by dynconfig
func NewManagerClient(client managerclient.Client, hostOption HostOption) internaldynconfig.ManagerClient { func newManagerClient(client managerclient.Client, hostOption HostOption) internaldynconfig.ManagerClient {
return &managerClient{ return &managerClient{
Client: client, Client: client,
hostOption: hostOption, hostOption: hostOption,

View File

@ -109,7 +109,7 @@ func TestDynconfigNewDynconfig(t *testing.T) {
mockManagerClient := mocks.NewMockClient(ctl) mockManagerClient := mocks.NewMockClient(ctl)
tc.mock(mockManagerClient.EXPECT()) tc.mock(mockManagerClient.EXPECT())
_, err := NewDynconfig(NewManagerClient(mockManagerClient, tc.hostOption), tc.expire) _, err := NewDynconfig(mockManagerClient, tc.hostOption, tc.expire)
tc.expect(t, err) tc.expect(t, err)
tc.cleanFileCache(t) tc.cleanFileCache(t)
}) })
@ -283,7 +283,7 @@ func TestDynconfigGet(t *testing.T) {
mockManagerClient := mocks.NewMockClient(ctl) mockManagerClient := mocks.NewMockClient(ctl)
tc.mock(mockManagerClient.EXPECT(), tc.data) tc.mock(mockManagerClient.EXPECT(), tc.data)
dynconfig, err := NewDynconfig(NewManagerClient(mockManagerClient, tc.hostOption), tc.expire) dynconfig, err := NewDynconfig(mockManagerClient, tc.hostOption, tc.expire)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -462,7 +462,7 @@ func TestDynconfigGetSchedulers(t *testing.T) {
mockManagerClient := mocks.NewMockClient(ctl) mockManagerClient := mocks.NewMockClient(ctl)
tc.mock(mockManagerClient.EXPECT(), tc.data) tc.mock(mockManagerClient.EXPECT(), tc.data)
dynconfig, err := NewDynconfig(NewManagerClient(mockManagerClient, tc.hostOption), tc.expire) dynconfig, err := NewDynconfig(mockManagerClient, tc.hostOption, tc.expire)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }

View File

@ -114,7 +114,7 @@ func (p *DaemonOption) Validate() error {
} }
if p.Scheduler.Manager.Enable { if p.Scheduler.Manager.Enable {
if p.Scheduler.Manager.Addr == "" { if len(p.Scheduler.NetAddrs) == 0 {
return errors.New("manager addr is not specified") return errors.New("manager addr is not specified")
} }
@ -140,8 +140,8 @@ type SchedulerOption struct {
type ManagerOption struct { type ManagerOption struct {
// Enable get configuration from manager // Enable get configuration from manager
Enable bool `mapstructure:"enable" yaml:"enable"` Enable bool `mapstructure:"enable" yaml:"enable"`
// Addr is manager addresse // NetAddrs is manager addresses.
Addr string `mapstructure:"addr" yaml:"addr"` NetAddrs []dfnet.NetAddr `mapstructure:"netAddrs" yaml:"netAddrs"`
// RefreshInterval is the refresh interval // RefreshInterval is the refresh interval
RefreshInterval time.Duration `mapstructure:"refreshInterval" yaml:"refreshInterval"` RefreshInterval time.Duration `mapstructure:"refreshInterval" yaml:"refreshInterval"`
} }

View File

@ -229,8 +229,13 @@ func TestPeerHostOption_Load(t *testing.T) {
KeepStorage: false, KeepStorage: false,
Scheduler: SchedulerOption{ Scheduler: SchedulerOption{
Manager: ManagerOption{ Manager: ManagerOption{
Enable: false, Enable: false,
Addr: "127.0.0.1:65003", NetAddrs: []dfnet.NetAddr{
{
Type: dfnet.TCP,
Addr: "127.0.0.1:65003",
},
},
RefreshInterval: 5 * time.Minute, RefreshInterval: 5 * time.Minute,
}, },
NetAddrs: []dfnet.NetAddr{ NetAddrs: []dfnet.NetAddr{

View File

@ -6,7 +6,9 @@ keepStorage: false
scheduler: scheduler:
manager: manager:
enable: false enable: false
addr: "127.0.0.1:65003" netAddrs:
- type: tcp
addr: 127.0.0.1:65003
refreshInterval: 5m refreshInterval: 5m
netAddrs: netAddrs:
- type: tcp - type: tcp

View File

@ -107,16 +107,13 @@ func New(opt *config.DaemonOption) (Daemon, error) {
var dynconfig config.Dynconfig var dynconfig config.Dynconfig
if opt.Scheduler.Manager.Enable == true { if opt.Scheduler.Manager.Enable == true {
// New manager client // New manager client
managerClient, err := managerclient.New(opt.Scheduler.Manager.Addr) managerClient, err := managerclient.NewWithAddrs(opt.Scheduler.Manager.NetAddrs)
if err != nil { if err != nil {
return nil, err return nil, err
} }
// New dynconfig client // New dynconfig client
if dynconfig, err = config.NewDynconfig( if dynconfig, err = config.NewDynconfig(managerClient, opt.Host, opt.Scheduler.Manager.RefreshInterval); err != nil {
config.NewManagerClient(managerClient, opt.Host),
opt.Scheduler.Manager.RefreshInterval,
); err != nil {
return nil, err return nil, err
} }

@ -1 +1 @@
Subproject commit 7245a3ca839403e6f5fd61279e7f54b0cce59f60 Subproject commit 88270e37feec344381e63a26275dceb8296c12ac

View File

@ -28,8 +28,10 @@ scheduler:
manager: manager:
# get scheduler list dynamically from manager # get scheduler list dynamically from manager
enable: false enable: false
# manager service address # manager service addresses
addr: 127.0.0.1:65003 netAddrs:
- type: tcp
addr: 127.0.0.1:65003
# scheduler list refresh interval # scheduler list refresh interval
refreshInterval: 5m refreshInterval: 5m
# schedule timeout # schedule timeout

View File

@ -28,7 +28,9 @@ scheduler:
# 通过 manager 接口动态获取 scheduler 列表 # 通过 manager 接口动态获取 scheduler 列表
enable: false enable: false
# manager 服务地址 # manager 服务地址
addr: 127.0.0.1:65003 netAddrs:
- type: tcp
addr: 127.0.0.1:65003
# scheduler 列表刷新时间 # scheduler 列表刷新时间
refreshInterval: 5m refreshInterval: 5m
# 调度超时 # 调度超时

View File

@ -18,6 +18,7 @@ package client
import ( import (
"context" "context"
"errors"
"time" "time"
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
@ -27,6 +28,8 @@ import (
"google.golang.org/grpc/backoff" "google.golang.org/grpc/backoff"
logger "d7y.io/dragonfly/v2/internal/dflog" logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/basic/dfnet"
"d7y.io/dragonfly/v2/pkg/reachable"
"d7y.io/dragonfly/v2/pkg/rpc/manager" "d7y.io/dragonfly/v2/pkg/rpc/manager"
) )
@ -91,6 +94,19 @@ func New(target string) (Client, error) {
}, nil }, nil
} }
func NewWithAddrs(netAddrs []dfnet.NetAddr) (Client, error) {
for _, netAddr := range netAddrs {
ipReachable := reachable.New(&reachable.Config{Address: netAddr.Addr})
if err := ipReachable.Check(); err == nil {
logger.Infof("use %s address for manager grpc client", netAddr.Addr)
return New(netAddr.Addr)
}
logger.Warnf("%s address can not reachable", netAddr.Addr)
}
return nil, errors.New("can not find available addresses")
}
func (c *client) GetScheduler(req *manager.GetSchedulerRequest) (*manager.Scheduler, error) { func (c *client) GetScheduler(req *manager.GetSchedulerRequest) (*manager.Scheduler, error) {
ctx, cancel := context.WithTimeout(context.Background(), contextTimeout) ctx, cancel := context.WithTimeout(context.Background(), contextTimeout)
defer cancel() defer cancel()