diff --git a/go.mod b/go.mod index 79459b592..b200034bf 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2 go 1.21 require ( - d7y.io/api/v2 v2.0.64 + d7y.io/api/v2 v2.0.67 github.com/MysteriousPotato/go-lockable v1.0.0 github.com/RichardKnop/machinery v1.10.6 github.com/Showmax/go-fqdn v1.0.0 diff --git a/go.sum b/go.sum index 7e5d12029..b932f2cc5 100644 --- a/go.sum +++ b/go.sum @@ -51,8 +51,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo= -d7y.io/api/v2 v2.0.64 h1:2mdQ0maJZZgogfQHoCyzi1TBczyby1WeyFau13ywmDw= -d7y.io/api/v2 v2.0.64/go.mod h1:ve0R4ePgRYZVdnVyhWTOi2LdP3/gyf21ZwP2xij+3Io= +d7y.io/api/v2 v2.0.67 h1:4fiGXT1WHWgRXSTmnP53MU83Zbf+7i1jYeGNEJWrM7Q= +d7y.io/api/v2 v2.0.67/go.mod h1:ve0R4ePgRYZVdnVyhWTOi2LdP3/gyf21ZwP2xij+3Io= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U= diff --git a/pkg/rpc/dfdaemon/client/client_v2.go b/pkg/rpc/dfdaemon/client/client_v2.go index 308f6f271..a5cb0696a 100644 --- a/pkg/rpc/dfdaemon/client/client_v2.go +++ b/pkg/rpc/dfdaemon/client/client_v2.go @@ -26,19 +26,29 @@ import ( grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "google.golang.org/grpc" + "google.golang.org/grpc/balancer" dfdaemonv2 "d7y.io/api/v2/pkg/apis/dfdaemon/v2" logger "d7y.io/dragonfly/v2/internal/dflog" + pkgbalancer "d7y.io/dragonfly/v2/pkg/balancer" + "d7y.io/dragonfly/v2/pkg/resolver" "d7y.io/dragonfly/v2/pkg/rpc" + "d7y.io/dragonfly/v2/scheduler/config" ) // GetV2 returns v2 version of the dfdaemon client. -func GetV2(ctx context.Context, target string, opts ...grpc.DialOption) (V2, error) { +func GetV2(ctx context.Context, dynconfig config.DynconfigInterface, opts ...grpc.DialOption) (V2, error) { + // Register resolver and balancer. + resolver.RegisterSeedPeer(dynconfig) + builder, pickerBuilder := pkgbalancer.NewConsistentHashingBuilder() + balancer.Register(builder) + conn, err := grpc.DialContext( ctx, - target, + resolver.SeedPeerVirtualTarget, append([]grpc.DialOption{ + grpc.WithDefaultServiceConfig(pkgbalancer.BalancerServiceConfig), grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( rpc.OTELUnaryClientInterceptor(), grpc_prometheus.UnaryClientInterceptor, @@ -47,11 +57,13 @@ func GetV2(ctx context.Context, target string, opts ...grpc.DialOption) (V2, err grpc_retry.WithMax(maxRetries), grpc_retry.WithBackoff(grpc_retry.BackoffLinear(backoffWaitBetween)), ), + rpc.RefresherUnaryClientInterceptor(dynconfig), )), grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( rpc.OTELStreamClientInterceptor(), grpc_prometheus.StreamClientInterceptor, grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), + rpc.RefresherStreamClientInterceptor(dynconfig), )), }, opts...)..., ) @@ -60,8 +72,9 @@ func GetV2(ctx context.Context, target string, opts ...grpc.DialOption) (V2, err } return &v2{ - DfdaemonUploadClient: dfdaemonv2.NewDfdaemonUploadClient(conn), - ClientConn: conn, + DfdaemonUploadClient: dfdaemonv2.NewDfdaemonUploadClient(conn), + ClientConn: conn, + ConsistentHashingPickerBuilder: pickerBuilder, }, nil } @@ -73,8 +86,8 @@ type V2 interface { // DownloadPiece downloads piece from the other peer. DownloadPiece(context.Context, *dfdaemonv2.DownloadPieceRequest, ...grpc.CallOption) (*dfdaemonv2.DownloadPieceResponse, error) - // DownloadTask downloads task from the other peer. - DownloadTask(context.Context, *dfdaemonv2.DownloadTaskRequest, ...grpc.CallOption) error + // TriggerDownloadTask triggers download task from the other peer. + TriggerDownloadTask(context.Context, *dfdaemonv2.TriggerDownloadTaskRequest, ...grpc.CallOption) (*dfdaemonv2.TriggerDownloadTaskResponse, error) // Close tears down the ClientConn and all underlying connections. Close() error @@ -84,6 +97,7 @@ type V2 interface { type v2 struct { dfdaemonv2.DfdaemonUploadClient *grpc.ClientConn + *pkgbalancer.ConsistentHashingPickerBuilder } // SyncPieces syncs pieces from the other peers. @@ -110,16 +124,14 @@ func (v *v2) DownloadPiece(ctx context.Context, req *dfdaemonv2.DownloadPieceReq ) } -// DownloadTask downloads task from the other peer. -func (v *v2) DownloadTask(ctx context.Context, req *dfdaemonv2.DownloadTaskRequest, opts ...grpc.CallOption) error { +// TriggerDownloadTask triggers download task from the other peer. +func (v *v2) TriggerDownloadTask(ctx context.Context, req *dfdaemonv2.TriggerDownloadTaskRequest, opts ...grpc.CallOption) (*dfdaemonv2.TriggerDownloadTaskResponse, error) { ctx, cancel := context.WithTimeout(ctx, contextTimeout) defer cancel() - _, err := v.DfdaemonUploadClient.DownloadTask( + return v.DfdaemonUploadClient.TriggerDownloadTask( ctx, req, opts..., ) - - return err } diff --git a/pkg/rpc/dfdaemon/client/mocks/client_v2_mock.go b/pkg/rpc/dfdaemon/client/mocks/client_v2_mock.go index 59d8ea231..90e496f50 100644 --- a/pkg/rpc/dfdaemon/client/mocks/client_v2_mock.go +++ b/pkg/rpc/dfdaemon/client/mocks/client_v2_mock.go @@ -74,25 +74,6 @@ func (mr *MockV2MockRecorder) DownloadPiece(arg0, arg1 any, arg2 ...any) *gomock return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DownloadPiece", reflect.TypeOf((*MockV2)(nil).DownloadPiece), varargs...) } -// DownloadTask mocks base method. -func (m *MockV2) DownloadTask(arg0 context.Context, arg1 *dfdaemon.DownloadTaskRequest, arg2 ...grpc.CallOption) error { - m.ctrl.T.Helper() - varargs := []any{arg0, arg1} - for _, a := range arg2 { - varargs = append(varargs, a) - } - ret := m.ctrl.Call(m, "DownloadTask", varargs...) - ret0, _ := ret[0].(error) - return ret0 -} - -// DownloadTask indicates an expected call of DownloadTask. -func (mr *MockV2MockRecorder) DownloadTask(arg0, arg1 any, arg2 ...any) *gomock.Call { - mr.mock.ctrl.T.Helper() - varargs := append([]any{arg0, arg1}, arg2...) - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DownloadTask", reflect.TypeOf((*MockV2)(nil).DownloadTask), varargs...) -} - // SyncPieces mocks base method. func (m *MockV2) SyncPieces(arg0 context.Context, arg1 *dfdaemon.SyncPiecesRequest, arg2 ...grpc.CallOption) (dfdaemon.DfdaemonUpload_SyncPiecesClient, error) { m.ctrl.T.Helper() @@ -112,3 +93,23 @@ func (mr *MockV2MockRecorder) SyncPieces(arg0, arg1 any, arg2 ...any) *gomock.Ca varargs := append([]any{arg0, arg1}, arg2...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncPieces", reflect.TypeOf((*MockV2)(nil).SyncPieces), varargs...) } + +// TriggerDownloadTask mocks base method. +func (m *MockV2) TriggerDownloadTask(arg0 context.Context, arg1 *dfdaemon.TriggerDownloadTaskRequest, arg2 ...grpc.CallOption) (*dfdaemon.TriggerDownloadTaskResponse, error) { + m.ctrl.T.Helper() + varargs := []any{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "TriggerDownloadTask", varargs...) + ret0, _ := ret[0].(*dfdaemon.TriggerDownloadTaskResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// TriggerDownloadTask indicates an expected call of TriggerDownloadTask. +func (mr *MockV2MockRecorder) TriggerDownloadTask(arg0, arg1 any, arg2 ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TriggerDownloadTask", reflect.TypeOf((*MockV2)(nil).TriggerDownloadTask), varargs...) +} diff --git a/scheduler/resource/resource.go b/scheduler/resource/resource.go index 6984514d0..a772be995 100644 --- a/scheduler/resource/resource.go +++ b/scheduler/resource/resource.go @@ -120,7 +120,7 @@ func New(cfg *config.Config, gc gc.GC, dynconfig config.DynconfigInterface, opti return nil, err } - resource.seedPeer = newSeedPeer(&cfg.Resource, client, peerManager, hostManager) + resource.seedPeer = newSeedPeer(cfg, client, peerManager, hostManager) } return resource, nil diff --git a/scheduler/resource/resource_test.go b/scheduler/resource/resource_test.go index 489cfdff7..4ebb7986e 100644 --- a/scheduler/resource/resource_test.go +++ b/scheduler/resource/resource_test.go @@ -57,6 +57,8 @@ func TestResource_New(t *testing.T) { md.Register(gomock.Any()).Return().Times(1), md.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1), md.Register(gomock.Any()).Return().Times(1), + md.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1), + md.Register(gomock.Any()).Return().Times(1), ) }, expect: func(t *testing.T, resource Resource, err error) { @@ -132,6 +134,8 @@ func TestResource_New(t *testing.T) { md.Register(gomock.Any()).Return().Times(1), md.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1), md.Register(gomock.Any()).Return().Times(1), + md.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1), + md.Register(gomock.Any()).Return().Times(1), ) }, expect: func(t *testing.T, resource Resource, err error) { diff --git a/scheduler/resource/seed_peer.go b/scheduler/resource/seed_peer.go index dd827efef..1b25f4e96 100644 --- a/scheduler/resource/seed_peer.go +++ b/scheduler/resource/seed_peer.go @@ -24,9 +24,12 @@ import ( "strings" "time" + "go.opentelemetry.io/otel/trace" + cdnsystemv1 "d7y.io/api/v2/pkg/apis/cdnsystem/v1" commonv1 "d7y.io/api/v2/pkg/apis/common/v1" commonv2 "d7y.io/api/v2/pkg/apis/common/v2" + dfdaemonv2 "d7y.io/api/v2/pkg/apis/dfdaemon/v2" schedulerv1 "d7y.io/api/v2/pkg/apis/scheduler/v1" "d7y.io/dragonfly/v2/pkg/digest" @@ -44,9 +47,9 @@ const ( // SeedPeer is the interface used for seed peer. type SeedPeer interface { - // DownloadTask downloads task back-to-source. + // TriggerDownloadTask triggers the seed peer to download task. // Used only in v2 version of the grpc. - DownloadTask(context.Context, *commonv2.Download) error + TriggerDownloadTask(context.Context, *dfdaemonv2.TriggerDownloadTaskRequest) (*dfdaemonv2.TriggerDownloadTaskResponse, error) // TriggerTask triggers the seed peer to download task. // Used only in v1 version of the grpc. @@ -62,7 +65,7 @@ type SeedPeer interface { // seedPeer contains content for seed peer. type seedPeer struct { // config is the config of resource. - config *config.ResourceConfig + config *config.Config // client is the dynamic client of seed peer. client SeedPeerClient @@ -75,7 +78,7 @@ type seedPeer struct { } // New SeedPeer interface. -func newSeedPeer(cfg *config.ResourceConfig, client SeedPeerClient, peerManager PeerManager, hostManager HostManager) SeedPeer { +func newSeedPeer(cfg *config.Config, client SeedPeerClient, peerManager PeerManager, hostManager HostManager) SeedPeer { return &seedPeer{ config: cfg, client: client, @@ -84,14 +87,13 @@ func newSeedPeer(cfg *config.ResourceConfig, client SeedPeerClient, peerManager } } -// TODO Implement DownloadTask -// DownloadTask downloads task back-to-source. +// TriggerDownloadTask triggers the seed peer to download task. // Used only in v2 version of the grpc. -func (s *seedPeer) DownloadTask(ctx context.Context, download *commonv2.Download) error { - // ctx, cancel := context.WithCancel(trace.ContextWithSpan(context.Background(), trace.SpanFromContext(ctx))) - // defer cancel() +func (s *seedPeer) TriggerDownloadTask(ctx context.Context, req *dfdaemonv2.TriggerDownloadTaskRequest) (*dfdaemonv2.TriggerDownloadTaskResponse, error) { + ctx, cancel := context.WithCancel(trace.ContextWithSpan(context.Background(), trace.SpanFromContext(ctx))) + defer cancel() - return nil + return s.client.TriggerDownloadTask(ctx, req) } // TriggerTask triggers the seed peer to download task. @@ -145,7 +147,7 @@ func (s *seedPeer) TriggerTask(ctx context.Context, rg *http.Range, task *Task) initialized = true // Initialize seed peer. - peer, err = s.initSeedPeer(ctx, rg, task, pieceSeed) + peer, err = s.initSeedPeer(ctx, rg, task, pieceSeed.HostId, pieceSeed.PeerId) if err != nil { return nil, nil, err } @@ -210,21 +212,21 @@ func (s *seedPeer) TriggerTask(ctx context.Context, rg *http.Range, task *Task) } // Initialize seed peer. -func (s *seedPeer) initSeedPeer(ctx context.Context, rg *http.Range, task *Task, ps *cdnsystemv1.PieceSeed) (*Peer, error) { +func (s *seedPeer) initSeedPeer(ctx context.Context, rg *http.Range, task *Task, hostID string, peerID string) (*Peer, error) { // Load host from manager. - host, loaded := s.hostManager.Load(ps.HostId) + host, loaded := s.hostManager.Load(hostID) if !loaded { - task.Log.Errorf("can not find seed host id: %s", ps.HostId) - return nil, fmt.Errorf("can not find host id: %s", ps.HostId) + task.Log.Errorf("can not find seed host id: %s", hostID) + return nil, fmt.Errorf("can not find host id: %s", hostID) } host.UpdatedAt.Store(time.Now()) // Load peer from manager. - peer, loaded := s.peerManager.Load(ps.PeerId) + peer, loaded := s.peerManager.Load(peerID) if loaded { return peer, nil } - task.Log.Infof("can not find seed peer: %s", ps.PeerId) + task.Log.Infof("can not find seed peer: %s", peerID) options := []PeerOption{} if rg != nil { @@ -232,7 +234,7 @@ func (s *seedPeer) initSeedPeer(ctx context.Context, rg *http.Range, task *Task, } // New and store seed peer without range. - peer = NewPeer(ps.PeerId, s.config, task, host, options...) + peer = NewPeer(peerID, &s.config.Resource, task, host, options...) s.peerManager.Store(peer) peer.Log.Info("seed peer has been stored") diff --git a/scheduler/resource/seed_peer_client.go b/scheduler/resource/seed_peer_client.go index 89fcf5132..e5a9f6b1f 100644 --- a/scheduler/resource/seed_peer_client.go +++ b/scheduler/resource/seed_peer_client.go @@ -23,6 +23,7 @@ import ( "fmt" reflect "reflect" + "github.com/hashicorp/go-multierror" "google.golang.org/grpc" managerv2 "d7y.io/api/v2/pkg/apis/manager/v2" @@ -30,7 +31,8 @@ import ( logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/dfnet" "d7y.io/dragonfly/v2/pkg/idgen" - "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem/client" + cdnsystemclient "d7y.io/dragonfly/v2/pkg/rpc/cdnsystem/client" + dfdaemonclient "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/client" "d7y.io/dragonfly/v2/pkg/types" "d7y.io/dragonfly/v2/scheduler/config" ) @@ -40,8 +42,11 @@ type SeedPeerClient interface { // Addrs returns the addresses of seed peers. Addrs() []string - // client is seed peer grpc client interface. - client.Client + // Client is cdnsystem grpc client interface. + cdnsystemclient.Client + + // V2 is dfdaemon v2 grpc client interface. + dfdaemonclient.V2 // Observer is dynconfig observer interface. config.Observer @@ -49,8 +54,11 @@ type SeedPeerClient interface { // seedPeerClient contains content for client of seed peer. type seedPeerClient struct { - // client is sedd peer grpc client instance. - client.Client + // Client is cdnsystem grpc client interface. + cdnsystemclient.Client + + // V2 is dfdaemon v2 grpc client interface. + dfdaemonclient.V2 // hostManager is host manager. hostManager HostManager @@ -71,14 +79,25 @@ func newSeedPeerClient(dynconfig config.DynconfigInterface, hostManager HostMana logger.Infof("initialize seed peer addresses: %#v", seedPeersToNetAddrs(config.Scheduler.SeedPeers)) // Initialize seed peer grpc client. - client, err := client.GetClient(context.Background(), dynconfig, opts...) + cdnsystemClient, err := cdnsystemclient.GetClient(context.Background(), dynconfig, opts...) if err != nil { return nil, err } + fmt.Println("cdnsystemClient", cdnsystemClient) + + // Initialize dfdaemon v2 grpc client. + dfdaemonClient, err := dfdaemonclient.GetV2(context.Background(), dynconfig, opts...) + if err != nil { + return nil, err + } + + fmt.Println("dfdaemonClient", dfdaemonClient) + sc := &seedPeerClient{ hostManager: hostManager, - Client: client, + Client: cdnsystemClient, + V2: dfdaemonClient, dynconfig: dynconfig, data: config, } @@ -90,6 +109,20 @@ func newSeedPeerClient(dynconfig config.DynconfigInterface, hostManager HostMana return sc, nil } +// Close closes the seed peer client. +func (sc *seedPeerClient) Close() error { + var errs error + if err := sc.Client.Close(); err != nil { + errs = multierror.Append(errs, err) + } + + if err := sc.V2.Close(); err != nil { + errs = multierror.Append(errs, err) + } + + return errs +} + // Addrs returns the addresses of seed peers. func (sc *seedPeerClient) Addrs() []string { var addrs []string diff --git a/scheduler/resource/seed_peer_client_mock.go b/scheduler/resource/seed_peer_client_mock.go index 117036cea..8f8f27011 100644 --- a/scheduler/resource/seed_peer_client_mock.go +++ b/scheduler/resource/seed_peer_client_mock.go @@ -14,6 +14,7 @@ import ( cdnsystem "d7y.io/api/v2/pkg/apis/cdnsystem/v1" common "d7y.io/api/v2/pkg/apis/common/v1" + dfdaemon "d7y.io/api/v2/pkg/apis/dfdaemon/v2" config "d7y.io/dragonfly/v2/scheduler/config" gomock "go.uber.org/mock/gomock" grpc "google.golang.org/grpc" @@ -70,6 +71,26 @@ func (mr *MockSeedPeerClientMockRecorder) Close() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockSeedPeerClient)(nil).Close)) } +// DownloadPiece mocks base method. +func (m *MockSeedPeerClient) DownloadPiece(arg0 context.Context, arg1 *dfdaemon.DownloadPieceRequest, arg2 ...grpc.CallOption) (*dfdaemon.DownloadPieceResponse, error) { + m.ctrl.T.Helper() + varargs := []any{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "DownloadPiece", varargs...) + ret0, _ := ret[0].(*dfdaemon.DownloadPieceResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// DownloadPiece indicates an expected call of DownloadPiece. +func (mr *MockSeedPeerClientMockRecorder) DownloadPiece(arg0, arg1 any, arg2 ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DownloadPiece", reflect.TypeOf((*MockSeedPeerClient)(nil).DownloadPiece), varargs...) +} + // GetPieceTasks mocks base method. func (m *MockSeedPeerClient) GetPieceTasks(arg0 context.Context, arg1 *common.PieceTaskRequest, arg2 ...grpc.CallOption) (*common.PiecePacket, error) { m.ctrl.T.Helper() @@ -141,3 +162,43 @@ func (mr *MockSeedPeerClientMockRecorder) SyncPieceTasks(arg0, arg1 any, arg2 .. varargs := append([]any{arg0, arg1}, arg2...) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncPieceTasks", reflect.TypeOf((*MockSeedPeerClient)(nil).SyncPieceTasks), varargs...) } + +// SyncPieces mocks base method. +func (m *MockSeedPeerClient) SyncPieces(arg0 context.Context, arg1 *dfdaemon.SyncPiecesRequest, arg2 ...grpc.CallOption) (dfdaemon.DfdaemonUpload_SyncPiecesClient, error) { + m.ctrl.T.Helper() + varargs := []any{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "SyncPieces", varargs...) + ret0, _ := ret[0].(dfdaemon.DfdaemonUpload_SyncPiecesClient) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// SyncPieces indicates an expected call of SyncPieces. +func (mr *MockSeedPeerClientMockRecorder) SyncPieces(arg0, arg1 any, arg2 ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncPieces", reflect.TypeOf((*MockSeedPeerClient)(nil).SyncPieces), varargs...) +} + +// TriggerDownloadTask mocks base method. +func (m *MockSeedPeerClient) TriggerDownloadTask(arg0 context.Context, arg1 *dfdaemon.TriggerDownloadTaskRequest, arg2 ...grpc.CallOption) (*dfdaemon.TriggerDownloadTaskResponse, error) { + m.ctrl.T.Helper() + varargs := []any{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "TriggerDownloadTask", varargs...) + ret0, _ := ret[0].(*dfdaemon.TriggerDownloadTaskResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// TriggerDownloadTask indicates an expected call of TriggerDownloadTask. +func (mr *MockSeedPeerClientMockRecorder) TriggerDownloadTask(arg0, arg1 any, arg2 ...any) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]any{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TriggerDownloadTask", reflect.TypeOf((*MockSeedPeerClient)(nil).TriggerDownloadTask), varargs...) +} diff --git a/scheduler/resource/seed_peer_client_test.go b/scheduler/resource/seed_peer_client_test.go index 090bd65d4..234b9a0ca 100644 --- a/scheduler/resource/seed_peer_client_test.go +++ b/scheduler/resource/seed_peer_client_test.go @@ -52,6 +52,8 @@ func TestSeedPeerClient_newSeedPeerClient(t *testing.T) { }, nil).Times(1), dynconfig.Register(gomock.Any()).Return().Times(1), dynconfig.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1), + dynconfig.Register(gomock.Any()).Return().Times(1), + dynconfig.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1), hostManager.Load(gomock.Any()).Return(nil, false).Times(1), hostManager.Store(gomock.Any()).Return().Times(1), dynconfig.Register(gomock.Any()).Return().Times(1), @@ -84,6 +86,8 @@ func TestSeedPeerClient_newSeedPeerClient(t *testing.T) { dynconfig.Register(gomock.Any()).Return().Times(1), dynconfig.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1), dynconfig.Register(gomock.Any()).Return().Times(1), + dynconfig.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1), + dynconfig.Register(gomock.Any()).Return().Times(1), ) }, expect: func(t *testing.T, err error) { @@ -183,6 +187,8 @@ func TestSeedPeerClient_OnNotify(t *testing.T) { }, nil).Times(1), dynconfig.Register(gomock.Any()).Return().Times(1), dynconfig.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1), + dynconfig.Register(gomock.Any()).Return().Times(1), + dynconfig.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1), hostManager.Load(gomock.Any()).Return(nil, false).Times(1), hostManager.Store(gomock.Any()).Return().Times(1), dynconfig.Register(gomock.Any()).Return().Times(1), @@ -220,6 +226,8 @@ func TestSeedPeerClient_OnNotify(t *testing.T) { }, nil).Times(1), dynconfig.Register(gomock.Any()).Return().Times(1), dynconfig.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1), + dynconfig.Register(gomock.Any()).Return().Times(1), + dynconfig.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1), hostManager.Load(gomock.Any()).Return(nil, false).Times(1), hostManager.Store(gomock.Any()).Return().Times(1), dynconfig.Register(gomock.Any()).Return().Times(1), @@ -255,6 +263,8 @@ func TestSeedPeerClient_OnNotify(t *testing.T) { }, nil).Times(1), dynconfig.Register(gomock.Any()).Return().Times(1), dynconfig.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1), + dynconfig.Register(gomock.Any()).Return().Times(1), + dynconfig.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1), hostManager.Load(gomock.Any()).Return(nil, false).Times(1), hostManager.Store(gomock.Any()).Return().Times(1), dynconfig.Register(gomock.Any()).Return().Times(1), @@ -289,6 +299,8 @@ func TestSeedPeerClient_OnNotify(t *testing.T) { }, nil).Times(1), dynconfig.Register(gomock.Any()).Return().Times(1), dynconfig.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1), + dynconfig.Register(gomock.Any()).Return().Times(1), + dynconfig.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1), hostManager.Load(gomock.Any()).Return(nil, false).Times(1), hostManager.Store(gomock.Any()).Return().Times(1), dynconfig.Register(gomock.Any()).Return().Times(1), diff --git a/scheduler/resource/seed_peer_mock.go b/scheduler/resource/seed_peer_mock.go index 07f4d3725..695cbf9b5 100644 --- a/scheduler/resource/seed_peer_mock.go +++ b/scheduler/resource/seed_peer_mock.go @@ -12,7 +12,7 @@ import ( context "context" reflect "reflect" - common "d7y.io/api/v2/pkg/apis/common/v2" + dfdaemon "d7y.io/api/v2/pkg/apis/dfdaemon/v2" scheduler "d7y.io/api/v2/pkg/apis/scheduler/v1" http "d7y.io/dragonfly/v2/pkg/net/http" gomock "go.uber.org/mock/gomock" @@ -55,20 +55,6 @@ func (mr *MockSeedPeerMockRecorder) Client() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Client", reflect.TypeOf((*MockSeedPeer)(nil).Client)) } -// DownloadTask mocks base method. -func (m *MockSeedPeer) DownloadTask(arg0 context.Context, arg1 *common.Download) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DownloadTask", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 -} - -// DownloadTask indicates an expected call of DownloadTask. -func (mr *MockSeedPeerMockRecorder) DownloadTask(arg0, arg1 any) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DownloadTask", reflect.TypeOf((*MockSeedPeer)(nil).DownloadTask), arg0, arg1) -} - // Stop mocks base method. func (m *MockSeedPeer) Stop() error { m.ctrl.T.Helper() @@ -83,6 +69,21 @@ func (mr *MockSeedPeerMockRecorder) Stop() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockSeedPeer)(nil).Stop)) } +// TriggerDownloadTask mocks base method. +func (m *MockSeedPeer) TriggerDownloadTask(arg0 context.Context, arg1 *dfdaemon.TriggerDownloadTaskRequest) (*dfdaemon.TriggerDownloadTaskResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "TriggerDownloadTask", arg0, arg1) + ret0, _ := ret[0].(*dfdaemon.TriggerDownloadTaskResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// TriggerDownloadTask indicates an expected call of TriggerDownloadTask. +func (mr *MockSeedPeerMockRecorder) TriggerDownloadTask(arg0, arg1 any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "TriggerDownloadTask", reflect.TypeOf((*MockSeedPeer)(nil).TriggerDownloadTask), arg0, arg1) +} + // TriggerTask mocks base method. func (m *MockSeedPeer) TriggerTask(arg0 context.Context, arg1 *http.Range, arg2 *Task) (*Peer, *scheduler.PeerResult, error) { m.ctrl.T.Helper() diff --git a/scheduler/resource/seed_peer_test.go b/scheduler/resource/seed_peer_test.go index 3798eaa6c..358f7b9cb 100644 --- a/scheduler/resource/seed_peer_test.go +++ b/scheduler/resource/seed_peer_test.go @@ -26,6 +26,7 @@ import ( gomock "go.uber.org/mock/gomock" commonv2 "d7y.io/api/v2/pkg/apis/common/v2" + dfdaemonv2 "d7y.io/api/v2/pkg/apis/dfdaemon/v2" schedulerv1 "d7y.io/api/v2/pkg/apis/scheduler/v1" ) @@ -51,7 +52,54 @@ func TestSeedPeer_newSeedPeer(t *testing.T) { peerManager := NewMockPeerManager(ctl) client := NewMockSeedPeerClient(ctl) - tc.expect(t, newSeedPeer(mockResourceConfig, client, peerManager, hostManager)) + tc.expect(t, newSeedPeer(mockConfig, client, peerManager, hostManager)) + }) + } +} + +func TestSeedPeer_TriggerDownloadTask(t *testing.T) { + tests := []struct { + name string + mock func(mc *MockSeedPeerClientMockRecorder) + expect func(t *testing.T, resp *dfdaemonv2.TriggerDownloadTaskResponse, err error) + }{ + { + name: "trigger download task failed", + mock: func(mc *MockSeedPeerClientMockRecorder) { + mc.TriggerDownloadTask(gomock.Any(), gomock.Any()).Return(nil, errors.New("foo")).Times(1) + }, + expect: func(t *testing.T, resp *dfdaemonv2.TriggerDownloadTaskResponse, err error) { + assert := assert.New(t) + assert.EqualError(err, "foo") + }, + }, + { + name: "trigger download task scuccess", + mock: func(mc *MockSeedPeerClientMockRecorder) { + mc.TriggerDownloadTask(gomock.Any(), gomock.Any()).Return(&dfdaemonv2.TriggerDownloadTaskResponse{HostId: mockHostID, TaskId: mockTaskID, PeerId: mockPeerID}, nil).Times(1) + }, + expect: func(t *testing.T, resp *dfdaemonv2.TriggerDownloadTaskResponse, err error) { + assert := assert.New(t) + assert.NoError(err) + assert.Equal(mockHostID, resp.HostId) + assert.Equal(mockTaskID, resp.TaskId) + assert.Equal(mockPeerID, resp.PeerId) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + hostManager := NewMockHostManager(ctl) + peerManager := NewMockPeerManager(ctl) + client := NewMockSeedPeerClient(ctl) + tc.mock(client.EXPECT()) + + seedPeer := newSeedPeer(mockConfig, client, peerManager, hostManager) + resp, err := seedPeer.TriggerDownloadTask(context.Background(), &dfdaemonv2.TriggerDownloadTaskRequest{}) + tc.expect(t, resp, err) }) } } @@ -83,7 +131,7 @@ func TestSeedPeer_TriggerTask(t *testing.T) { client := NewMockSeedPeerClient(ctl) tc.mock(client.EXPECT()) - seedPeer := newSeedPeer(mockResourceConfig, client, peerManager, hostManager) + seedPeer := newSeedPeer(mockConfig, client, peerManager, hostManager) mockTask := NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, WithDigest(mockTaskDigest)) peer, result, err := seedPeer.TriggerTask(context.Background(), nil, mockTask) tc.expect(t, peer, result, err) diff --git a/scheduler/resource/task_test.go b/scheduler/resource/task_test.go index dfdc6bf83..ec0862fe8 100644 --- a/scheduler/resource/task_test.go +++ b/scheduler/resource/task_test.go @@ -59,7 +59,11 @@ var ( mockTaskFilters = []string{"bar"} mockTaskHeader = map[string]string{"content-length": "100"} mockTaskPieceLength int32 = 2048 - mockResourceConfig = &config.ResourceConfig{ + mockConfig = &config.Config{ + Resource: *mockResourceConfig, + } + + mockResourceConfig = &config.ResourceConfig{ Task: config.TaskConfig{ DownloadTiny: config.DownloadTinyConfig{ Scheme: config.DefaultResourceTaskDownloadTinyScheme, diff --git a/scheduler/service/service_v2.go b/scheduler/service/service_v2.go index 6727e0be4..75e092d2f 100644 --- a/scheduler/service/service_v2.go +++ b/scheduler/service/service_v2.go @@ -28,6 +28,7 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" commonv2 "d7y.io/api/v2/pkg/apis/common/v2" + dfdaemonv2 "d7y.io/api/v2/pkg/apis/dfdaemon/v2" schedulerv2 "d7y.io/api/v2/pkg/apis/scheduler/v2" logger "d7y.io/dragonfly/v2/internal/dflog" @@ -1311,12 +1312,16 @@ func (v *V2) downloadTaskBySeedPeer(ctx context.Context, download *commonv2.Down case commonv2.Priority_LEVEL6, commonv2.Priority_LEVEL0: // Super peer is first triggered to download back-to-source. if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() { - go func(ctx context.Context, peer *resource.Peer, hostType types.HostType) { - if err := v.resource.SeedPeer().DownloadTask(context.Background(), download); err != nil { - peer.Log.Errorf("%s seed peer downloads task failed %s", hostType.Name(), err.Error()) + go func(ctx context.Context, download *commonv2.Download, hostType types.HostType) { + resp, err := v.resource.SeedPeer().TriggerDownloadTask(context.Background(), &dfdaemonv2.TriggerDownloadTaskRequest{Download: download}) + if err != nil { + peer.Log.Errorf("%s seed peer triggers download task failed %s", hostType.Name(), err.Error()) return } - }(ctx, peer, types.HostTypeSuperSeed) + + peer.Log.Infof("%s seed peer triggers download task success, hostID: %s, peerID: %s", hostType.Name(), resp.GetHostId(), resp.GetPeerId()) + }(ctx, download, types.HostTypeSuperSeed) + break } @@ -1324,12 +1329,16 @@ func (v *V2) downloadTaskBySeedPeer(ctx context.Context, download *commonv2.Down case commonv2.Priority_LEVEL5: // Strong peer is first triggered to download back-to-source. if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() { - go func(ctx context.Context, peer *resource.Peer, hostType types.HostType) { - if err := v.resource.SeedPeer().DownloadTask(context.Background(), download); err != nil { - peer.Log.Errorf("%s seed peer downloads task failed %s", hostType.Name(), err.Error()) + go func(ctx context.Context, download *commonv2.Download, hostType types.HostType) { + resp, err := v.resource.SeedPeer().TriggerDownloadTask(context.Background(), &dfdaemonv2.TriggerDownloadTaskRequest{Download: download}) + if err != nil { + peer.Log.Errorf("%s seed peer triggers download task failed %s", hostType.Name(), err.Error()) return } - }(ctx, peer, types.HostTypeStrongSeed) + + peer.Log.Infof("%s seed peer triggers download task success, hostID: %s, peerID: %s", hostType.Name(), resp.GetHostId(), resp.GetPeerId()) + }(ctx, download, types.HostTypeSuperSeed) + break } @@ -1337,12 +1346,16 @@ func (v *V2) downloadTaskBySeedPeer(ctx context.Context, download *commonv2.Down case commonv2.Priority_LEVEL4: // Weak peer is first triggered to download back-to-source. if v.config.SeedPeer.Enable && !peer.Task.IsSeedPeerFailed() { - go func(ctx context.Context, peer *resource.Peer, hostType types.HostType) { - if err := v.resource.SeedPeer().DownloadTask(context.Background(), download); err != nil { - peer.Log.Errorf("%s seed peer downloads task failed %s", hostType.Name(), err.Error()) + go func(ctx context.Context, download *commonv2.Download, hostType types.HostType) { + resp, err := v.resource.SeedPeer().TriggerDownloadTask(context.Background(), &dfdaemonv2.TriggerDownloadTaskRequest{Download: download}) + if err != nil { + peer.Log.Errorf("%s seed peer triggers download task failed %s", hostType.Name(), err.Error()) return } - }(ctx, peer, types.HostTypeWeakSeed) + + peer.Log.Infof("%s seed peer triggers download task success, hostID: %s, peerID: %s", hostType.Name(), resp.GetHostId(), resp.GetPeerId()) + }(ctx, download, types.HostTypeSuperSeed) + break } diff --git a/scheduler/service/service_v2_test.go b/scheduler/service/service_v2_test.go index 9dc224526..d3da9e3a6 100644 --- a/scheduler/service/service_v2_test.go +++ b/scheduler/service/service_v2_test.go @@ -37,6 +37,7 @@ import ( "google.golang.org/protobuf/types/known/timestamppb" commonv2 "d7y.io/api/v2/pkg/apis/common/v2" + dfdaemonv2 "d7y.io/api/v2/pkg/apis/dfdaemon/v2" managerv2 "d7y.io/api/v2/pkg/apis/manager/v2" schedulerv2 "d7y.io/api/v2/pkg/apis/scheduler/v2" schedulerv2mocks "d7y.io/api/v2/pkg/apis/scheduler/v2/mocks" @@ -3363,7 +3364,7 @@ func TestServiceV2_downloadTaskBySeedPeer(t *testing.T) { gomock.InOrder( mr.SeedPeer().Return(seedPeerClient).Times(1), - ms.DownloadTask(gomock.All(), gomock.Any()).Do(func(context.Context, *commonv2.Download) { wg.Done() }).Return(nil).Times(1), + ms.TriggerDownloadTask(gomock.All(), gomock.Any()).Do(func(context.Context, *dfdaemonv2.TriggerDownloadTaskRequest) { wg.Done() }).Return(&dfdaemonv2.TriggerDownloadTaskResponse{}, nil).Times(1), ) peer.Priority = commonv2.Priority_LEVEL6 @@ -3387,7 +3388,7 @@ func TestServiceV2_downloadTaskBySeedPeer(t *testing.T) { gomock.InOrder( mr.SeedPeer().Return(seedPeerClient).Times(1), - ms.DownloadTask(gomock.All(), gomock.Any()).Do(func(context.Context, *commonv2.Download) { wg.Done() }).Return(errors.New("foo")).Times(1), + ms.TriggerDownloadTask(gomock.All(), gomock.Any()).Do(func(context.Context, *dfdaemonv2.TriggerDownloadTaskRequest) { wg.Done() }).Return(nil, errors.New("foo")).Times(1), ) peer.Priority = commonv2.Priority_LEVEL6 @@ -3426,7 +3427,7 @@ func TestServiceV2_downloadTaskBySeedPeer(t *testing.T) { gomock.InOrder( mr.SeedPeer().Return(seedPeerClient).Times(1), - ms.DownloadTask(gomock.All(), gomock.Any()).Do(func(context.Context, *commonv2.Download) { wg.Done() }).Return(nil).Times(1), + ms.TriggerDownloadTask(gomock.All(), gomock.Any()).Do(func(context.Context, *dfdaemonv2.TriggerDownloadTaskRequest) { wg.Done() }).Return(&dfdaemonv2.TriggerDownloadTaskResponse{}, nil).Times(1), ) peer.Priority = commonv2.Priority_LEVEL5 @@ -3450,7 +3451,7 @@ func TestServiceV2_downloadTaskBySeedPeer(t *testing.T) { gomock.InOrder( mr.SeedPeer().Return(seedPeerClient).Times(1), - ms.DownloadTask(gomock.All(), gomock.Any()).Do(func(context.Context, *commonv2.Download) { wg.Done() }).Return(errors.New("foo")).Times(1), + ms.TriggerDownloadTask(gomock.All(), gomock.Any()).Do(func(context.Context, *dfdaemonv2.TriggerDownloadTaskRequest) { wg.Done() }).Return(nil, errors.New("foo")).Times(1), ) peer.Priority = commonv2.Priority_LEVEL5 @@ -3489,7 +3490,7 @@ func TestServiceV2_downloadTaskBySeedPeer(t *testing.T) { gomock.InOrder( mr.SeedPeer().Return(seedPeerClient).Times(1), - ms.DownloadTask(gomock.All(), gomock.Any()).Do(func(context.Context, *commonv2.Download) { wg.Done() }).Return(nil).Times(1), + ms.TriggerDownloadTask(gomock.All(), gomock.Any()).Do(func(context.Context, *dfdaemonv2.TriggerDownloadTaskRequest) { wg.Done() }).Return(&dfdaemonv2.TriggerDownloadTaskResponse{}, nil).Times(1), ) peer.Priority = commonv2.Priority_LEVEL4 @@ -3513,7 +3514,7 @@ func TestServiceV2_downloadTaskBySeedPeer(t *testing.T) { gomock.InOrder( mr.SeedPeer().Return(seedPeerClient).Times(1), - ms.DownloadTask(gomock.All(), gomock.Any()).Do(func(context.Context, *commonv2.Download) { wg.Done() }).Return(errors.New("foo")).Times(1), + ms.TriggerDownloadTask(gomock.All(), gomock.Any()).Do(func(context.Context, *dfdaemonv2.TriggerDownloadTaskRequest) { wg.Done() }).Return(nil, errors.New("foo")).Times(1), ) peer.Priority = commonv2.Priority_LEVEL4