From ad553d949b08508135afe9704531418eaa36b8dc Mon Sep 17 00:00:00 2001 From: Gaius Date: Thu, 25 Aug 2022 21:16:29 +0800 Subject: [PATCH] feat: grpc dial adds context (#1594) Signed-off-by: Gaius --- client/daemon/daemon.go | 4 ++-- client/daemon/peer/peertask_piecetask_poller.go | 2 +- client/daemon/peer/peertask_piecetask_synchronizer.go | 2 +- client/daemon/rpcserver/rpcserver_test.go | 2 +- client/daemon/rpcserver/seeder_test.go | 2 +- cmd/dfcache/cmd/root.go | 2 +- cmd/dfget/cmd/daemon.go | 2 +- cmd/dfget/cmd/root.go | 2 +- pkg/rpc/cdnsystem/client/client.go | 10 ++++++---- pkg/rpc/dfdaemon/client/client.go | 5 +++-- pkg/rpc/manager/client/client.go | 9 +++++---- pkg/rpc/scheduler/client/client.go | 5 +++-- scheduler/resource/seed_peer_client.go | 3 ++- scheduler/scheduler.go | 2 +- 14 files changed, 29 insertions(+), 23 deletions(-) diff --git a/client/daemon/daemon.go b/client/daemon/daemon.go index 0e59e3380..3dbc2d7a5 100644 --- a/client/daemon/daemon.go +++ b/client/daemon/daemon.go @@ -118,7 +118,7 @@ func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) { if opt.Scheduler.Manager.Enable { var err error - managerClient, err = managerclient.GetClientByAddr(opt.Scheduler.Manager.NetAddrs) + managerClient, err = managerclient.GetClientByAddr(context.Background(), opt.Scheduler.Manager.NetAddrs) if err != nil { return nil, err } @@ -142,7 +142,7 @@ func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) { } } - sched, err := schedulerclient.GetClient(dynconfig) + sched, err := schedulerclient.GetClient(context.Background(), dynconfig) if err != nil { return nil, fmt.Errorf("failed to get schedulers: %w", err) } diff --git a/client/daemon/peer/peertask_piecetask_poller.go b/client/daemon/peer/peertask_piecetask_poller.go index 430325e86..4aaafc100 100644 --- a/client/daemon/peer/peertask_piecetask_poller.go +++ b/client/daemon/peer/peertask_piecetask_poller.go @@ -171,7 +171,7 @@ func (poller *pieceTaskPoller) getPieceTasksByPeer( Type: dfnet.TCP, Addr: fmt.Sprintf("%s:%d", peer.Ip, peer.RpcPort), } - client, err := dfdaemonclient.GetClient(netAddr.String()) + client, err := dfdaemonclient.GetClient(context.Background(), netAddr.String()) if err != nil { ptc.Errorf("get dfdaemon client error: %s", err) span.RecordError(err) diff --git a/client/daemon/peer/peertask_piecetask_synchronizer.go b/client/daemon/peer/peertask_piecetask_synchronizer.go index d9d155cbc..c97cc1aa8 100644 --- a/client/daemon/peer/peertask_piecetask_synchronizer.go +++ b/client/daemon/peer/peertask_piecetask_synchronizer.go @@ -166,7 +166,7 @@ func (s *pieceTaskSyncManager) newPieceTaskSynchronizer( Type: dfnet.TCP, Addr: fmt.Sprintf("%s:%d", dstPeer.Ip, dstPeer.RpcPort), } - client, err := dfdaemonclient.GetClient(netAddr.String()) + client, err := dfdaemonclient.GetClient(context.Background(), netAddr.String()) if err != nil { s.peerTaskConductor.Errorf("get dfdaemon client error: %s, dest peer: %s", err, dstPeer.PeerId) return err diff --git a/client/daemon/rpcserver/rpcserver_test.go b/client/daemon/rpcserver/rpcserver_test.go index 98bcb9192..b4745c5f3 100644 --- a/client/daemon/rpcserver/rpcserver_test.go +++ b/client/daemon/rpcserver/rpcserver_test.go @@ -566,7 +566,7 @@ func setupPeerServerAndClient(t *testing.T, srv *server, assert *testifyassert.A Type: dfnet.TCP, Addr: fmt.Sprintf(":%d", port), } - client, err := dfdaemonclient.GetClient(netAddr.String()) + client, err := dfdaemonclient.GetClient(context.Background(), netAddr.String()) assert.Nil(err, "grpc dial should be ok") return client } diff --git a/client/daemon/rpcserver/seeder_test.go b/client/daemon/rpcserver/seeder_test.go index f8f161bbe..b106afa80 100644 --- a/client/daemon/rpcserver/seeder_test.go +++ b/client/daemon/rpcserver/seeder_test.go @@ -379,7 +379,7 @@ func setupSeederServerAndClient(t *testing.T, srv *server, sd *seeder, assert *t } }() - client, err := client.GetClientByAddr(dfnet.NetAddr{ + client, err := client.GetClientByAddr(context.Background(), dfnet.NetAddr{ Type: dfnet.TCP, Addr: fmt.Sprintf(":%d", port), }) diff --git a/cmd/dfcache/cmd/root.go b/cmd/dfcache/cmd/root.go index e89c9d8a5..c93552803 100644 --- a/cmd/dfcache/cmd/root.go +++ b/cmd/dfcache/cmd/root.go @@ -173,7 +173,7 @@ func runDfcacheSubcmd(cmdName string, args []string) error { // checkDaemon checks if daemon is running func checkDaemon(daemonSockPath string) (client.Client, error) { netAddr := &dfnet.NetAddr{Type: dfnet.UNIX, Addr: daemonSockPath} - dfdaemonClient, err := client.GetClient(netAddr.String()) + dfdaemonClient, err := client.GetClient(context.Background(), netAddr.String()) if err != nil { return nil, err } diff --git a/cmd/dfget/cmd/daemon.go b/cmd/dfget/cmd/daemon.go index 70b78e496..62db299a1 100644 --- a/cmd/dfget/cmd/daemon.go +++ b/cmd/dfget/cmd/daemon.go @@ -130,7 +130,7 @@ func initDaemonDfpath(cfg *config.DaemonOption) (dfpath.Dfpath, error) { func runDaemon(d dfpath.Dfpath) error { logger.Infof("Version:\n%s", version.Version()) netAddr := &dfnet.NetAddr{Type: dfnet.UNIX, Addr: d.DaemonSockPath()} - daemonClient, err := client.GetClient(netAddr.String()) + daemonClient, err := client.GetClient(context.Background(), netAddr.String()) if err != nil { return err } diff --git a/cmd/dfget/cmd/root.go b/cmd/dfget/cmd/root.go index 9257bdc2f..62deb8dae 100644 --- a/cmd/dfget/cmd/root.go +++ b/cmd/dfget/cmd/root.go @@ -245,7 +245,7 @@ func runDfget(dfgetLockPath, daemonSockPath string) error { // checkAndSpawnDaemon do checking at three checkpoints func checkAndSpawnDaemon(dfgetLockPath, daemonSockPath string) (client.Client, error) { netAddr := &dfnet.NetAddr{Type: dfnet.UNIX, Addr: daemonSockPath} - dfdaemonClient, err := client.GetClient(netAddr.String()) + dfdaemonClient, err := client.GetClient(context.Background(), netAddr.String()) if err != nil { return nil, err } diff --git a/pkg/rpc/cdnsystem/client/client.go b/pkg/rpc/cdnsystem/client/client.go index 2b9f29ef5..f507523af 100644 --- a/pkg/rpc/cdnsystem/client/client.go +++ b/pkg/rpc/cdnsystem/client/client.go @@ -54,8 +54,9 @@ const ( perRetryTimeout = 3 * time.Second ) -func GetClientByAddr(netAddr dfnet.NetAddr, opts ...grpc.DialOption) (Client, error) { - conn, err := grpc.Dial( +func GetClientByAddr(ctx context.Context, netAddr dfnet.NetAddr, opts ...grpc.DialOption) (Client, error) { + conn, err := grpc.DialContext( + ctx, netAddr.Addr, append([]grpc.DialOption{ grpc.WithTransportCredentials(insecure.NewCredentials()), @@ -87,12 +88,13 @@ func GetClientByAddr(netAddr dfnet.NetAddr, opts ...grpc.DialOption) (Client, er }, nil } -func GetClient(dynconfig config.DynconfigInterface, opts ...grpc.DialOption) (Client, error) { +func GetClient(ctx context.Context, dynconfig config.DynconfigInterface, opts ...grpc.DialOption) (Client, error) { // Register resolver and balancer. resolver.RegisterSeedPeer(dynconfig) balancer.Register(pkgbalancer.NewConsistentHashingBuilder()) - conn, err := grpc.Dial( + conn, err := grpc.DialContext( + ctx, resolver.SeedPeerVirtualTarget, append([]grpc.DialOption{ grpc.WithDefaultServiceConfig(pkgbalancer.BalancerServiceConfig), diff --git a/pkg/rpc/dfdaemon/client/client.go b/pkg/rpc/dfdaemon/client/client.go index 88ce3e074..28d01393f 100644 --- a/pkg/rpc/dfdaemon/client/client.go +++ b/pkg/rpc/dfdaemon/client/client.go @@ -52,12 +52,13 @@ const ( ) // GetClient returns dfdaemon client. -func GetClient(target string, opts ...grpc.DialOption) (Client, error) { +func GetClient(ctx context.Context, target string, opts ...grpc.DialOption) (Client, error) { if rpc.IsVsock(target) { opts = append(opts, grpc.WithContextDialer(rpc.VsockDialer)) } - conn, err := grpc.Dial( + conn, err := grpc.DialContext( + ctx, target, append([]grpc.DialOption{ grpc.WithTransportCredentials(insecure.NewCredentials()), diff --git a/pkg/rpc/manager/client/client.go b/pkg/rpc/manager/client/client.go index 7a2e6df5e..f0ce04a45 100644 --- a/pkg/rpc/manager/client/client.go +++ b/pkg/rpc/manager/client/client.go @@ -53,8 +53,9 @@ const ( ) // GetClient returns manager client. -func GetClient(target string, opts ...grpc.DialOption) (Client, error) { - conn, err := grpc.Dial( +func GetClient(ctx context.Context, target string, opts ...grpc.DialOption) (Client, error) { + conn, err := grpc.DialContext( + ctx, target, append([]grpc.DialOption{ grpc.WithTransportCredentials(insecure.NewCredentials()), @@ -85,12 +86,12 @@ func GetClient(target string, opts ...grpc.DialOption) (Client, error) { } // GetClientByAddr returns manager client with addresses. -func GetClientByAddr(netAddrs []dfnet.NetAddr, opts ...grpc.DialOption) (Client, error) { +func GetClientByAddr(ctx context.Context, netAddrs []dfnet.NetAddr, opts ...grpc.DialOption) (Client, error) { for _, netAddr := range netAddrs { ipReachable := reachable.New(&reachable.Config{Address: netAddr.Addr}) if err := ipReachable.Check(); err == nil { logger.Infof("use %s address for manager grpc client", netAddr.Addr) - return GetClient(netAddr.Addr, opts...) + return GetClient(ctx, netAddr.Addr, opts...) } logger.Warnf("%s manager address can not reachable", netAddr.Addr) } diff --git a/pkg/rpc/scheduler/client/client.go b/pkg/rpc/scheduler/client/client.go index f707a7cb9..acb0d2032 100644 --- a/pkg/rpc/scheduler/client/client.go +++ b/pkg/rpc/scheduler/client/client.go @@ -55,12 +55,13 @@ const ( ) // GetClient get scheduler clients using resolver and balancer, -func GetClient(dynconfig config.Dynconfig, opts ...grpc.DialOption) (Client, error) { +func GetClient(ctx context.Context, dynconfig config.Dynconfig, opts ...grpc.DialOption) (Client, error) { // Register resolver and balancer. resolver.RegisterScheduler(dynconfig) balancer.Register(pkgbalancer.NewConsistentHashingBuilder()) - conn, err := grpc.Dial( + conn, err := grpc.DialContext( + ctx, resolver.SchedulerVirtualTarget, append([]grpc.DialOption{ grpc.WithDefaultServiceConfig(pkgbalancer.BalancerServiceConfig), diff --git a/scheduler/resource/seed_peer_client.go b/scheduler/resource/seed_peer_client.go index 0d7700a26..362c5cc57 100644 --- a/scheduler/resource/seed_peer_client.go +++ b/scheduler/resource/seed_peer_client.go @@ -19,6 +19,7 @@ package resource import ( + "context" "fmt" reflect "reflect" @@ -62,7 +63,7 @@ func newSeedPeerClient(dynconfig config.DynconfigInterface, hostManager HostMana logger.Infof("initialize seed peer addresses: %#v", seedPeersToNetAddrs(config.SeedPeers)) // Initialize seed peer grpc client. - client, err := client.GetClient(dynconfig, opts...) + client, err := client.GetClient(context.Background(), dynconfig, opts...) if err != nil { return nil, err } diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 900f80063..bf906323c 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -77,7 +77,7 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err s := &Server{config: cfg} // Initialize manager client. - managerClient, err := managerclient.GetClient(cfg.Manager.Addr) + managerClient, err := managerclient.GetClient(ctx, cfg.Manager.Addr) if err != nil { return nil, err }