diff --git a/client/config/peerhost.go b/client/config/peerhost.go index 6202b522e..0b4b970e9 100644 --- a/client/config/peerhost.go +++ b/client/config/peerhost.go @@ -239,6 +239,7 @@ type DownloadOption struct { TotalRateLimit util.RateLimit `mapstructure:"totalRateLimit" yaml:"totalRateLimit"` PerPeerRateLimit util.RateLimit `mapstructure:"perPeerRateLimit" yaml:"perPeerRateLimit"` PieceDownloadTimeout time.Duration `mapstructure:"pieceDownloadTimeout" yaml:"pieceDownloadTimeout"` + GRPCDialTimeout time.Duration `mapstructure:"grpcDialTimeout" yaml:"grpcDialTimeout"` DownloadGRPC ListenOption `mapstructure:"downloadGRPC" yaml:"downloadGRPC"` PeerGRPC ListenOption `mapstructure:"peerGRPC" yaml:"peerGRPC"` CalculateDigest bool `mapstructure:"calculateDigest" yaml:"calculateDigest"` diff --git a/client/config/peerhost_darwin.go b/client/config/peerhost_darwin.go index ded23feec..90fd917ea 100644 --- a/client/config/peerhost_darwin.go +++ b/client/config/peerhost_darwin.go @@ -70,6 +70,7 @@ var peerHostConfig = func() *DaemonOption { DefaultPattern: PatternP2P, CalculateDigest: true, PieceDownloadTimeout: 30 * time.Second, + GRPCDialTimeout: 10 * time.Second, GetPiecesMaxRetry: 100, TotalRateLimit: util.RateLimit{ Limit: rate.Limit(DefaultTotalDownloadLimit), diff --git a/client/config/peerhost_linux.go b/client/config/peerhost_linux.go index 6d7fb8174..8ee1bcf4e 100644 --- a/client/config/peerhost_linux.go +++ b/client/config/peerhost_linux.go @@ -70,6 +70,7 @@ var peerHostConfig = func() *DaemonOption { DefaultPattern: PatternP2P, CalculateDigest: true, PieceDownloadTimeout: 30 * time.Second, + GRPCDialTimeout: 10 * time.Second, GetPiecesMaxRetry: 100, TotalRateLimit: util.RateLimit{ Limit: rate.Limit(DefaultTotalDownloadLimit), diff --git a/client/daemon/daemon.go b/client/daemon/daemon.go index 6d07b603b..327e177d9 100644 --- a/client/daemon/daemon.go +++ b/client/daemon/daemon.go @@ -220,7 +220,7 @@ func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) { } peerTaskManager, err := peer.NewPeerTaskManager(host, pieceManager, storageManager, sched, opt.Scheduler, opt.Download.PerPeerRateLimit.Limit, opt.Storage.Multiplex, opt.Download.Prefetch, opt.Download.CalculateDigest, - opt.Download.GetPiecesMaxRetry, opt.Download.WatchdogTimeout, credentials) + opt.Download.GetPiecesMaxRetry, opt.Download.WatchdogTimeout, credentials, opt.Download.GRPCDialTimeout) if err != nil { return nil, err } diff --git a/client/daemon/peer/peertask_conductor.go b/client/daemon/peer/peertask_conductor.go index aae5366fe..b02f715c3 100644 --- a/client/daemon/peer/peertask_conductor.go +++ b/client/daemon/peer/peertask_conductor.go @@ -31,7 +31,6 @@ import ( "go.uber.org/atomic" "golang.org/x/time/rate" "google.golang.org/grpc/codes" - "google.golang.org/grpc/credentials" "google.golang.org/grpc/status" commonv1 "d7y.io/api/pkg/apis/common/v1" @@ -170,8 +169,6 @@ type peerTaskConductor struct { rg *util.Range sourceErrorStatus *status.Status - - grpcCredentials credentials.TransportCredentials } func (ptm *peerTaskManager) newPeerTaskConductor( @@ -244,7 +241,6 @@ func (ptm *peerTaskManager) newPeerTaskConductor( usedTraffic: atomic.NewUint64(0), SugaredLoggerOnWith: log, seed: seed, - grpcCredentials: ptm.grpcCredentials, parent: parent, rg: rg, diff --git a/client/daemon/peer/peertask_manager.go b/client/daemon/peer/peertask_manager.go index 553b9473a..09469fe6c 100644 --- a/client/daemon/peer/peertask_manager.go +++ b/client/daemon/peer/peertask_manager.go @@ -144,6 +144,7 @@ type peerTaskManager struct { getPiecesMaxRetry int grpcCredentials credentials.TransportCredentials + grpcDialTimeout time.Duration } func NewPeerTaskManager( @@ -158,7 +159,8 @@ func NewPeerTaskManager( calculateDigest bool, getPiecesMaxRetry int, watchdog time.Duration, - grpcCredentials credentials.TransportCredentials) (TaskManager, error) { + grpcCredentials credentials.TransportCredentials, + grpcDialTimeout time.Duration) (TaskManager, error) { ptm := &peerTaskManager{ host: host, @@ -175,6 +177,7 @@ func NewPeerTaskManager( calculateDigest: calculateDigest, getPiecesMaxRetry: getPiecesMaxRetry, grpcCredentials: grpcCredentials, + grpcDialTimeout: grpcDialTimeout, } return ptm, nil } diff --git a/client/daemon/peer/peertask_manager_test.go b/client/daemon/peer/peertask_manager_test.go index 8695bfbf7..fb64e31c4 100644 --- a/client/daemon/peer/peertask_manager_test.go +++ b/client/daemon/peer/peertask_manager_test.go @@ -40,6 +40,7 @@ import ( "golang.org/x/time/rate" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" commonv1 "d7y.io/api/pkg/apis/common/v1" @@ -313,6 +314,8 @@ func setupMockManager(ctrl *gomock.Controller, ts *testSpec, opt componentsOptio schedulerOption: config.SchedulerOption{ ScheduleTimeout: scheduleTimeout, }, + grpcDialTimeout: time.Second, + grpcCredentials: insecure.NewCredentials(), } return &mockManager{ testSpec: ts, diff --git a/client/daemon/peer/peertask_piecetask_synchronizer.go b/client/daemon/peer/peertask_piecetask_synchronizer.go index 0d83de736..934deac9b 100644 --- a/client/daemon/peer/peertask_piecetask_synchronizer.go +++ b/client/daemon/peer/peertask_piecetask_synchronizer.go @@ -170,13 +170,15 @@ func (s *pieceTaskSyncManager) newPieceTaskSynchronizer( } var credentialOpt grpc.DialOption - if s.peerTaskConductor.grpcCredentials != nil { - credentialOpt = grpc.WithTransportCredentials(s.peerTaskConductor.grpcCredentials) + if s.peerTaskConductor.peerTaskManager.grpcCredentials != nil { + credentialOpt = grpc.WithTransportCredentials(s.peerTaskConductor.peerTaskManager.grpcCredentials) } else { credentialOpt = grpc.WithTransportCredentials(insecure.NewCredentials()) } - client, err := dfdaemonclient.GetClient(context.Background(), netAddr.String(), credentialOpt) + dialCtx, cancel := context.WithTimeout(ctx, s.peerTaskConductor.peerTaskManager.grpcDialTimeout) + client, err := dfdaemonclient.GetClient(dialCtx, netAddr.String(), credentialOpt) + cancel() if err != nil { s.peerTaskConductor.Errorf("get dfdaemon client error: %s, dest peer: %s", err, dstPeer.PeerId) diff --git a/client/daemon/peer/peertask_reuse_test.go b/client/daemon/peer/peertask_reuse_test.go index 2c52a5f26..a57357e92 100644 --- a/client/daemon/peer/peertask_reuse_test.go +++ b/client/daemon/peer/peertask_reuse_test.go @@ -24,10 +24,12 @@ import ( "os" "path" "testing" + "time" "github.com/go-http-utils/headers" "github.com/golang/mock/gomock" testifyassert "github.com/stretchr/testify/assert" + "google.golang.org/grpc/credentials/insecure" commonv1 "d7y.io/api/pkg/apis/common/v1" schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1" @@ -318,9 +320,11 @@ func TestReuseFilePeerTask(t *testing.T) { sm := mocks.NewMockManager(ctrl) tc.storageManager(sm) ptm := &peerTaskManager{ - host: &schedulerv1.PeerHost{}, - enablePrefetch: tc.enablePrefetch, - storageManager: sm, + host: &schedulerv1.PeerHost{}, + enablePrefetch: tc.enablePrefetch, + storageManager: sm, + grpcDialTimeout: time.Second, + grpcCredentials: insecure.NewCredentials(), } tc.verify(ptm.tryReuseFilePeerTask(context.Background(), tc.request)) }) @@ -695,9 +699,11 @@ func TestReuseStreamPeerTask(t *testing.T) { sm := mocks.NewMockManager(ctrl) tc.storageManager(sm) ptm := &peerTaskManager{ - host: &schedulerv1.PeerHost{}, - enablePrefetch: tc.enablePrefetch, - storageManager: sm, + host: &schedulerv1.PeerHost{}, + enablePrefetch: tc.enablePrefetch, + storageManager: sm, + grpcDialTimeout: time.Second, + grpcCredentials: insecure.NewCredentials(), } tc.verify(ptm.tryReuseStreamPeerTask(context.Background(), tc.request)) }) diff --git a/client/daemon/peer/peertask_stream_backsource_partial_test.go b/client/daemon/peer/peertask_stream_backsource_partial_test.go index a9fad828d..a6aff1d6e 100644 --- a/client/daemon/peer/peertask_stream_backsource_partial_test.go +++ b/client/daemon/peer/peertask_stream_backsource_partial_test.go @@ -36,6 +36,7 @@ import ( "go.uber.org/atomic" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/status" commonv1 "d7y.io/api/pkg/apis/common/v1" @@ -262,6 +263,8 @@ func TestStreamPeerTask_BackSource_Partial_WithContentLength(t *testing.T) { schedulerOption: config.SchedulerOption{ ScheduleTimeout: util.Duration{Duration: 10 * time.Minute}, }, + grpcDialTimeout: time.Second, + grpcCredentials: insecure.NewCredentials(), } req := &schedulerv1.PeerTaskRequest{ Url: url,