diff --git a/go.mod b/go.mod index 06ef06978..909149713 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2 go 1.20 require ( - d7y.io/api v1.9.0 + d7y.io/api v1.9.2 github.com/RichardKnop/machinery v1.10.6 github.com/Showmax/go-fqdn v1.0.0 github.com/VividCortex/mysqlerr v1.0.0 diff --git a/go.sum b/go.sum index 315e7e903..7eb6433be 100644 --- a/go.sum +++ b/go.sum @@ -51,8 +51,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo= -d7y.io/api v1.9.0 h1:AzAtFVHXCUM+L5r82IuGp9FfiexfWAYBP7WQ7CYmMaw= -d7y.io/api v1.9.0/go.mod h1:6bn5Z+OyjyvlB1UMxUZsFbyx47qjkpNEvC25hq5Qxy0= +d7y.io/api v1.9.2 h1:JB7sSKY4P9y2J0xsMRVxVpWkgnLPUEAYW2aIB2mEA/4= +d7y.io/api v1.9.2/go.mod h1:6bn5Z+OyjyvlB1UMxUZsFbyx47qjkpNEvC25hq5Qxy0= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U= diff --git a/scheduler/metrics/metrics.go b/scheduler/metrics/metrics.go index 4ead31203..d90e5a343 100644 --- a/scheduler/metrics/metrics.go +++ b/scheduler/metrics/metrics.go @@ -224,6 +224,20 @@ var ( Help: "Counter of the number of failed of the leaving host.", }) + SyncProbesCount = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: types.MetricsNamespace, + Subsystem: types.SchedulerMetricsName, + Name: "sync_probes_total", + Help: "Counter of the number of the synchronizing probes.", + }) + + SyncProbesFailureCount = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: types.MetricsNamespace, + Subsystem: types.SchedulerMetricsName, + Name: "sync_probes_failure_total", + Help: "Counter of the number of failed of the synchronizing probes.", + }) + Traffic = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: types.MetricsNamespace, Subsystem: types.SchedulerMetricsName, diff --git a/scheduler/networktopology/mocks/network_topology_mock.go b/scheduler/networktopology/mocks/network_topology_mock.go index c9013c7cd..0b9c28d0f 100644 --- a/scheduler/networktopology/mocks/network_topology_mock.go +++ b/scheduler/networktopology/mocks/network_topology_mock.go @@ -49,6 +49,21 @@ func (mr *MockNetworkTopologyMockRecorder) DeleteHost(arg0 interface{}) *gomock. return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteHost", reflect.TypeOf((*MockNetworkTopology)(nil).DeleteHost), arg0) } +// FindProbedHostIDs mocks base method. +func (m *MockNetworkTopology) FindProbedHostIDs(arg0 string) ([]string, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "FindProbedHostIDs", arg0) + ret0, _ := ret[0].([]string) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// FindProbedHostIDs indicates an expected call of FindProbedHostIDs. +func (mr *MockNetworkTopologyMockRecorder) FindProbedHostIDs(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindProbedHostIDs", reflect.TypeOf((*MockNetworkTopology)(nil).FindProbedHostIDs), arg0) +} + // Has mocks base method. func (m *MockNetworkTopology) Has(arg0, arg1 string) bool { m.ctrl.T.Helper() @@ -108,11 +123,9 @@ func (mr *MockNetworkTopologyMockRecorder) Probes(arg0, arg1 interface{}) *gomoc } // Serve mocks base method. -func (m *MockNetworkTopology) Serve() error { +func (m *MockNetworkTopology) Serve() { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Serve") - ret0, _ := ret[0].(error) - return ret0 + m.ctrl.Call(m, "Serve") } // Serve indicates an expected call of Serve. @@ -136,11 +149,9 @@ func (mr *MockNetworkTopologyMockRecorder) Snapshot() *gomock.Call { } // Stop mocks base method. -func (m *MockNetworkTopology) Stop() error { +func (m *MockNetworkTopology) Stop() { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Stop") - ret0, _ := ret[0].(error) - return ret0 + m.ctrl.Call(m, "Stop") } // Stop indicates an expected call of Stop. diff --git a/scheduler/networktopology/network_topology.go b/scheduler/networktopology/network_topology.go index 004bfcad8..2c8c6f106 100644 --- a/scheduler/networktopology/network_topology.go +++ b/scheduler/networktopology/network_topology.go @@ -43,10 +43,10 @@ const ( // NetworkTopology is an interface for network topology. type NetworkTopology interface { // Started network topology server. - Serve() error + Serve() // Stop network topology server. - Stop() error + Stop() // Has to check if there is a connection between source host and destination host. Has(string, string) bool @@ -54,6 +54,10 @@ type NetworkTopology interface { // Store stores source host and destination host. Store(string, string) error + // TODO Implement function. + // FindProbedHostIDs finds the most candidate destination host to be probed. + FindProbedHostIDs(string) ([]string, error) + // DeleteHost deletes source host and all destination host connected to source host. DeleteHost(string) error @@ -100,7 +104,7 @@ func NewNetworkTopology(cfg config.NetworkTopologyConfig, rdb redis.UniversalCli } // Started network topology server. -func (nt *networkTopology) Serve() error { +func (nt *networkTopology) Serve() { logger.Info("collect network topology records") tick := time.NewTicker(nt.config.CollectInterval) for { @@ -111,15 +115,14 @@ func (nt *networkTopology) Serve() error { break } case <-nt.done: - return nil + return } } } // Stop network topology server. -func (nt *networkTopology) Stop() error { +func (nt *networkTopology) Stop() { close(nt.done) - return nil } // Has to check if there is a connection between source host and destination host. @@ -157,6 +160,12 @@ func (nt *networkTopology) Store(srcHostID string, destHostID string) error { return nil } +// TODO Implement function. +// FindProbedHostIDs finds the most candidate destination host to be probed. +func (nt *networkTopology) FindProbedHostIDs(hostID string) ([]string, error) { + return nil, nil +} + // DeleteHost deletes source host and all destination host connected to source host. func (nt *networkTopology) DeleteHost(hostID string) error { ctx, cancel := context.WithTimeout(context.Background(), contextTimeout) diff --git a/scheduler/networktopology/network_topology_test.go b/scheduler/networktopology/network_topology_test.go index fd44bd8f0..ec05d1498 100644 --- a/scheduler/networktopology/network_topology_test.go +++ b/scheduler/networktopology/network_topology_test.go @@ -99,9 +99,7 @@ func TestNetworkTopology_Serve(t *testing.T) { expect: func(t *testing.T, networkTopology NetworkTopology, err error) { assert := assert.New(t) assert.NoError(err) - go func() { - assert.NoError(networkTopology.Serve()) - }() + go networkTopology.Serve() }, }, { @@ -135,9 +133,7 @@ func TestNetworkTopology_Serve(t *testing.T) { expect: func(t *testing.T, networkTopology NetworkTopology, err error) { assert := assert.New(t) assert.NoError(err) - go func() { - assert.NoError(networkTopology.Serve()) - }() + go networkTopology.Serve() }, }, } @@ -157,9 +153,7 @@ func TestNetworkTopology_Serve(t *testing.T) { networkTopology, err := NewNetworkTopology(mockNetworkTopologyConfig, rdb, res, storage) tc.expect(t, networkTopology, err) tc.sleep() - if err := networkTopology.Stop(); err != nil { - t.Fatal(err) - } + networkTopology.Stop() }) } } diff --git a/scheduler/rpcserver/rpcserver.go b/scheduler/rpcserver/rpcserver.go index 74a310259..a0d85401f 100644 --- a/scheduler/rpcserver/rpcserver.go +++ b/scheduler/rpcserver/rpcserver.go @@ -21,6 +21,7 @@ import ( "d7y.io/dragonfly/v2/pkg/rpc/scheduler/server" "d7y.io/dragonfly/v2/scheduler/config" + "d7y.io/dragonfly/v2/scheduler/networktopology" "d7y.io/dragonfly/v2/scheduler/resource" "d7y.io/dragonfly/v2/scheduler/scheduling" "d7y.io/dragonfly/v2/scheduler/storage" @@ -33,10 +34,11 @@ func New( scheduling scheduling.Scheduling, dynconfig config.DynconfigInterface, storage storage.Storage, + networkTopology networktopology.NetworkTopology, opts ...grpc.ServerOption, ) *grpc.Server { return server.New( - newSchedulerServerV1(cfg, resource, scheduling, dynconfig, storage), - newSchedulerServerV2(cfg, resource, scheduling, dynconfig, storage), + newSchedulerServerV1(cfg, resource, scheduling, dynconfig, storage, networkTopology), + newSchedulerServerV2(cfg, resource, scheduling, dynconfig, storage, networkTopology), opts...) } diff --git a/scheduler/rpcserver/rpcserver_test.go b/scheduler/rpcserver/rpcserver_test.go index 6046cc8a8..3f006a03d 100644 --- a/scheduler/rpcserver/rpcserver_test.go +++ b/scheduler/rpcserver/rpcserver_test.go @@ -26,6 +26,7 @@ import ( "d7y.io/dragonfly/v2/scheduler/config" configmocks "d7y.io/dragonfly/v2/scheduler/config/mocks" + networktopologymocks "d7y.io/dragonfly/v2/scheduler/networktopology/mocks" "d7y.io/dragonfly/v2/scheduler/resource" "d7y.io/dragonfly/v2/scheduler/scheduling/mocks" storagemocks "d7y.io/dragonfly/v2/scheduler/storage/mocks" @@ -62,8 +63,9 @@ func TestRPCServer_New(t *testing.T) { res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) - svr := New(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage) + svr := New(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology) tc.expect(t, svr) }) } diff --git a/scheduler/rpcserver/scheduler_server_v1.go b/scheduler/rpcserver/scheduler_server_v1.go index 68da8d719..bf1154a09 100644 --- a/scheduler/rpcserver/scheduler_server_v1.go +++ b/scheduler/rpcserver/scheduler_server_v1.go @@ -28,6 +28,7 @@ import ( "d7y.io/dragonfly/v2/pkg/types" "d7y.io/dragonfly/v2/scheduler/config" "d7y.io/dragonfly/v2/scheduler/metrics" + "d7y.io/dragonfly/v2/scheduler/networktopology" "d7y.io/dragonfly/v2/scheduler/resource" "d7y.io/dragonfly/v2/scheduler/scheduling" "d7y.io/dragonfly/v2/scheduler/service" @@ -47,8 +48,9 @@ func newSchedulerServerV1( scheduling scheduling.Scheduling, dynconfig config.DynconfigInterface, storage storage.Storage, + networkTopology networktopology.NetworkTopology, ) schedulerv1.SchedulerServer { - return &schedulerServerV1{service.NewV1(cfg, resource, scheduling, dynconfig, storage)} + return &schedulerServerV1{service.NewV1(cfg, resource, scheduling, dynconfig, storage, networkTopology)} } // RegisterPeerTask registers peer and triggers seed peer download task. @@ -157,8 +159,15 @@ func (s *schedulerServerV1) LeaveHost(ctx context.Context, req *schedulerv1.Leav return new(emptypb.Empty), nil } -// TODO Implement SyncProbes // SyncProbes sync probes of the host. func (s *schedulerServerV1) SyncProbes(stream schedulerv1.Scheduler_SyncProbesServer) error { + // Collect SyncProbesCount metrics. + metrics.SyncProbesCount.Inc() + if err := s.service.SyncProbes(stream); err != nil { + // Collect SyncProbesFailureCount metrics. + metrics.SyncProbesFailureCount.Inc() + return err + } + return nil } diff --git a/scheduler/rpcserver/scheduler_server_v2.go b/scheduler/rpcserver/scheduler_server_v2.go index 6bdd87ed6..387fbacfd 100644 --- a/scheduler/rpcserver/scheduler_server_v2.go +++ b/scheduler/rpcserver/scheduler_server_v2.go @@ -26,6 +26,7 @@ import ( "d7y.io/dragonfly/v2/scheduler/config" "d7y.io/dragonfly/v2/scheduler/metrics" + "d7y.io/dragonfly/v2/scheduler/networktopology" "d7y.io/dragonfly/v2/scheduler/resource" "d7y.io/dragonfly/v2/scheduler/scheduling" "d7y.io/dragonfly/v2/scheduler/service" @@ -46,8 +47,9 @@ func newSchedulerServerV2( scheduling scheduling.Scheduling, dynconfig config.DynconfigInterface, storage storage.Storage, + networkTopology networktopology.NetworkTopology, ) schedulerv2.SchedulerServer { - return &schedulerServerV2{service.NewV2(cfg, resource, scheduling, dynconfig, storage)} + return &schedulerServerV2{service.NewV2(cfg, resource, scheduling, dynconfig, storage, networkTopology)} } // AnnouncePeer announces peer to scheduler. @@ -152,5 +154,13 @@ func (s *schedulerServerV2) LeaveHost(ctx context.Context, req *schedulerv2.Leav // SyncProbes sync probes of the host. func (s *schedulerServerV2) SyncProbes(stream schedulerv2.Scheduler_SyncProbesServer) error { + // Collect SyncProbesCount metrics. + metrics.SyncProbesCount.Inc() + if err := s.service.SyncProbes(stream); err != nil { + // Collect SyncProbesFailureCount metrics. + metrics.SyncProbesFailureCount.Inc() + return err + } + return nil } diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 87e040919..a405c9417 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -232,25 +232,6 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err } s.resource = resource - // Initialize scheduling. - scheduling := scheduling.New(&cfg.Scheduler, dynconfig, d.PluginDir()) - - // Initialize server options of scheduler grpc server. - schedulerServerOptions := []grpc.ServerOption{} - if certifyClient != nil { - serverTransportCredentials, err := rpc.NewServerCredentialsByCertify(cfg.Security.TLSPolicy, cfg.Security.TLSVerify, []byte(cfg.Security.CACert), certifyClient) - if err != nil { - return nil, err - } - - schedulerServerOptions = append(schedulerServerOptions, grpc.Creds(serverTransportCredentials)) - } else { - schedulerServerOptions = append(schedulerServerOptions, grpc.Creds(insecure.NewCredentials())) - } - - svr := rpcserver.New(cfg, resource, scheduling, dynconfig, s.storage, schedulerServerOptions...) - s.grpcServer = svr - // Initialize redis client. var rdb redis.UniversalClient if pkgredis.IsEnabled(cfg.Database.Redis.Addrs) { @@ -282,6 +263,25 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err } } + // Initialize scheduling. + scheduling := scheduling.New(&cfg.Scheduler, dynconfig, d.PluginDir()) + + // Initialize server options of scheduler grpc server. + schedulerServerOptions := []grpc.ServerOption{} + if certifyClient != nil { + serverTransportCredentials, err := rpc.NewServerCredentialsByCertify(cfg.Security.TLSPolicy, cfg.Security.TLSVerify, []byte(cfg.Security.CACert), certifyClient) + if err != nil { + return nil, err + } + + schedulerServerOptions = append(schedulerServerOptions, grpc.Creds(serverTransportCredentials)) + } else { + schedulerServerOptions = append(schedulerServerOptions, grpc.Creds(insecure.NewCredentials())) + } + + svr := rpcserver.New(cfg, resource, scheduling, dynconfig, s.storage, s.networkTopology, schedulerServerOptions...) + s.grpcServer = svr + // Initialize metrics. if cfg.Metrics.Enable { s.metricsServer = metrics.New(&cfg.Metrics, s.grpcServer) @@ -296,6 +296,7 @@ func (s *Server) Serve() error { if err := s.dynconfig.Serve(); err != nil { logger.Fatalf("dynconfig start failed %s", err.Error()) } + logger.Info("dynconfig start successfully") }() @@ -304,7 +305,7 @@ func (s *Server) Serve() error { logger.Info("gc start successfully") // Serve Job. - if s.config.Job.Enable && pkgredis.IsEnabled(s.config.Database.Redis.Addrs) { + if s.job != nil { s.job.Serve() logger.Info("job start successfully") } @@ -317,6 +318,7 @@ func (s *Server) Serve() error { if err == http.ErrServerClosed { return } + logger.Fatalf("metrics server closed unexpect: %s", err.Error()) } }() @@ -328,6 +330,14 @@ func (s *Server) Serve() error { logger.Info("announcer start successfully") }() + // Serve network topology. + if s.networkTopology != nil { + go func() { + s.networkTopology.Serve() + logger.Info("network topology start successfully") + }() + } + // Generate GRPC limit listener. ip, ok := ip.FormatIP(s.config.Server.ListenIP.String()) if !ok { @@ -423,6 +433,12 @@ func (s *Server) Stop() { } } + // Stop network topology. + if s.networkTopology != nil { + s.networkTopology.Stop() + logger.Info("network topology closed") + } + // Stop GRPC server. stopped := make(chan struct{}) go func() { diff --git a/scheduler/service/service_v1.go b/scheduler/service/service_v1.go index 4bb4571bc..2c6048ed1 100644 --- a/scheduler/service/service_v1.go +++ b/scheduler/service/service_v1.go @@ -26,7 +26,9 @@ import ( "time" "go.opentelemetry.io/otel/trace" + "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/durationpb" commonv1 "d7y.io/api/pkg/apis/common/v1" commonv2 "d7y.io/api/pkg/apis/common/v2" @@ -43,6 +45,7 @@ import ( "d7y.io/dragonfly/v2/pkg/types" "d7y.io/dragonfly/v2/scheduler/config" "d7y.io/dragonfly/v2/scheduler/metrics" + "d7y.io/dragonfly/v2/scheduler/networktopology" "d7y.io/dragonfly/v2/scheduler/resource" "d7y.io/dragonfly/v2/scheduler/scheduling" "d7y.io/dragonfly/v2/scheduler/storage" @@ -64,6 +67,9 @@ type V1 struct { // Storage interface. storage storage.Storage + + // Network topology interface. + networkTopology networktopology.NetworkTopology } // New v1 version of service instance. @@ -73,13 +79,15 @@ func NewV1( scheduling scheduling.Scheduling, dynconfig config.DynconfigInterface, storage storage.Storage, + networktopology networktopology.NetworkTopology, ) *V1 { return &V1{ - resource: resource, - scheduling: scheduling, - config: cfg, - dynconfig: dynconfig, - storage: storage, + resource: resource, + scheduling: scheduling, + config: cfg, + dynconfig: dynconfig, + storage: storage, + networkTopology: networktopology, } } @@ -647,6 +655,111 @@ func (v *V1) LeaveHost(ctx context.Context, req *schedulerv1.LeaveHostRequest) e return nil } +// SyncProbes sync probes of the host. +func (v *V1) SyncProbes(stream schedulerv1.Scheduler_SyncProbesServer) error { + if v.networkTopology == nil { + return status.Errorf(codes.Unimplemented, "network topology is not enabled") + } + + for { + req, err := stream.Recv() + if err != nil { + if err == io.EOF { + return nil + } + + logger.Errorf("receive error: %s", err.Error()) + return err + } + + logger := logger.WithHost(req.Host.Id, req.Host.Hostname, req.Host.Ip) + switch syncProbesRequest := req.GetRequest().(type) { + case *schedulerv1.SyncProbesRequest_ProbeStartedRequest: + // Find probed hosts in network topology. Based on the source host information, + // the most candidate hosts will be evaluated. + logger.Info("receive SyncProbesRequest_ProbeStartedRequest") + probedHostIDs, err := v.networkTopology.FindProbedHostIDs(req.Host.Id) + if err != nil { + logger.Error(err) + return status.Error(codes.FailedPrecondition, err.Error()) + } + + var probedHosts []*commonv1.Host + for _, probedHostID := range probedHostIDs { + probedHost, loaded := v.resource.HostManager().Load(probedHostID) + if !loaded { + logger.Warnf("probed host %s not found", probedHostID) + continue + } + + probedHosts = append(probedHosts, &commonv1.Host{ + Id: probedHost.ID, + Ip: probedHost.IP, + Hostname: probedHost.Hostname, + Port: probedHost.Port, + DownloadPort: probedHost.DownloadPort, + Location: probedHost.Network.Location, + Idc: probedHost.Network.IDC, + }) + } + + if len(probedHosts) == 0 { + logger.Error("probed host not found") + return status.Error(codes.NotFound, "probed host not found") + } + + logger.Infof("probe started: %#v", probedHosts) + if err := stream.Send(&schedulerv1.SyncProbesResponse{ + Hosts: probedHosts, + ProbeInterval: durationpb.New(v.config.NetworkTopology.Probe.Interval), + }); err != nil { + logger.Error(err) + return err + } + case *schedulerv1.SyncProbesRequest_ProbeFinishedRequest: + // Store probes in network topology. First create the association between + // source host and destination host, and then store the value of probe. + logger.Info("receive SyncProbesRequest_ProbeFinishedRequest") + for _, probe := range syncProbesRequest.ProbeFinishedRequest.Probes { + probedHost, loaded := v.resource.HostManager().Load(probe.Host.Id) + if !loaded { + logger.Errorf("host %s not found", probe.Host.Id) + continue + } + + if err := v.networkTopology.Store(req.Host.Id, probedHost.ID); err != nil { + logger.Errorf("store failed: %s", err.Error()) + continue + } + + if err := v.networkTopology.Probes(req.Host.Id, probe.Host.Id).Enqueue(&networktopology.Probe{ + Host: probedHost, + RTT: probe.Rtt.AsDuration(), + CreatedAt: probe.CreatedAt.AsTime(), + }); err != nil { + logger.Errorf("enqueue failed: %s", err.Error()) + continue + } + + logger.Infof("probe finished: %#v", probe) + } + case *schedulerv1.SyncProbesRequest_ProbeFailedRequest: + // Log failed probes. + logger.Info("receive SyncProbesRequest_ProbeFailedRequest") + var failedProbedHostIDs []string + for _, failedProbe := range syncProbesRequest.ProbeFailedRequest.Probes { + failedProbedHostIDs = append(failedProbedHostIDs, failedProbe.Host.Id) + } + + logger.Warnf("probe failed: %#v", failedProbedHostIDs) + default: + msg := fmt.Sprintf("receive unknow request: %#v", syncProbesRequest) + logger.Error(msg) + return status.Error(codes.FailedPrecondition, msg) + } + } +} + // triggerTask triggers the first download of the task. func (v *V1) triggerTask(ctx context.Context, req *schedulerv1.PeerTaskRequest, task *resource.Task, host *resource.Host, peer *resource.Peer, dynconfig config.DynconfigInterface) error { // If task has available peer, peer does not need to be triggered. diff --git a/scheduler/service/service_v1_test.go b/scheduler/service/service_v1_test.go index 8c7e688c9..9a3d39459 100644 --- a/scheduler/service/service_v1_test.go +++ b/scheduler/service/service_v1_test.go @@ -36,6 +36,8 @@ import ( "go.uber.org/atomic" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/durationpb" + "google.golang.org/protobuf/types/known/timestamppb" commonv1 "d7y.io/api/pkg/apis/common/v1" commonv2 "d7y.io/api/pkg/apis/common/v2" @@ -43,6 +45,7 @@ import ( managerv2 "d7y.io/api/pkg/apis/manager/v2" schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1" schedulerv1mocks "d7y.io/api/pkg/apis/scheduler/v1/mocks" + schedulerv2 "d7y.io/api/pkg/apis/scheduler/v2" "d7y.io/dragonfly/v2/internal/dferrors" "d7y.io/dragonfly/v2/manager/types" @@ -54,6 +57,8 @@ import ( pkgtypes "d7y.io/dragonfly/v2/pkg/types" "d7y.io/dragonfly/v2/scheduler/config" configmocks "d7y.io/dragonfly/v2/scheduler/config/mocks" + "d7y.io/dragonfly/v2/scheduler/networktopology" + networktopologymocks "d7y.io/dragonfly/v2/scheduler/networktopology/mocks" "d7y.io/dragonfly/v2/scheduler/resource" "d7y.io/dragonfly/v2/scheduler/scheduling" "d7y.io/dragonfly/v2/scheduler/scheduling/mocks" @@ -69,6 +74,16 @@ var ( BackToSourceCount: int(mockTaskBackToSourceLimit), } + mockNetworkTopologyConfig = config.NetworkTopologyConfig{ + Enable: true, + CollectInterval: 2 * time.Hour, + Probe: config.ProbeConfig{ + QueueLength: 5, + Interval: 15 * time.Minute, + Count: 10, + }, + } + mockRawHost = resource.Host{ ID: mockHostID, Type: pkgtypes.HostTypeNormal, @@ -205,6 +220,86 @@ var ( Cost: 1 * time.Minute, CreatedAt: time.Now(), } + + mockV1Probe = &schedulerv1.Probe{ + Host: &commonv1.Host{ + Id: mockRawHost.ID, + Ip: mockRawHost.IP, + Hostname: mockRawHost.Hostname, + Port: mockRawHost.Port, + DownloadPort: mockRawHost.DownloadPort, + Location: mockRawHost.Network.Location, + Idc: mockRawHost.Network.IDC, + }, + Rtt: durationpb.New(30 * time.Millisecond), + CreatedAt: timestamppb.Now(), + } + + mockV2Probe = &schedulerv2.Probe{ + Host: &commonv2.Host{ + Id: mockHostID, + Type: uint32(pkgtypes.HostTypeNormal), + Hostname: "foo", + Ip: "127.0.0.1", + Port: 8003, + DownloadPort: 8001, + Os: "darwin", + Platform: "darwin", + PlatformFamily: "Standalone Workstation", + PlatformVersion: "11.1", + KernelVersion: "20.2.0", + Cpu: &commonv2.CPU{ + LogicalCount: mockCPU.LogicalCount, + PhysicalCount: mockCPU.PhysicalCount, + Percent: mockCPU.Percent, + ProcessPercent: mockCPU.ProcessPercent, + Times: &commonv2.CPUTimes{ + User: mockCPU.Times.User, + System: mockCPU.Times.System, + Idle: mockCPU.Times.Idle, + Nice: mockCPU.Times.Nice, + Iowait: mockCPU.Times.Iowait, + Irq: mockCPU.Times.Irq, + Softirq: mockCPU.Times.Softirq, + Steal: mockCPU.Times.Steal, + Guest: mockCPU.Times.Guest, + GuestNice: mockCPU.Times.GuestNice, + }, + }, + Memory: &commonv2.Memory{ + Total: mockMemory.Total, + Available: mockMemory.Available, + Used: mockMemory.Used, + UsedPercent: mockMemory.UsedPercent, + ProcessUsedPercent: mockMemory.ProcessUsedPercent, + Free: mockMemory.Free, + }, + Network: &commonv2.Network{ + TcpConnectionCount: mockNetwork.TCPConnectionCount, + UploadTcpConnectionCount: mockNetwork.UploadTCPConnectionCount, + Location: mockNetwork.Location, + Idc: mockNetwork.IDC, + }, + Disk: &commonv2.Disk{ + Total: mockDisk.Total, + Free: mockDisk.Free, + Used: mockDisk.Used, + UsedPercent: mockDisk.UsedPercent, + InodesTotal: mockDisk.InodesTotal, + InodesUsed: mockDisk.InodesUsed, + InodesFree: mockDisk.InodesFree, + InodesUsedPercent: mockDisk.InodesUsedPercent, + }, + Build: &commonv2.Build{ + GitVersion: mockBuild.GitVersion, + GitCommit: mockBuild.GitCommit, + GoVersion: mockBuild.GoVersion, + Platform: mockBuild.Platform, + }, + }, + Rtt: durationpb.New(30 * time.Millisecond), + CreatedAt: timestamppb.Now(), + } ) func TestService_NewV1(t *testing.T) { @@ -229,7 +324,9 @@ func TestService_NewV1(t *testing.T) { resource := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) - tc.expect(t, NewV1(&config.Config{Scheduler: mockSchedulerConfig}, resource, scheduling, dynconfig, storage)) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) + + tc.expect(t, NewV1(&config.Config{Scheduler: mockSchedulerConfig}, resource, scheduling, dynconfig, storage, networkTopology)) }) } } @@ -891,10 +988,11 @@ func TestServiceV1_RegisterPeerTask(t *testing.T) { res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) hostManager := resource.NewMockHostManager(ctl) taskManager := resource.NewMockTaskManager(ctl) peerManager := resource.NewMockPeerManager(ctl) - svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage) + svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology) mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, @@ -1156,9 +1254,10 @@ func TestServiceV1_ReportPieceResult(t *testing.T) { res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) peerManager := resource.NewMockPeerManager(ctl) stream := schedulerv1mocks.NewMockScheduler_ReportPieceResultServer(ctl) - svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage) + svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology) mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, @@ -1339,8 +1438,9 @@ func TestServiceV1_ReportPeerResult(t *testing.T) { res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) peerManager := resource.NewMockPeerManager(ctl) - svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage) + svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology) mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, @@ -1403,8 +1503,9 @@ func TestServiceV1_StatTask(t *testing.T) { res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) taskManager := resource.NewMockTaskManager(ctl) - svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage) + svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology) mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) tc.mock(mockTask, taskManager, res.EXPECT(), taskManager.EXPECT()) @@ -1697,10 +1798,11 @@ func TestServiceV1_AnnounceTask(t *testing.T) { res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) hostManager := resource.NewMockHostManager(ctl) taskManager := resource.NewMockTaskManager(ctl) peerManager := resource.NewMockPeerManager(ctl) - svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage) + svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology) mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) @@ -1897,13 +1999,14 @@ func TestServiceV1_LeaveTask(t *testing.T) { res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) peerManager := resource.NewMockPeerManager(ctl) mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockSeedPeerID, mockTask, mockHost) - svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage) + svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology) tc.mock(peer, peerManager, scheduling.EXPECT(), res.EXPECT(), peerManager.EXPECT()) tc.expect(t, peer, svc.LeaveTask(context.Background(), &schedulerv1.PeerTarget{})) @@ -2327,11 +2430,12 @@ func TestServiceV1_AnnounceHost(t *testing.T) { res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) hostManager := resource.NewMockHostManager(ctl) host := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage) + svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology) tc.run(t, svc, tc.req, host, hostManager, res.EXPECT(), hostManager.EXPECT(), dynconfig.EXPECT()) }) @@ -2539,13 +2643,14 @@ func TestServiceV1_LeaveHost(t *testing.T) { res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) hostManager := resource.NewMockHostManager(ctl) host := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) mockPeer := resource.NewPeer(mockSeedPeerID, mockTask, host) - svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage) + svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology) tc.mock(host, mockPeer, hostManager, scheduling.EXPECT(), res.EXPECT(), hostManager.EXPECT()) tc.expect(t, mockPeer, svc.LeaveHost(context.Background(), &schedulerv1.LeaveHostRequest{ @@ -2555,6 +2660,425 @@ func TestServiceV1_LeaveHost(t *testing.T) { } } +func TestServiceV1_SyncProbes(t *testing.T) { + tests := []struct { + name string + mock func(svc *V1, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder, + mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder, + ms *schedulerv1mocks.MockScheduler_SyncProbesServerMockRecorder) + expect func(t *testing.T, err error) + }{ + { + name: "network topology is not enabled", + mock: func(svc *V1, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder, + mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder, + ms *schedulerv1mocks.MockScheduler_SyncProbesServerMockRecorder) { + svc.networkTopology = nil + }, + expect: func(t *testing.T, err error) { + assert := assert.New(t) + assert.EqualError(err, "rpc error: code = Unimplemented desc = network topology is not enabled") + }, + }, + { + name: "synchronize probes when receive ProbeStartedRequest", + mock: func(svc *V1, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder, + mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder, + ms *schedulerv1mocks.MockScheduler_SyncProbesServerMockRecorder) { + gomock.InOrder( + ms.Recv().Return(&schedulerv1.SyncProbesRequest{ + Host: &commonv1.Host{ + Id: mockRawSeedHost.ID, + Ip: mockRawSeedHost.IP, + Hostname: mockRawSeedHost.Hostname, + Port: mockRawSeedHost.Port, + DownloadPort: mockRawSeedHost.DownloadPort, + Location: mockRawSeedHost.Network.Location, + Idc: mockRawSeedHost.Network.IDC, + }, + Request: &schedulerv1.SyncProbesRequest_ProbeStartedRequest{ + ProbeStartedRequest: &schedulerv1.ProbeStartedRequest{}, + }, + }, nil).Times(1), + mn.FindProbedHostIDs(gomock.Eq(mockRawSeedHost.ID)).Return([]string{mockRawHost.ID}, nil).Times(1), + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(mockRawHost.ID)).Return(&mockRawHost, true), + ms.Send(gomock.Eq(&schedulerv1.SyncProbesResponse{ + Hosts: []*commonv1.Host{ + { + Id: mockRawHost.ID, + Ip: mockRawHost.IP, + Hostname: mockRawHost.Hostname, + Port: mockRawHost.Port, + DownloadPort: mockRawHost.DownloadPort, + Location: mockRawHost.Network.Location, + Idc: mockRawHost.Network.IDC, + }, + }, + ProbeInterval: durationpb.New(mockNetworkTopologyConfig.Probe.Interval), + })).Return(nil).Times(1), + ms.Recv().Return(nil, io.EOF).Times(1), + ) + }, + expect: func(t *testing.T, err error) { + assert := assert.New(t) + assert.NoError(err) + }, + }, + { + name: "synchronize probes when receive ProbeFinishedRequest", + mock: func(svc *V1, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder, + mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder, + ms *schedulerv1mocks.MockScheduler_SyncProbesServerMockRecorder) { + gomock.InOrder( + ms.Recv().Return(&schedulerv1.SyncProbesRequest{ + Host: &commonv1.Host{ + Id: mockRawSeedHost.ID, + Ip: mockRawSeedHost.IP, + Hostname: mockRawSeedHost.Hostname, + Port: mockRawSeedHost.Port, + DownloadPort: mockRawSeedHost.DownloadPort, + Location: mockRawSeedHost.Network.Location, + Idc: mockRawSeedHost.Network.IDC, + }, + Request: &schedulerv1.SyncProbesRequest_ProbeFinishedRequest{ + ProbeFinishedRequest: &schedulerv1.ProbeFinishedRequest{ + Probes: []*schedulerv1.Probe{mockV1Probe}, + }, + }, + }, nil).Times(1), + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(mockRawHost.ID)).Return(&mockRawHost, true), + mn.Store(gomock.Eq(mockRawSeedHost.ID), gomock.Eq(mockRawHost.ID)).Return(nil).Times(1), + mn.Probes(gomock.Eq(mockRawSeedHost.ID), gomock.Eq(mockRawHost.ID)).Return(probes).Times(1), + mp.Enqueue(gomock.Eq(&networktopology.Probe{ + Host: &mockRawHost, + RTT: mockV1Probe.Rtt.AsDuration(), + CreatedAt: mockV1Probe.CreatedAt.AsTime(), + })).Return(nil).Times(1), + ms.Recv().Return(nil, io.EOF).Times(1), + ) + }, + expect: func(t *testing.T, err error) { + assert := assert.New(t) + assert.NoError(err) + }, + }, + { + name: "synchronize probes when receive ProbeFailedRequest", + mock: func(svc *V1, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder, + mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder, + ms *schedulerv1mocks.MockScheduler_SyncProbesServerMockRecorder) { + gomock.InOrder( + ms.Recv().Return(&schedulerv1.SyncProbesRequest{ + Host: &commonv1.Host{ + Id: mockRawSeedHost.ID, + Ip: mockRawSeedHost.IP, + Hostname: mockRawSeedHost.Hostname, + Port: mockRawSeedHost.Port, + DownloadPort: mockRawSeedHost.DownloadPort, + Location: mockRawSeedHost.Network.Location, + Idc: mockRawSeedHost.Network.IDC, + }, + Request: &schedulerv1.SyncProbesRequest_ProbeFailedRequest{ + ProbeFailedRequest: &schedulerv1.ProbeFailedRequest{ + Probes: []*schedulerv1.FailedProbe{ + { + Host: &commonv1.Host{ + Id: mockRawHost.ID, + Ip: mockRawHost.IP, + Hostname: mockRawHost.Hostname, + Port: mockRawHost.Port, + DownloadPort: mockRawHost.DownloadPort, + Location: mockRawHost.Network.Location, + Idc: mockRawHost.Network.IDC, + }, + Description: "foo", + }, + }, + }, + }, + }, nil).Times(1), + ms.Recv().Return(nil, io.EOF).Times(1), + ) + }, + expect: func(t *testing.T, err error) { + assert := assert.New(t) + assert.NoError(err) + }, + }, + { + name: "synchronize probes when receive fail type request", + mock: func(svc *V1, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder, + mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder, + ms *schedulerv1mocks.MockScheduler_SyncProbesServerMockRecorder) { + ms.Recv().Return(&schedulerv1.SyncProbesRequest{ + Host: &commonv1.Host{ + Id: mockRawSeedHost.ID, + Ip: mockRawSeedHost.IP, + Hostname: mockRawSeedHost.Hostname, + Port: mockRawSeedHost.Port, + DownloadPort: mockRawSeedHost.DownloadPort, + Location: mockRawSeedHost.Network.Location, + Idc: mockRawSeedHost.Network.IDC, + }, + Request: nil, + }, nil).Times(1) + }, + expect: func(t *testing.T, err error) { + assert := assert.New(t) + assert.EqualError(err, "rpc error: code = FailedPrecondition desc = receive unknow request: ") + }, + }, + { + name: "receive error", + mock: func(svc *V1, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder, + mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder, + ms *schedulerv1mocks.MockScheduler_SyncProbesServerMockRecorder) { + ms.Recv().Return(nil, errors.New("receive error")).Times(1) + }, + expect: func(t *testing.T, err error) { + assert := assert.New(t) + assert.EqualError(err, "receive error") + }, + }, + { + name: "receive end of file", + mock: func(svc *V1, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder, + mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder, + ms *schedulerv1mocks.MockScheduler_SyncProbesServerMockRecorder) { + ms.Recv().Return(nil, io.EOF).Times(1) + }, + expect: func(t *testing.T, err error) { + assert := assert.New(t) + assert.NoError(err) + }, + }, + { + name: "find probed host ids error", + mock: func(svc *V1, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder, + mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder, + ms *schedulerv1mocks.MockScheduler_SyncProbesServerMockRecorder) { + gomock.InOrder( + ms.Recv().Return(&schedulerv1.SyncProbesRequest{ + Host: &commonv1.Host{ + Id: mockRawSeedHost.ID, + Ip: mockRawSeedHost.IP, + Hostname: mockRawSeedHost.Hostname, + Port: mockRawSeedHost.Port, + DownloadPort: mockRawSeedHost.DownloadPort, + Location: mockRawSeedHost.Network.Location, + Idc: mockRawSeedHost.Network.IDC, + }, + Request: &schedulerv1.SyncProbesRequest_ProbeStartedRequest{ + ProbeStartedRequest: &schedulerv1.ProbeStartedRequest{}, + }, + }, nil).Times(1), + mn.FindProbedHostIDs(gomock.Eq(mockRawSeedHost.ID)).Return(nil, errors.New("find probed host ids error")).Times(1), + ) + }, + expect: func(t *testing.T, err error) { + assert := assert.New(t) + assert.EqualError(err, "rpc error: code = FailedPrecondition desc = find probed host ids error") + }, + }, + { + name: "load host error when receive ProbeStartedRequest", + mock: func(svc *V1, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder, + mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder, + ms *schedulerv1mocks.MockScheduler_SyncProbesServerMockRecorder) { + gomock.InOrder( + ms.Recv().Return(&schedulerv1.SyncProbesRequest{ + Host: &commonv1.Host{ + Id: mockRawSeedHost.ID, + Ip: mockRawSeedHost.IP, + Hostname: mockRawSeedHost.Hostname, + Port: mockRawSeedHost.Port, + DownloadPort: mockRawSeedHost.DownloadPort, + Location: mockRawSeedHost.Network.Location, + Idc: mockRawSeedHost.Network.IDC, + }, + Request: &schedulerv1.SyncProbesRequest_ProbeStartedRequest{ + ProbeStartedRequest: &schedulerv1.ProbeStartedRequest{}, + }, + }, nil).Times(1), + mn.FindProbedHostIDs(gomock.Eq(mockRawSeedHost.ID)).Return([]string{mockRawHost.ID}, nil).Times(1), + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(mockRawHost.ID)).Return(nil, false), + ) + }, + expect: func(t *testing.T, err error) { + assert := assert.New(t) + assert.EqualError(err, "rpc error: code = NotFound desc = probed host not found") + }, + }, + { + name: "send synchronize probes response error", + mock: func(svc *V1, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder, + mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder, + ms *schedulerv1mocks.MockScheduler_SyncProbesServerMockRecorder) { + gomock.InOrder( + ms.Recv().Return(&schedulerv1.SyncProbesRequest{ + Host: &commonv1.Host{ + Id: mockRawSeedHost.ID, + Ip: mockRawSeedHost.IP, + Hostname: mockRawSeedHost.Hostname, + Port: mockRawSeedHost.Port, + DownloadPort: mockRawSeedHost.DownloadPort, + Location: mockRawSeedHost.Network.Location, + Idc: mockRawSeedHost.Network.IDC, + }, + Request: &schedulerv1.SyncProbesRequest_ProbeStartedRequest{ + ProbeStartedRequest: &schedulerv1.ProbeStartedRequest{}, + }, + }, nil).Times(1), + mn.FindProbedHostIDs(gomock.Eq(mockRawSeedHost.ID)).Return([]string{mockRawHost.ID}, nil).Times(1), + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(mockRawHost.ID)).Return(&mockRawHost, true), + ms.Send(gomock.Eq(&schedulerv1.SyncProbesResponse{ + Hosts: []*commonv1.Host{ + { + Id: mockRawHost.ID, + Ip: mockRawHost.IP, + Hostname: mockRawHost.Hostname, + Port: mockRawHost.Port, + DownloadPort: mockRawHost.DownloadPort, + Location: mockRawHost.Network.Location, + Idc: mockRawHost.Network.IDC, + }, + }, + ProbeInterval: durationpb.New(mockNetworkTopologyConfig.Probe.Interval), + })).Return(errors.New("send synchronize probes response error")).Times(1), + ) + }, + expect: func(t *testing.T, err error) { + assert := assert.New(t) + assert.EqualError(err, "send synchronize probes response error") + }, + }, + { + name: "load host error when receive ProbeFinishedRequest", + mock: func(svc *V1, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder, + mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder, + ms *schedulerv1mocks.MockScheduler_SyncProbesServerMockRecorder) { + gomock.InOrder( + ms.Recv().Return(&schedulerv1.SyncProbesRequest{ + Host: &commonv1.Host{ + Id: mockRawSeedHost.ID, + Ip: mockRawSeedHost.IP, + Hostname: mockRawSeedHost.Hostname, + Port: mockRawSeedHost.Port, + DownloadPort: mockRawSeedHost.DownloadPort, + Location: mockRawSeedHost.Network.Location, + Idc: mockRawSeedHost.Network.IDC, + }, + Request: &schedulerv1.SyncProbesRequest_ProbeFinishedRequest{ + ProbeFinishedRequest: &schedulerv1.ProbeFinishedRequest{ + Probes: []*schedulerv1.Probe{mockV1Probe}, + }, + }, + }, nil).Times(1), + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(mockRawHost.ID)).Return(nil, false), + ms.Recv().Return(nil, io.EOF).Times(1), + ) + }, + expect: func(t *testing.T, err error) { + assert := assert.New(t) + assert.NoError(err) + }, + }, + { + name: "store error when receive ProbeFinishedRequest", + mock: func(svc *V1, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder, + mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder, + ms *schedulerv1mocks.MockScheduler_SyncProbesServerMockRecorder) { + gomock.InOrder( + ms.Recv().Return(&schedulerv1.SyncProbesRequest{ + Host: &commonv1.Host{ + Id: mockRawSeedHost.ID, + Ip: mockRawSeedHost.IP, + Hostname: mockRawSeedHost.Hostname, + Port: mockRawSeedHost.Port, + DownloadPort: mockRawSeedHost.DownloadPort, + Location: mockRawSeedHost.Network.Location, + Idc: mockRawSeedHost.Network.IDC, + }, + Request: &schedulerv1.SyncProbesRequest_ProbeFinishedRequest{ + ProbeFinishedRequest: &schedulerv1.ProbeFinishedRequest{ + Probes: []*schedulerv1.Probe{mockV1Probe}, + }, + }, + }, nil).Times(1), + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(mockRawHost.ID)).Return(&mockRawHost, true), + mn.Store(gomock.Eq(mockRawSeedHost.ID), gomock.Eq(mockRawHost.ID)).Return(errors.New("store error")).Times(1), + ms.Recv().Return(nil, io.EOF).Times(1), + ) + }, + expect: func(t *testing.T, err error) { + assert := assert.New(t) + assert.NoError(err) + }, + }, + { + name: "enqueue probe error when receive ProbeFinishedRequest", + mock: func(svc *V1, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder, + mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder, + ms *schedulerv1mocks.MockScheduler_SyncProbesServerMockRecorder) { + gomock.InOrder( + ms.Recv().Return(&schedulerv1.SyncProbesRequest{ + Host: &commonv1.Host{ + Id: mockRawSeedHost.ID, + Ip: mockRawSeedHost.IP, + Hostname: mockRawSeedHost.Hostname, + Port: mockRawSeedHost.Port, + DownloadPort: mockRawSeedHost.DownloadPort, + Location: mockRawSeedHost.Network.Location, + Idc: mockRawSeedHost.Network.IDC, + }, + Request: &schedulerv1.SyncProbesRequest_ProbeFinishedRequest{ + ProbeFinishedRequest: &schedulerv1.ProbeFinishedRequest{ + Probes: []*schedulerv1.Probe{mockV1Probe}, + }, + }, + }, nil).Times(1), + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(mockRawHost.ID)).Return(&mockRawHost, true), + mn.Store(gomock.Eq(mockRawSeedHost.ID), gomock.Eq(mockRawHost.ID)).Return(nil).Times(1), + mn.Probes(gomock.Eq(mockRawSeedHost.ID), gomock.Eq(mockRawHost.ID)).Return(probes).Times(1), + mp.Enqueue(gomock.Any()).Return(errors.New("enqueue probe error")).Times(1), + ms.Recv().Return(nil, io.EOF).Times(1), + ) + }, + expect: func(t *testing.T, err error) { + assert := assert.New(t) + assert.NoError(err) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + + scheduling := mocks.NewMockScheduling(ctl) + res := resource.NewMockResource(ctl) + dynconfig := configmocks.NewMockDynconfigInterface(ctl) + storage := storagemocks.NewMockStorage(ctl) + probes := networktopologymocks.NewMockProbes(ctl) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) + hostManager := resource.NewMockHostManager(ctl) + stream := schedulerv1mocks.NewMockScheduler_SyncProbesServer(ctl) + svc := NewV1(&config.Config{NetworkTopology: mockNetworkTopologyConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology) + + tc.mock(svc, res.EXPECT(), probes, probes.EXPECT(), networkTopology.EXPECT(), hostManager, hostManager.EXPECT(), stream.EXPECT()) + tc.expect(t, svc.SyncProbes(stream)) + }) + } +} + func TestServiceV1_triggerTask(t *testing.T) { tests := []struct { name string @@ -3005,7 +3529,8 @@ func TestServiceV1_triggerTask(t *testing.T) { res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) - svc := NewV1(tc.config, res, scheduling, dynconfig, storage) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) + svc := NewV1(tc.config, res, scheduling, dynconfig, storage, networkTopology) mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, @@ -3109,7 +3634,8 @@ func TestServiceV1_storeTask(t *testing.T) { res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) - svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) + svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology) taskManager := resource.NewMockTaskManager(ctl) tc.run(t, svc, taskManager, res.EXPECT(), taskManager.EXPECT()) }) @@ -3187,7 +3713,8 @@ func TestServiceV1_storeHost(t *testing.T) { res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) - svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) + svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology) hostManager := resource.NewMockHostManager(ctl) mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, @@ -3271,7 +3798,8 @@ func TestServiceV1_storePeer(t *testing.T) { res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) - svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) + svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology) peerManager := resource.NewMockPeerManager(ctl) tc.run(t, svc, peerManager, res.EXPECT(), peerManager.EXPECT()) @@ -3330,13 +3858,14 @@ func TestServiceV1_triggerSeedPeerTask(t *testing.T) { res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) seedPeer := resource.NewMockSeedPeer(ctl) mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, task, mockHost) - svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage) + svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology) tc.mock(task, peer, seedPeer, res.EXPECT(), seedPeer.EXPECT()) svc.triggerSeedPeerTask(context.Background(), &mockPeerRange, task) @@ -3412,12 +3941,13 @@ func TestServiceV1_handleBeginOfPiece(t *testing.T) { res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, mockTask, mockHost) - svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage) + svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology) tc.mock(peer, scheduling.EXPECT()) svc.handleBeginOfPiece(context.Background(), peer) @@ -3551,8 +4081,9 @@ func TestServiceV1_handlePieceSuccess(t *testing.T) { res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) peerManager := resource.NewMockPeerManager(ctl) - svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage) + svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology) tc.mock(tc.peer, peerManager, res.EXPECT(), peerManager.EXPECT()) svc.handlePieceSuccess(context.Background(), tc.peer, tc.piece) @@ -3740,6 +4271,7 @@ func TestServiceV1_handlePieceFail(t *testing.T) { res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) peerManager := resource.NewMockPeerManager(ctl) mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, @@ -3748,7 +4280,7 @@ func TestServiceV1_handlePieceFail(t *testing.T) { peer := resource.NewPeer(mockPeerID, mockTask, mockHost) parent := resource.NewPeer(mockSeedPeerID, mockTask, mockHost) seedPeer := resource.NewMockSeedPeer(ctl) - svc := NewV1(tc.config, res, scheduling, dynconfig, storage) + svc := NewV1(tc.config, res, scheduling, dynconfig, storage, networkTopology) tc.run(t, svc, peer, parent, tc.piece, peerManager, seedPeer, scheduling.EXPECT(), res.EXPECT(), peerManager.EXPECT(), seedPeer.EXPECT()) }) @@ -3848,6 +4380,7 @@ func TestServiceV1_handlePeerSuccess(t *testing.T) { res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) url, err := url.Parse(s.URL) if err != nil { @@ -3871,7 +4404,7 @@ func TestServiceV1_handlePeerSuccess(t *testing.T) { mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, mockTask, mockHost) - svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage) + svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology) tc.mock(peer) svc.handlePeerSuccess(context.Background(), peer) @@ -3946,7 +4479,8 @@ func TestServiceV1_handlePeerFail(t *testing.T) { res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) - svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) + svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology) mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) @@ -4032,7 +4566,8 @@ func TestServiceV1_handleTaskSuccess(t *testing.T) { res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) - svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) + svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology) task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) tc.mock(task) @@ -4171,7 +4706,8 @@ func TestServiceV1_handleTaskFail(t *testing.T) { res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) - svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) + svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology) task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) tc.mock(task) diff --git a/scheduler/service/service_v2.go b/scheduler/service/service_v2.go index 2289ba08d..c0ef795ee 100644 --- a/scheduler/service/service_v2.go +++ b/scheduler/service/service_v2.go @@ -37,6 +37,7 @@ import ( "d7y.io/dragonfly/v2/pkg/types" "d7y.io/dragonfly/v2/scheduler/config" "d7y.io/dragonfly/v2/scheduler/metrics" + "d7y.io/dragonfly/v2/scheduler/networktopology" "d7y.io/dragonfly/v2/scheduler/resource" "d7y.io/dragonfly/v2/scheduler/scheduling" "d7y.io/dragonfly/v2/scheduler/storage" @@ -58,6 +59,9 @@ type V2 struct { // Storage interface. storage storage.Storage + + // Network topology interface. + networkTopology networktopology.NetworkTopology } // New v2 version of service instance. @@ -67,13 +71,15 @@ func NewV2( scheduling scheduling.Scheduling, dynconfig config.DynconfigInterface, storage storage.Storage, + networkTopology networktopology.NetworkTopology, ) *V2 { return &V2{ - resource: resource, - scheduling: scheduling, - config: cfg, - dynconfig: dynconfig, - storage: storage, + resource: resource, + scheduling: scheduling, + config: cfg, + dynconfig: dynconfig, + storage: storage, + networkTopology: networkTopology, } } @@ -637,6 +643,163 @@ func (v *V2) LeaveHost(ctx context.Context, req *schedulerv2.LeaveHostRequest) e return nil } +// SyncProbes sync probes of the host. +func (v *V2) SyncProbes(stream schedulerv2.Scheduler_SyncProbesServer) error { + if v.networkTopology == nil { + return status.Errorf(codes.Unimplemented, "network topology is not enabled") + } + + for { + req, err := stream.Recv() + if err != nil { + if err == io.EOF { + return nil + } + + logger.Errorf("receive error: %s", err.Error()) + return err + } + + logger := logger.WithHost(req.Host.Id, req.Host.Hostname, req.Host.Ip) + switch syncProbesRequest := req.GetRequest().(type) { + case *schedulerv2.SyncProbesRequest_ProbeStartedRequest: + // Find probed hosts in network topology. Based on the source host information, + // the most candidate hosts will be evaluated. + logger.Info("receive SyncProbesRequest_ProbeStartedRequest") + probedHostIDs, err := v.networkTopology.FindProbedHostIDs(req.Host.Id) + if err != nil { + logger.Error(err) + return status.Error(codes.FailedPrecondition, err.Error()) + } + + var probedHosts []*commonv2.Host + for _, probedHostID := range probedHostIDs { + probedHost, loaded := v.resource.HostManager().Load(probedHostID) + if !loaded { + logger.Warnf("probed host %s not found", probedHostID) + continue + } + + probedHosts = append(probedHosts, &commonv2.Host{ + Id: probedHost.ID, + Type: uint32(probedHost.Type), + Hostname: probedHost.Hostname, + Ip: probedHost.IP, + Port: probedHost.Port, + DownloadPort: probedHost.DownloadPort, + Os: probedHost.OS, + Platform: probedHost.Platform, + PlatformFamily: probedHost.PlatformFamily, + PlatformVersion: probedHost.PlatformVersion, + KernelVersion: probedHost.KernelVersion, + Cpu: &commonv2.CPU{ + LogicalCount: probedHost.CPU.LogicalCount, + PhysicalCount: probedHost.CPU.PhysicalCount, + Percent: probedHost.CPU.Percent, + ProcessPercent: probedHost.CPU.ProcessPercent, + Times: &commonv2.CPUTimes{ + User: probedHost.CPU.Times.User, + System: probedHost.CPU.Times.System, + Idle: probedHost.CPU.Times.Idle, + Nice: probedHost.CPU.Times.Nice, + Iowait: probedHost.CPU.Times.Iowait, + Irq: probedHost.CPU.Times.Irq, + Softirq: probedHost.CPU.Times.Softirq, + Steal: probedHost.CPU.Times.Steal, + Guest: probedHost.CPU.Times.Guest, + GuestNice: probedHost.CPU.Times.GuestNice, + }, + }, + Memory: &commonv2.Memory{ + Total: probedHost.Memory.Total, + Available: probedHost.Memory.Available, + Used: probedHost.Memory.Used, + UsedPercent: probedHost.Memory.UsedPercent, + ProcessUsedPercent: probedHost.Memory.ProcessUsedPercent, + Free: probedHost.Memory.Free, + }, + Network: &commonv2.Network{ + TcpConnectionCount: probedHost.Network.TCPConnectionCount, + UploadTcpConnectionCount: probedHost.Network.UploadTCPConnectionCount, + Location: probedHost.Network.Location, + Idc: probedHost.Network.IDC, + }, + Disk: &commonv2.Disk{ + Total: probedHost.Disk.Total, + Free: probedHost.Disk.Free, + Used: probedHost.Disk.Used, + UsedPercent: probedHost.Disk.UsedPercent, + InodesTotal: probedHost.Disk.InodesTotal, + InodesUsed: probedHost.Disk.InodesUsed, + InodesFree: probedHost.Disk.InodesFree, + InodesUsedPercent: probedHost.Disk.InodesUsedPercent, + }, + Build: &commonv2.Build{ + GitVersion: probedHost.Build.GitVersion, + GitCommit: probedHost.Build.GitCommit, + GoVersion: probedHost.Build.GoVersion, + Platform: probedHost.Build.Platform, + }, + }) + } + + if len(probedHosts) == 0 { + logger.Error("probed host not found") + return status.Error(codes.NotFound, "probed host not found") + } + + logger.Infof("probe started: %#v", probedHosts) + if err := stream.Send(&schedulerv2.SyncProbesResponse{ + Hosts: probedHosts, + ProbeInterval: durationpb.New(v.config.NetworkTopology.Probe.Interval), + }); err != nil { + logger.Error(err) + return err + } + case *schedulerv2.SyncProbesRequest_ProbeFinishedRequest: + // Store probes in network topology. First create the association between + // source host and destination host, and then store the value of probe. + logger.Info("receive SyncProbesRequest_ProbeFinishedRequest") + for _, probe := range syncProbesRequest.ProbeFinishedRequest.Probes { + probedHost, loaded := v.resource.HostManager().Load(probe.Host.Id) + if !loaded { + logger.Errorf("host %s not found", probe.Host.Id) + continue + } + + if err := v.networkTopology.Store(req.Host.Id, probedHost.ID); err != nil { + logger.Errorf("store failed: %s", err.Error()) + continue + } + + if err := v.networkTopology.Probes(req.Host.Id, probe.Host.Id).Enqueue(&networktopology.Probe{ + Host: probedHost, + RTT: probe.Rtt.AsDuration(), + CreatedAt: probe.CreatedAt.AsTime(), + }); err != nil { + logger.Errorf("enqueue failed: %s", err.Error()) + continue + } + + logger.Infof("probe finished: %#v", probe) + } + case *schedulerv2.SyncProbesRequest_ProbeFailedRequest: + // Log failed probes. + logger.Info("receive SyncProbesRequest_ProbeFailedRequest") + var failedProbedHostIDs []string + for _, failedProbe := range syncProbesRequest.ProbeFailedRequest.Probes { + failedProbedHostIDs = append(failedProbedHostIDs, failedProbe.Host.Id) + } + + logger.Warnf("probe failed: %#v", failedProbedHostIDs) + default: + msg := fmt.Sprintf("receive unknow request: %#v", syncProbesRequest) + logger.Error(msg) + return status.Error(codes.FailedPrecondition, msg) + } + } +} + // handleRegisterPeerRequest handles RegisterPeerRequest of AnnouncePeerRequest. func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.Scheduler_AnnouncePeerServer, hostID, taskID, peerID string, req *schedulerv2.RegisterPeerRequest) error { // Handle resource included host, task, and peer. diff --git a/scheduler/service/service_v2_test.go b/scheduler/service/service_v2_test.go index fc251b215..1f9dd3133 100644 --- a/scheduler/service/service_v2_test.go +++ b/scheduler/service/service_v2_test.go @@ -19,6 +19,7 @@ package service import ( "context" "errors" + "io" "net" "net/http" "net/http/httptest" @@ -46,7 +47,10 @@ import ( pkgtypes "d7y.io/dragonfly/v2/pkg/types" "d7y.io/dragonfly/v2/scheduler/config" configmocks "d7y.io/dragonfly/v2/scheduler/config/mocks" + "d7y.io/dragonfly/v2/scheduler/networktopology" + networktopologymocks "d7y.io/dragonfly/v2/scheduler/networktopology/mocks" "d7y.io/dragonfly/v2/scheduler/resource" + "d7y.io/dragonfly/v2/scheduler/scheduling/mocks" schedulingmocks "d7y.io/dragonfly/v2/scheduler/scheduling/mocks" storagemocks "d7y.io/dragonfly/v2/scheduler/storage/mocks" ) @@ -73,7 +77,9 @@ func TestService_NewV2(t *testing.T) { resource := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) - tc.expect(t, NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, scheduling, dynconfig, storage)) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) + + tc.expect(t, NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, scheduling, dynconfig, storage, networkTopology)) }) } } @@ -237,13 +243,14 @@ func TestServiceV2_StatPeer(t *testing.T) { res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) peerManager := resource.NewMockPeerManager(ctl) mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockSeedPeerID, mockTask, mockHost, resource.WithRange(mockPeerRange)) - svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology) tc.mock(peer, peerManager, res.EXPECT(), peerManager.EXPECT()) resp, err := svc.StatPeer(context.Background(), &schedulerv2.StatPeerRequest{TaskId: mockTaskID, PeerId: mockPeerID}) @@ -308,13 +315,14 @@ func TestServiceV2_LeavePeer(t *testing.T) { res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) peerManager := resource.NewMockPeerManager(ctl) mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockSeedPeerID, mockTask, mockHost, resource.WithRange(mockPeerRange)) - svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology) tc.mock(peer, peerManager, res.EXPECT(), peerManager.EXPECT()) tc.expect(t, svc.LeavePeer(context.Background(), &schedulerv2.LeavePeerRequest{TaskId: mockTaskID, PeerId: mockPeerID})) @@ -394,9 +402,10 @@ func TestServiceV2_StatTask(t *testing.T) { res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) taskManager := resource.NewMockTaskManager(ctl) task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) - svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology) tc.mock(task, taskManager, res.EXPECT(), taskManager.EXPECT()) resp, err := svc.StatTask(context.Background(), &schedulerv2.StatTaskRequest{Id: mockTaskID}) @@ -829,11 +838,12 @@ func TestServiceV2_AnnounceHost(t *testing.T) { res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) hostManager := resource.NewMockHostManager(ctl) host := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology) tc.run(t, svc, tc.req, host, hostManager, res.EXPECT(), hostManager.EXPECT(), dynconfig.EXPECT()) }) @@ -898,13 +908,14 @@ func TestServiceV2_LeaveHost(t *testing.T) { res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) hostManager := resource.NewMockHostManager(ctl) host := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) mockPeer := resource.NewPeer(mockSeedPeerID, mockTask, host) - svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology) tc.mock(host, mockPeer, hostManager, res.EXPECT(), hostManager.EXPECT()) tc.expect(t, mockPeer, svc.LeaveHost(context.Background(), &schedulerv2.LeaveHostRequest{Id: mockHostID})) @@ -912,6 +923,542 @@ func TestServiceV2_LeaveHost(t *testing.T) { } } +func TestServiceV2_SyncProbes(t *testing.T) { + tests := []struct { + name string + mock func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder, + mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder, + ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) + expect func(t *testing.T, err error) + }{ + { + name: "network topology is not enabled", + mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder, + mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder, + ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) { + svc.networkTopology = nil + }, + expect: func(t *testing.T, err error) { + assert := assert.New(t) + assert.EqualError(err, "rpc error: code = Unimplemented desc = network topology is not enabled") + }, + }, + { + name: "synchronize probes when receive ProbeStartedRequest", + mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder, + mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder, + ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) { + gomock.InOrder( + ms.Recv().Return(&schedulerv2.SyncProbesRequest{ + Host: &commonv2.Host{ + Id: mockSeedHostID, + Type: uint32(pkgtypes.HostTypeSuperSeed), + Hostname: "bar", + Ip: "127.0.0.1", + Port: 8003, + DownloadPort: 8001, + Os: "darwin", + Platform: "darwin", + PlatformFamily: "Standalone Workstation", + PlatformVersion: "11.1", + KernelVersion: "20.2.0", + Cpu: mockV2Probe.Host.Cpu, + Memory: mockV2Probe.Host.Memory, + Network: mockV2Probe.Host.Network, + Disk: mockV2Probe.Host.Disk, + Build: mockV2Probe.Host.Build, + }, + Request: &schedulerv2.SyncProbesRequest_ProbeStartedRequest{ + ProbeStartedRequest: &schedulerv2.ProbeStartedRequest{}, + }, + }, nil).Times(1), + mn.FindProbedHostIDs(gomock.Eq(mockRawSeedHost.ID)).Return([]string{mockRawHost.ID}, nil).Times(1), + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(mockRawHost.ID)).Return(&mockRawHost, true), + ms.Send(gomock.Eq(&schedulerv2.SyncProbesResponse{ + Hosts: []*commonv2.Host{ + { + Id: mockRawHost.ID, + Type: uint32(mockRawHost.Type), + Hostname: mockRawHost.Hostname, + Ip: mockRawHost.IP, + Port: mockRawHost.Port, + DownloadPort: mockRawHost.DownloadPort, + Os: mockRawHost.OS, + Platform: mockRawHost.Platform, + PlatformFamily: mockRawHost.PlatformFamily, + PlatformVersion: mockRawHost.PlatformVersion, + KernelVersion: mockRawHost.KernelVersion, + Cpu: mockV2Probe.Host.Cpu, + Memory: mockV2Probe.Host.Memory, + Network: mockV2Probe.Host.Network, + Disk: mockV2Probe.Host.Disk, + Build: mockV2Probe.Host.Build, + }, + }, + ProbeInterval: durationpb.New(mockNetworkTopologyConfig.Probe.Interval), + })).Return(nil).Times(1), + ms.Recv().Return(nil, io.EOF).Times(1), + ) + }, + expect: func(t *testing.T, err error) { + assert := assert.New(t) + assert.NoError(err) + }, + }, + { + name: "synchronize probes when receive ProbeFinishedRequest", + mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder, + mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder, + ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) { + gomock.InOrder( + ms.Recv().Return(&schedulerv2.SyncProbesRequest{ + Host: &commonv2.Host{ + Id: mockSeedHostID, + Type: uint32(pkgtypes.HostTypeSuperSeed), + Hostname: "bar", + Ip: "127.0.0.1", + Port: 8003, + DownloadPort: 8001, + Os: "darwin", + Platform: "darwin", + PlatformFamily: "Standalone Workstation", + PlatformVersion: "11.1", + KernelVersion: "20.2.0", + Cpu: mockV2Probe.Host.Cpu, + Memory: mockV2Probe.Host.Memory, + Network: mockV2Probe.Host.Network, + Disk: mockV2Probe.Host.Disk, + Build: mockV2Probe.Host.Build, + }, + Request: &schedulerv2.SyncProbesRequest_ProbeFinishedRequest{ + ProbeFinishedRequest: &schedulerv2.ProbeFinishedRequest{ + Probes: []*schedulerv2.Probe{mockV2Probe}, + }, + }, + }, nil).Times(1), + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(mockRawHost.ID)).Return(&mockRawHost, true), + mn.Store(gomock.Eq(mockRawSeedHost.ID), gomock.Eq(mockRawHost.ID)).Return(nil).Times(1), + mn.Probes(gomock.Eq(mockRawSeedHost.ID), gomock.Eq(mockRawHost.ID)).Return(probes).Times(1), + mp.Enqueue(gomock.Eq(&networktopology.Probe{ + Host: &mockRawHost, + RTT: mockV2Probe.Rtt.AsDuration(), + CreatedAt: mockV2Probe.CreatedAt.AsTime(), + })).Return(nil).Times(1), + ms.Recv().Return(nil, io.EOF).Times(1), + ) + }, + expect: func(t *testing.T, err error) { + assert := assert.New(t) + assert.NoError(err) + }, + }, + { + name: "synchronize probes when receive ProbeFailedRequest", + mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder, + mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder, + ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) { + gomock.InOrder( + ms.Recv().Return(&schedulerv2.SyncProbesRequest{ + Host: &commonv2.Host{ + Id: mockSeedHostID, + Type: uint32(pkgtypes.HostTypeSuperSeed), + Hostname: "bar", + Ip: "127.0.0.1", + Port: 8003, + DownloadPort: 8001, + Os: "darwin", + Platform: "darwin", + PlatformFamily: "Standalone Workstation", + PlatformVersion: "11.1", + KernelVersion: "20.2.0", + Cpu: mockV2Probe.Host.Cpu, + Memory: mockV2Probe.Host.Memory, + Network: mockV2Probe.Host.Network, + Disk: mockV2Probe.Host.Disk, + Build: mockV2Probe.Host.Build, + }, + Request: &schedulerv2.SyncProbesRequest_ProbeFailedRequest{ + ProbeFailedRequest: &schedulerv2.ProbeFailedRequest{ + Probes: []*schedulerv2.FailedProbe{ + { + Host: &commonv2.Host{ + Id: mockRawHost.ID, + Type: uint32(mockRawHost.Type), + Hostname: mockRawHost.Hostname, + Ip: mockRawHost.IP, + Port: mockRawHost.Port, + DownloadPort: mockRawHost.DownloadPort, + Os: mockRawHost.OS, + Platform: mockRawHost.Platform, + PlatformFamily: mockRawHost.PlatformFamily, + PlatformVersion: mockRawHost.PlatformVersion, + KernelVersion: mockRawHost.KernelVersion, + Cpu: mockV2Probe.Host.Cpu, + Memory: mockV2Probe.Host.Memory, + Network: mockV2Probe.Host.Network, + Disk: mockV2Probe.Host.Disk, + Build: mockV2Probe.Host.Build, + }, + Description: "foo", + }, + }, + }, + }, + }, nil).Times(1), + ms.Recv().Return(nil, io.EOF).Times(1), + ) + }, + expect: func(t *testing.T, err error) { + assert := assert.New(t) + assert.NoError(err) + }, + }, + { + name: "synchronize probes when receive fail type request", + mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder, + mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder, + ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) { + ms.Recv().Return(&schedulerv2.SyncProbesRequest{ + Host: &commonv2.Host{ + Id: mockSeedHostID, + Type: uint32(pkgtypes.HostTypeSuperSeed), + Hostname: "bar", + Ip: "127.0.0.1", + Port: 8003, + DownloadPort: 8001, + Os: "darwin", + Platform: "darwin", + PlatformFamily: "Standalone Workstation", + PlatformVersion: "11.1", + KernelVersion: "20.2.0", + Cpu: mockV2Probe.Host.Cpu, + Memory: mockV2Probe.Host.Memory, + Network: mockV2Probe.Host.Network, + Disk: mockV2Probe.Host.Disk, + Build: mockV2Probe.Host.Build, + }, + Request: nil, + }, nil).Times(1) + }, + expect: func(t *testing.T, err error) { + assert := assert.New(t) + assert.EqualError(err, "rpc error: code = FailedPrecondition desc = receive unknow request: ") + }, + }, + { + name: "receive error", + mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder, + mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder, + ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) { + ms.Recv().Return(nil, errors.New("receive error")).Times(1) + }, + expect: func(t *testing.T, err error) { + assert := assert.New(t) + assert.EqualError(err, "receive error") + }, + }, + { + name: "receive end of file", + mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder, + mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder, + ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) { + ms.Recv().Return(nil, io.EOF).Times(1) + }, + expect: func(t *testing.T, err error) { + assert := assert.New(t) + assert.NoError(err) + }, + }, + { + name: "find probed host ids error", + mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder, + mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder, + ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) { + gomock.InOrder( + ms.Recv().Return(&schedulerv2.SyncProbesRequest{ + Host: &commonv2.Host{ + Id: mockSeedHostID, + Type: uint32(pkgtypes.HostTypeSuperSeed), + Hostname: "bar", + Ip: "127.0.0.1", + Port: 8003, + DownloadPort: 8001, + Os: "darwin", + Platform: "darwin", + PlatformFamily: "Standalone Workstation", + PlatformVersion: "11.1", + KernelVersion: "20.2.0", + Cpu: mockV2Probe.Host.Cpu, + Memory: mockV2Probe.Host.Memory, + Network: mockV2Probe.Host.Network, + Disk: mockV2Probe.Host.Disk, + Build: mockV2Probe.Host.Build, + }, + Request: &schedulerv2.SyncProbesRequest_ProbeStartedRequest{ + ProbeStartedRequest: &schedulerv2.ProbeStartedRequest{}, + }, + }, nil).Times(1), + mn.FindProbedHostIDs(gomock.Eq(mockRawSeedHost.ID)).Return(nil, errors.New("find probed host ids error")).Times(1), + ) + }, + expect: func(t *testing.T, err error) { + assert := assert.New(t) + assert.EqualError(err, "rpc error: code = FailedPrecondition desc = find probed host ids error") + }, + }, + { + name: "load host error when receive ProbeStartedRequest", + mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder, + mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder, + ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) { + gomock.InOrder( + ms.Recv().Return(&schedulerv2.SyncProbesRequest{ + Host: &commonv2.Host{ + Id: mockSeedHostID, + Type: uint32(pkgtypes.HostTypeSuperSeed), + Hostname: "bar", + Ip: "127.0.0.1", + Port: 8003, + DownloadPort: 8001, + Os: "darwin", + Platform: "darwin", + PlatformFamily: "Standalone Workstation", + PlatformVersion: "11.1", + KernelVersion: "20.2.0", + Cpu: mockV2Probe.Host.Cpu, + Memory: mockV2Probe.Host.Memory, + Network: mockV2Probe.Host.Network, + Disk: mockV2Probe.Host.Disk, + Build: mockV2Probe.Host.Build, + }, + Request: &schedulerv2.SyncProbesRequest_ProbeStartedRequest{ + ProbeStartedRequest: &schedulerv2.ProbeStartedRequest{}, + }, + }, nil).Times(1), + mn.FindProbedHostIDs(gomock.Eq(mockRawSeedHost.ID)).Return([]string{mockRawHost.ID}, nil).Times(1), + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(mockRawHost.ID)).Return(nil, false), + ) + }, + expect: func(t *testing.T, err error) { + assert := assert.New(t) + assert.EqualError(err, "rpc error: code = NotFound desc = probed host not found") + }, + }, + { + name: "send synchronize probes response error", + mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder, + mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder, + ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) { + gomock.InOrder( + ms.Recv().Return(&schedulerv2.SyncProbesRequest{ + Host: &commonv2.Host{ + Id: mockSeedHostID, + Type: uint32(pkgtypes.HostTypeSuperSeed), + Hostname: "bar", + Ip: "127.0.0.1", + Port: 8003, + DownloadPort: 8001, + Os: "darwin", + Platform: "darwin", + PlatformFamily: "Standalone Workstation", + PlatformVersion: "11.1", + KernelVersion: "20.2.0", + Cpu: mockV2Probe.Host.Cpu, + Memory: mockV2Probe.Host.Memory, + Network: mockV2Probe.Host.Network, + Disk: mockV2Probe.Host.Disk, + Build: mockV2Probe.Host.Build, + }, + Request: &schedulerv2.SyncProbesRequest_ProbeStartedRequest{ + ProbeStartedRequest: &schedulerv2.ProbeStartedRequest{}, + }, + }, nil).Times(1), + mn.FindProbedHostIDs(gomock.Eq(mockRawSeedHost.ID)).Return([]string{mockRawHost.ID}, nil).Times(1), + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(mockRawHost.ID)).Return(&mockRawHost, true), + ms.Send(gomock.Eq(&schedulerv2.SyncProbesResponse{ + Hosts: []*commonv2.Host{ + { + Id: mockRawHost.ID, + Type: uint32(mockRawHost.Type), + Hostname: mockRawHost.Hostname, + Ip: mockRawHost.IP, + Port: mockRawHost.Port, + DownloadPort: mockRawHost.DownloadPort, + Os: mockRawHost.OS, + Platform: mockRawHost.Platform, + PlatformFamily: mockRawHost.PlatformFamily, + PlatformVersion: mockRawHost.PlatformVersion, + KernelVersion: mockRawHost.KernelVersion, + Cpu: mockV2Probe.Host.Cpu, + Memory: mockV2Probe.Host.Memory, + Network: mockV2Probe.Host.Network, + Disk: mockV2Probe.Host.Disk, + Build: mockV2Probe.Host.Build, + }, + }, + ProbeInterval: durationpb.New(mockNetworkTopologyConfig.Probe.Interval), + })).Return(errors.New("send synchronize probes response error")).Times(1), + ) + }, + expect: func(t *testing.T, err error) { + assert := assert.New(t) + assert.EqualError(err, "send synchronize probes response error") + }, + }, + { + name: "load host error when receive ProbeFinishedRequest", + mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder, + mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder, + ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) { + gomock.InOrder( + ms.Recv().Return(&schedulerv2.SyncProbesRequest{ + Host: &commonv2.Host{ + Id: mockSeedHostID, + Type: uint32(pkgtypes.HostTypeSuperSeed), + Hostname: "bar", + Ip: "127.0.0.1", + Port: 8003, + DownloadPort: 8001, + Os: "darwin", + Platform: "darwin", + PlatformFamily: "Standalone Workstation", + PlatformVersion: "11.1", + KernelVersion: "20.2.0", + Cpu: mockV2Probe.Host.Cpu, + Memory: mockV2Probe.Host.Memory, + Network: mockV2Probe.Host.Network, + Disk: mockV2Probe.Host.Disk, + Build: mockV2Probe.Host.Build, + }, + Request: &schedulerv2.SyncProbesRequest_ProbeFinishedRequest{ + ProbeFinishedRequest: &schedulerv2.ProbeFinishedRequest{ + Probes: []*schedulerv2.Probe{mockV2Probe}, + }, + }, + }, nil).Times(1), + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(mockRawHost.ID)).Return(nil, false), + ms.Recv().Return(nil, io.EOF).Times(1), + ) + }, + expect: func(t *testing.T, err error) { + assert := assert.New(t) + assert.NoError(err) + }, + }, + { + name: "store error when receive ProbeFinishedRequest", + mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder, + mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder, + ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) { + gomock.InOrder( + ms.Recv().Return(&schedulerv2.SyncProbesRequest{ + Host: &commonv2.Host{ + Id: mockSeedHostID, + Type: uint32(pkgtypes.HostTypeSuperSeed), + Hostname: "bar", + Ip: "127.0.0.1", + Port: 8003, + DownloadPort: 8001, + Os: "darwin", + Platform: "darwin", + PlatformFamily: "Standalone Workstation", + PlatformVersion: "11.1", + KernelVersion: "20.2.0", + Cpu: mockV2Probe.Host.Cpu, + Memory: mockV2Probe.Host.Memory, + Network: mockV2Probe.Host.Network, + Disk: mockV2Probe.Host.Disk, + Build: mockV2Probe.Host.Build, + }, + Request: &schedulerv2.SyncProbesRequest_ProbeFinishedRequest{ + ProbeFinishedRequest: &schedulerv2.ProbeFinishedRequest{ + Probes: []*schedulerv2.Probe{mockV2Probe}, + }, + }, + }, nil).Times(1), + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(mockRawHost.ID)).Return(&mockRawHost, true), + mn.Store(gomock.Eq(mockRawSeedHost.ID), gomock.Eq(mockRawHost.ID)).Return(errors.New("store error")).Times(1), + ms.Recv().Return(nil, io.EOF).Times(1), + ) + }, + expect: func(t *testing.T, err error) { + assert := assert.New(t) + assert.NoError(err) + }, + }, + { + name: "enqueue probe error when receive ProbeFinishedRequest", + mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder, + mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder, + ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) { + gomock.InOrder( + ms.Recv().Return(&schedulerv2.SyncProbesRequest{ + Host: &commonv2.Host{ + Id: mockSeedHostID, + Type: uint32(pkgtypes.HostTypeSuperSeed), + Hostname: "bar", + Ip: "127.0.0.1", + Port: 8003, + DownloadPort: 8001, + Os: "darwin", + Platform: "darwin", + PlatformFamily: "Standalone Workstation", + PlatformVersion: "11.1", + KernelVersion: "20.2.0", + Cpu: mockV2Probe.Host.Cpu, + Memory: mockV2Probe.Host.Memory, + Network: mockV2Probe.Host.Network, + Disk: mockV2Probe.Host.Disk, + Build: mockV2Probe.Host.Build, + }, + Request: &schedulerv2.SyncProbesRequest_ProbeFinishedRequest{ + ProbeFinishedRequest: &schedulerv2.ProbeFinishedRequest{ + Probes: []*schedulerv2.Probe{mockV2Probe}, + }, + }, + }, nil).Times(1), + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Eq(mockRawHost.ID)).Return(&mockRawHost, true), + mn.Store(gomock.Eq(mockRawSeedHost.ID), gomock.Eq(mockRawHost.ID)).Return(nil).Times(1), + mn.Probes(gomock.Eq(mockRawSeedHost.ID), gomock.Eq(mockRawHost.ID)).Return(probes).Times(1), + mp.Enqueue(gomock.Any()).Return(errors.New("enqueue probe error")).Times(1), + ms.Recv().Return(nil, io.EOF).Times(1), + ) + }, + expect: func(t *testing.T, err error) { + assert := assert.New(t) + assert.NoError(err) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + + scheduling := mocks.NewMockScheduling(ctl) + res := resource.NewMockResource(ctl) + dynconfig := configmocks.NewMockDynconfigInterface(ctl) + storage := storagemocks.NewMockStorage(ctl) + probes := networktopologymocks.NewMockProbes(ctl) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) + hostManager := resource.NewMockHostManager(ctl) + stream := schedulerv2mocks.NewMockScheduler_SyncProbesServer(ctl) + svc := NewV2(&config.Config{NetworkTopology: mockNetworkTopologyConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology) + + tc.mock(svc, res.EXPECT(), probes, probes.EXPECT(), networkTopology.EXPECT(), hostManager, hostManager.EXPECT(), stream.EXPECT()) + tc.expect(t, svc.SyncProbes(stream)) + }) + } +} + func TestServiceV2_handleRegisterPeerRequest(t *testing.T) { tests := []struct { name string @@ -1401,6 +1948,7 @@ func TestServiceV2_handleRegisterPeerRequest(t *testing.T) { res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) hostManager := resource.NewMockHostManager(ctl) peerManager := resource.NewMockPeerManager(ctl) taskManager := resource.NewMockTaskManager(ctl) @@ -1412,7 +1960,7 @@ func TestServiceV2_handleRegisterPeerRequest(t *testing.T) { mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, mockTask, mockHost) seedPeer := resource.NewPeer(mockSeedPeerID, mockTask, mockHost) - svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology) tc.run(t, svc, tc.req, peer, seedPeer, hostManager, taskManager, peerManager, stream, res.EXPECT(), hostManager.EXPECT(), taskManager.EXPECT(), peerManager.EXPECT(), stream.EXPECT(), scheduling.EXPECT()) }) @@ -1910,6 +2458,7 @@ func TestServiceV2_handleRegisterSeedPeerRequest(t *testing.T) { res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) hostManager := resource.NewMockHostManager(ctl) peerManager := resource.NewMockPeerManager(ctl) taskManager := resource.NewMockTaskManager(ctl) @@ -1921,7 +2470,7 @@ func TestServiceV2_handleRegisterSeedPeerRequest(t *testing.T) { mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, mockTask, mockHost) seedPeer := resource.NewPeer(mockSeedPeerID, mockTask, mockHost) - svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology) tc.run(t, svc, tc.req, peer, seedPeer, hostManager, taskManager, peerManager, stream, res.EXPECT(), hostManager.EXPECT(), taskManager.EXPECT(), peerManager.EXPECT(), stream.EXPECT(), scheduling.EXPECT()) }) @@ -2006,6 +2555,7 @@ func TestServiceV2_handleDownloadPeerStartedRequest(t *testing.T) { res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) peerManager := resource.NewMockPeerManager(ctl) mockHost := resource.NewHost( @@ -2013,7 +2563,7 @@ func TestServiceV2_handleDownloadPeerStartedRequest(t *testing.T) { mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, mockTask, mockHost) - svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology) tc.run(t, svc, peer, peerManager, res.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT()) }) @@ -2098,6 +2648,7 @@ func TestServiceV2_handleDownloadPeerBackToSourceStartedRequest(t *testing.T) { res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) peerManager := resource.NewMockPeerManager(ctl) mockHost := resource.NewHost( @@ -2105,7 +2656,7 @@ func TestServiceV2_handleDownloadPeerBackToSourceStartedRequest(t *testing.T) { mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, mockTask, mockHost) - svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology) tc.run(t, svc, peer, peerManager, res.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT()) }) @@ -2171,6 +2722,7 @@ func TestServiceV2_handleDownloadPeerFinishedRequest(t *testing.T) { res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) peerManager := resource.NewMockPeerManager(ctl) mockHost := resource.NewHost( @@ -2178,7 +2730,7 @@ func TestServiceV2_handleDownloadPeerFinishedRequest(t *testing.T) { mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, mockTask, mockHost) - svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology) tc.run(t, svc, peer, peerManager, res.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT()) }) @@ -2432,6 +2984,7 @@ func TestServiceV2_handleDownloadPeerBackToSourceFinishedRequest(t *testing.T) { res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) peerManager := resource.NewMockPeerManager(ctl) url, err := url.Parse(s.URL) @@ -2457,7 +3010,7 @@ func TestServiceV2_handleDownloadPeerBackToSourceFinishedRequest(t *testing.T) { mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, mockTask, mockHost) - svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology) tc.run(t, svc, tc.req, peer, peerManager, res.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT()) }) @@ -2522,6 +3075,7 @@ func TestServiceV2_handleDownloadPeerFailedRequest(t *testing.T) { res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) peerManager := resource.NewMockPeerManager(ctl) mockHost := resource.NewHost( @@ -2529,7 +3083,7 @@ func TestServiceV2_handleDownloadPeerFailedRequest(t *testing.T) { mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, mockTask, mockHost) - svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology) tc.run(t, svc, peer, peerManager, res.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT()) }) @@ -2641,6 +3195,7 @@ func TestServiceV2_handleDownloadPeerBackToSourceFailedRequest(t *testing.T) { res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) peerManager := resource.NewMockPeerManager(ctl) mockHost := resource.NewHost( @@ -2648,7 +3203,7 @@ func TestServiceV2_handleDownloadPeerBackToSourceFailedRequest(t *testing.T) { mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, mockTask, mockHost) - svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology) tc.run(t, svc, peer, peerManager, res.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT()) }) @@ -2799,6 +3354,7 @@ func TestServiceV2_handleDownloadPieceFinishedRequest(t *testing.T) { res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) peerManager := resource.NewMockPeerManager(ctl) mockHost := resource.NewHost( @@ -2806,7 +3362,7 @@ func TestServiceV2_handleDownloadPieceFinishedRequest(t *testing.T) { mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, mockTask, mockHost) - svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology) tc.run(t, svc, tc.req, peer, peerManager, res.EXPECT(), peerManager.EXPECT()) }) @@ -2924,6 +3480,7 @@ func TestServiceV2_handleDownloadPieceBackToSourceFinishedRequest(t *testing.T) res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) peerManager := resource.NewMockPeerManager(ctl) mockHost := resource.NewHost( @@ -2931,7 +3488,7 @@ func TestServiceV2_handleDownloadPieceBackToSourceFinishedRequest(t *testing.T) mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, mockTask, mockHost) - svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology) tc.run(t, svc, tc.req, peer, peerManager, res.EXPECT(), peerManager.EXPECT()) }) @@ -3066,6 +3623,7 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) { res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) peerManager := resource.NewMockPeerManager(ctl) mockHost := resource.NewHost( @@ -3073,7 +3631,7 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) { mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, mockTask, mockHost) - svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology) tc.run(t, svc, tc.req, peer, peerManager, res.EXPECT(), peerManager.EXPECT(), scheduling.EXPECT()) }) @@ -3129,6 +3687,7 @@ func TestServiceV2_handleDownloadPieceBackToSourceFailedRequest(t *testing.T) { res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) peerManager := resource.NewMockPeerManager(ctl) mockHost := resource.NewHost( @@ -3136,7 +3695,7 @@ func TestServiceV2_handleDownloadPieceBackToSourceFailedRequest(t *testing.T) { mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, mockTask, mockHost) - svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology) tc.run(t, svc, tc.req, peer, peerManager, res.EXPECT(), peerManager.EXPECT()) }) @@ -3336,6 +3895,7 @@ func TestServiceV2_handleResource(t *testing.T) { res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) hostManager := resource.NewMockHostManager(ctl) taskManager := resource.NewMockTaskManager(ctl) peerManager := resource.NewMockPeerManager(ctl) @@ -3346,7 +3906,7 @@ func TestServiceV2_handleResource(t *testing.T) { mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) mockPeer := resource.NewPeer(mockPeerID, mockTask, mockHost) - svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology) tc.run(t, svc, tc.download, stream, mockHost, mockTask, mockPeer, hostManager, taskManager, peerManager, res.EXPECT(), hostManager.EXPECT(), taskManager.EXPECT(), peerManager.EXPECT()) }) @@ -3615,6 +4175,7 @@ func TestServiceV2_downloadTaskBySeedPeer(t *testing.T) { res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) + networkTopology := networktopologymocks.NewMockNetworkTopology(ctl) seedPeerClient := resource.NewMockSeedPeer(ctl) mockHost := resource.NewHost( @@ -3622,7 +4183,7 @@ func TestServiceV2_downloadTaskBySeedPeer(t *testing.T) { mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, mockTask, mockHost) - svc := NewV2(&tc.config, res, scheduling, dynconfig, storage) + svc := NewV2(&tc.config, res, scheduling, dynconfig, storage, networkTopology) tc.run(t, svc, peer, seedPeerClient, res.EXPECT(), seedPeerClient.EXPECT()) })