feat: implement SyncProbes api in scheduler grpc service (#2449)

Signed-off-by: Gaius <gaius.qi@gmail.com>
Signed-off-by: XZ <834756128@qq.com>
Co-authored-by: dlut_xz <52518280+fcgxz2003@users.noreply.github.com>
Gaius 2023-06-14 21:42:32 +08:00
parent 8b7ceb60b5
commit 98aee9640d
15 changed files with 1543 additions and 103 deletions

go.mod

@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2
go 1.20
require (
d7y.io/api v1.9.0
d7y.io/api v1.9.2
github.com/RichardKnop/machinery v1.10.6
github.com/Showmax/go-fqdn v1.0.0
github.com/VividCortex/mysqlerr v1.0.0

go.sum

@ -51,8 +51,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
cloud.google.com/go/storage v1.14.0/go.mod h1:GrKmX003DSIwi9o29oFT7YDnHYwZoctc3fOKtUw0Xmo=
d7y.io/api v1.9.0 h1:AzAtFVHXCUM+L5r82IuGp9FfiexfWAYBP7WQ7CYmMaw=
d7y.io/api v1.9.0/go.mod h1:6bn5Z+OyjyvlB1UMxUZsFbyx47qjkpNEvC25hq5Qxy0=
d7y.io/api v1.9.2 h1:JB7sSKY4P9y2J0xsMRVxVpWkgnLPUEAYW2aIB2mEA/4=
d7y.io/api v1.9.2/go.mod h1:6bn5Z+OyjyvlB1UMxUZsFbyx47qjkpNEvC25hq5Qxy0=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20201218220906-28db891af037/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=

View File

@ -224,6 +224,20 @@ var (
Help: "Counter of the number of failed of the leaving host.",
})
SyncProbesCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "sync_probes_total",
Help: "Counter of the number of the synchronizing probes.",
})
SyncProbesFailureCount = promauto.NewCounter(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "sync_probes_failure_total",
Help: "Counter of the number of failed of the synchronizing probes.",
})
Traffic = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,

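The two new counters follow the promauto pattern already used in this file: the gRPC handler added later in this commit increments the total on every call and the failure counter only on error, so a failure ratio can be derived at query time. Below is a minimal, self-contained sketch of that pattern; the literal "dragonfly" and "scheduler" values are assumptions standing in for the types.MetricsNamespace and types.SchedulerMetricsName constants used above.

```go
package main

import (
	"errors"
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
	// Stand-ins for SyncProbesCount and SyncProbesFailureCount above;
	// the namespace and subsystem values are assumptions.
	syncProbesCount = promauto.NewCounter(prometheus.CounterOpts{
		Namespace: "dragonfly",
		Subsystem: "scheduler",
		Name:      "sync_probes_total",
		Help:      "Counter of the number of synchronizing probes.",
	})
	syncProbesFailureCount = promauto.NewCounter(prometheus.CounterOpts{
		Namespace: "dragonfly",
		Subsystem: "scheduler",
		Name:      "sync_probes_failure_total",
		Help:      "Counter of the number of failed synchronizing probes.",
	})
)

// syncProbes mimics the handler pattern added later in this commit:
// count every call, and count failures separately.
func syncProbes(fail bool) error {
	syncProbesCount.Inc()
	if fail {
		syncProbesFailureCount.Inc()
		return errors.New("sync probes failed")
	}
	return nil
}

func main() {
	_ = syncProbes(false)
	_ = syncProbes(true)
	// Failure ratio at query time: sync_probes_failure_total / sync_probes_total.
	fmt.Println("synced twice, failed once")
}
```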
View File

@ -49,6 +49,21 @@ func (mr *MockNetworkTopologyMockRecorder) DeleteHost(arg0 interface{}) *gomock.
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteHost", reflect.TypeOf((*MockNetworkTopology)(nil).DeleteHost), arg0)
}
// FindProbedHostIDs mocks base method.
func (m *MockNetworkTopology) FindProbedHostIDs(arg0 string) ([]string, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FindProbedHostIDs", arg0)
ret0, _ := ret[0].([]string)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// FindProbedHostIDs indicates an expected call of FindProbedHostIDs.
func (mr *MockNetworkTopologyMockRecorder) FindProbedHostIDs(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindProbedHostIDs", reflect.TypeOf((*MockNetworkTopology)(nil).FindProbedHostIDs), arg0)
}
// Has mocks base method.
func (m *MockNetworkTopology) Has(arg0, arg1 string) bool {
m.ctrl.T.Helper()
@ -108,11 +123,9 @@ func (mr *MockNetworkTopologyMockRecorder) Probes(arg0, arg1 interface{}) *gomoc
}
// Serve mocks base method.
func (m *MockNetworkTopology) Serve() error {
func (m *MockNetworkTopology) Serve() {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Serve")
ret0, _ := ret[0].(error)
return ret0
m.ctrl.Call(m, "Serve")
}
// Serve indicates an expected call of Serve.
@ -136,11 +149,9 @@ func (mr *MockNetworkTopologyMockRecorder) Snapshot() *gomock.Call {
}
// Stop mocks base method.
func (m *MockNetworkTopology) Stop() error {
func (m *MockNetworkTopology) Stop() {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Stop")
ret0, _ := ret[0].(error)
return ret0
m.ctrl.Call(m, "Stop")
}
// Stop indicates an expected call of Stop.

View File

@ -43,10 +43,10 @@ const (
// NetworkTopology is an interface for network topology.
type NetworkTopology interface {
// Serve starts the network topology server.
Serve() error
Serve()
// Stop stops the network topology server.
Stop() error
Stop()
// Has to check if there is a connection between source host and destination host.
Has(string, string) bool
@ -54,6 +54,10 @@ type NetworkTopology interface {
// Store stores source host and destination host.
Store(string, string) error
// TODO Implement function.
// FindProbedHostIDs finds the best candidate destination hosts to be probed.
FindProbedHostIDs(string) ([]string, error)
// DeleteHost deletes source host and all destination host connected to source host.
DeleteHost(string) error
@ -100,7 +104,7 @@ func NewNetworkTopology(cfg config.NetworkTopologyConfig, rdb redis.UniversalCli
}
// Serve starts the network topology server.
func (nt *networkTopology) Serve() error {
func (nt *networkTopology) Serve() {
logger.Info("collect network topology records")
tick := time.NewTicker(nt.config.CollectInterval)
for {
@ -111,15 +115,14 @@ func (nt *networkTopology) Serve() error {
break
}
case <-nt.done:
return nil
return
}
}
}
// Stop stops the network topology server.
func (nt *networkTopology) Stop() error {
func (nt *networkTopology) Stop() {
close(nt.done)
return nil
}
// Has to check if there is a connection between source host and destination host.
@ -157,6 +160,12 @@ func (nt *networkTopology) Store(srcHostID string, destHostID string) error {
return nil
}
// TODO Implement function.
// FindProbedHostIDs finds the best candidate destination hosts to be probed.
func (nt *networkTopology) FindProbedHostIDs(hostID string) ([]string, error) {
return nil, nil
}
// DeleteHost deletes source host and all destination host connected to source host.
func (nt *networkTopology) DeleteHost(hostID string) error {
ctx, cancel := context.WithTimeout(context.Background(), contextTimeout)

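Because Serve and Stop no longer return errors, callers shrink to a bare `go networkTopology.Serve()` and an unconditional `Stop()`, as the updated tests and scheduler wiring below show. Here is a minimal sketch of the done-channel lifecycle behind the new signatures, using illustrative types rather than the scheduler's own:

```go
package main

import (
	"fmt"
	"time"
)

type networkTopology struct {
	done chan struct{}
}

// Serve collects records on every tick until Stop is called.
func (nt *networkTopology) Serve() {
	tick := time.NewTicker(50 * time.Millisecond)
	defer tick.Stop()
	for {
		select {
		case <-tick.C:
			fmt.Println("collect network topology records")
		case <-nt.done:
			return
		}
	}
}

// Stop signals Serve to return by closing the done channel.
func (nt *networkTopology) Stop() {
	close(nt.done)
}

func main() {
	nt := &networkTopology{done: make(chan struct{})}
	go nt.Serve() // matches the `go networkTopology.Serve()` callers updated below
	time.Sleep(120 * time.Millisecond)
	nt.Stop()
	time.Sleep(10 * time.Millisecond) // give Serve a moment to return
}
```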
View File

@ -99,9 +99,7 @@ func TestNetworkTopology_Serve(t *testing.T) {
expect: func(t *testing.T, networkTopology NetworkTopology, err error) {
assert := assert.New(t)
assert.NoError(err)
go func() {
assert.NoError(networkTopology.Serve())
}()
go networkTopology.Serve()
},
},
{
@ -135,9 +133,7 @@ func TestNetworkTopology_Serve(t *testing.T) {
expect: func(t *testing.T, networkTopology NetworkTopology, err error) {
assert := assert.New(t)
assert.NoError(err)
go func() {
assert.NoError(networkTopology.Serve())
}()
go networkTopology.Serve()
},
},
}
@ -157,9 +153,7 @@ func TestNetworkTopology_Serve(t *testing.T) {
networkTopology, err := NewNetworkTopology(mockNetworkTopologyConfig, rdb, res, storage)
tc.expect(t, networkTopology, err)
tc.sleep()
if err := networkTopology.Stop(); err != nil {
t.Fatal(err)
}
networkTopology.Stop()
})
}
}

View File

@ -21,6 +21,7 @@ import (
"d7y.io/dragonfly/v2/pkg/rpc/scheduler/server"
"d7y.io/dragonfly/v2/scheduler/config"
"d7y.io/dragonfly/v2/scheduler/networktopology"
"d7y.io/dragonfly/v2/scheduler/resource"
"d7y.io/dragonfly/v2/scheduler/scheduling"
"d7y.io/dragonfly/v2/scheduler/storage"
@ -33,10 +34,11 @@ func New(
scheduling scheduling.Scheduling,
dynconfig config.DynconfigInterface,
storage storage.Storage,
networkTopology networktopology.NetworkTopology,
opts ...grpc.ServerOption,
) *grpc.Server {
return server.New(
newSchedulerServerV1(cfg, resource, scheduling, dynconfig, storage),
newSchedulerServerV2(cfg, resource, scheduling, dynconfig, storage),
newSchedulerServerV1(cfg, resource, scheduling, dynconfig, storage, networkTopology),
newSchedulerServerV2(cfg, resource, scheduling, dynconfig, storage, networkTopology),
opts...)
}

View File

@ -26,6 +26,7 @@ import (
"d7y.io/dragonfly/v2/scheduler/config"
configmocks "d7y.io/dragonfly/v2/scheduler/config/mocks"
networktopologymocks "d7y.io/dragonfly/v2/scheduler/networktopology/mocks"
"d7y.io/dragonfly/v2/scheduler/resource"
"d7y.io/dragonfly/v2/scheduler/scheduling/mocks"
storagemocks "d7y.io/dragonfly/v2/scheduler/storage/mocks"
@ -62,8 +63,9 @@ func TestRPCServer_New(t *testing.T) {
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
svr := New(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage)
svr := New(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
tc.expect(t, svr)
})
}

View File

@ -28,6 +28,7 @@ import (
"d7y.io/dragonfly/v2/pkg/types"
"d7y.io/dragonfly/v2/scheduler/config"
"d7y.io/dragonfly/v2/scheduler/metrics"
"d7y.io/dragonfly/v2/scheduler/networktopology"
"d7y.io/dragonfly/v2/scheduler/resource"
"d7y.io/dragonfly/v2/scheduler/scheduling"
"d7y.io/dragonfly/v2/scheduler/service"
@ -47,8 +48,9 @@ func newSchedulerServerV1(
scheduling scheduling.Scheduling,
dynconfig config.DynconfigInterface,
storage storage.Storage,
networkTopology networktopology.NetworkTopology,
) schedulerv1.SchedulerServer {
return &schedulerServerV1{service.NewV1(cfg, resource, scheduling, dynconfig, storage)}
return &schedulerServerV1{service.NewV1(cfg, resource, scheduling, dynconfig, storage, networkTopology)}
}
// RegisterPeerTask registers peer and triggers seed peer download task.
@ -157,8 +159,15 @@ func (s *schedulerServerV1) LeaveHost(ctx context.Context, req *schedulerv1.Leav
return new(emptypb.Empty), nil
}
// TODO Implement SyncProbes
// SyncProbes syncs probes of the host.
func (s *schedulerServerV1) SyncProbes(stream schedulerv1.Scheduler_SyncProbesServer) error {
// Collect SyncProbesCount metrics.
metrics.SyncProbesCount.Inc()
if err := s.service.SyncProbes(stream); err != nil {
// Collect SyncProbesFailureCount metrics.
metrics.SyncProbesFailureCount.Inc()
return err
}
return nil
}

View File

@ -26,6 +26,7 @@ import (
"d7y.io/dragonfly/v2/scheduler/config"
"d7y.io/dragonfly/v2/scheduler/metrics"
"d7y.io/dragonfly/v2/scheduler/networktopology"
"d7y.io/dragonfly/v2/scheduler/resource"
"d7y.io/dragonfly/v2/scheduler/scheduling"
"d7y.io/dragonfly/v2/scheduler/service"
@ -46,8 +47,9 @@ func newSchedulerServerV2(
scheduling scheduling.Scheduling,
dynconfig config.DynconfigInterface,
storage storage.Storage,
networkTopology networktopology.NetworkTopology,
) schedulerv2.SchedulerServer {
return &schedulerServerV2{service.NewV2(cfg, resource, scheduling, dynconfig, storage)}
return &schedulerServerV2{service.NewV2(cfg, resource, scheduling, dynconfig, storage, networkTopology)}
}
// AnnouncePeer announces peer to scheduler.
@ -152,5 +154,13 @@ func (s *schedulerServerV2) LeaveHost(ctx context.Context, req *schedulerv2.Leav
// SyncProbes syncs probes of the host.
func (s *schedulerServerV2) SyncProbes(stream schedulerv2.Scheduler_SyncProbesServer) error {
// Collect SyncProbesCount metrics.
metrics.SyncProbesCount.Inc()
if err := s.service.SyncProbes(stream); err != nil {
// Collect SyncProbesFailureCount metrics.
metrics.SyncProbesFailureCount.Inc()
return err
}
return nil
}

View File

@ -232,25 +232,6 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err
}
s.resource = resource
// Initialize scheduling.
scheduling := scheduling.New(&cfg.Scheduler, dynconfig, d.PluginDir())
// Initialize server options of scheduler grpc server.
schedulerServerOptions := []grpc.ServerOption{}
if certifyClient != nil {
serverTransportCredentials, err := rpc.NewServerCredentialsByCertify(cfg.Security.TLSPolicy, cfg.Security.TLSVerify, []byte(cfg.Security.CACert), certifyClient)
if err != nil {
return nil, err
}
schedulerServerOptions = append(schedulerServerOptions, grpc.Creds(serverTransportCredentials))
} else {
schedulerServerOptions = append(schedulerServerOptions, grpc.Creds(insecure.NewCredentials()))
}
svr := rpcserver.New(cfg, resource, scheduling, dynconfig, s.storage, schedulerServerOptions...)
s.grpcServer = svr
// Initialize redis client.
var rdb redis.UniversalClient
if pkgredis.IsEnabled(cfg.Database.Redis.Addrs) {
@ -282,6 +263,25 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err
}
}
// Initialize scheduling.
scheduling := scheduling.New(&cfg.Scheduler, dynconfig, d.PluginDir())
// Initialize server options of scheduler grpc server.
schedulerServerOptions := []grpc.ServerOption{}
if certifyClient != nil {
serverTransportCredentials, err := rpc.NewServerCredentialsByCertify(cfg.Security.TLSPolicy, cfg.Security.TLSVerify, []byte(cfg.Security.CACert), certifyClient)
if err != nil {
return nil, err
}
schedulerServerOptions = append(schedulerServerOptions, grpc.Creds(serverTransportCredentials))
} else {
schedulerServerOptions = append(schedulerServerOptions, grpc.Creds(insecure.NewCredentials()))
}
svr := rpcserver.New(cfg, resource, scheduling, dynconfig, s.storage, s.networkTopology, schedulerServerOptions...)
s.grpcServer = svr
// Initialize metrics.
if cfg.Metrics.Enable {
s.metricsServer = metrics.New(&cfg.Metrics, s.grpcServer)
@ -296,6 +296,7 @@ func (s *Server) Serve() error {
if err := s.dynconfig.Serve(); err != nil {
logger.Fatalf("dynconfig start failed %s", err.Error())
}
logger.Info("dynconfig start successfully")
}()
@ -304,7 +305,7 @@ func (s *Server) Serve() error {
logger.Info("gc start successfully")
// Serve Job.
if s.config.Job.Enable && pkgredis.IsEnabled(s.config.Database.Redis.Addrs) {
if s.job != nil {
s.job.Serve()
logger.Info("job start successfully")
}
@ -317,6 +318,7 @@ func (s *Server) Serve() error {
if err == http.ErrServerClosed {
return
}
logger.Fatalf("metrics server closed unexpect: %s", err.Error())
}
}()
@ -328,6 +330,14 @@ func (s *Server) Serve() error {
logger.Info("announcer start successfully")
}()
// Serve network topology.
if s.networkTopology != nil {
go func() {
s.networkTopology.Serve()
logger.Info("network topology start successfully")
}()
}
// Generate GRPC limit listener.
ip, ok := ip.FormatIP(s.config.Server.ListenIP.String())
if !ok {
@ -423,6 +433,12 @@ func (s *Server) Stop() {
}
}
// Stop network topology.
if s.networkTopology != nil {
s.networkTopology.Stop()
logger.Info("network topology closed")
}
// Stop GRPC server.
stopped := make(chan struct{})
go func() {

View File

@ -26,7 +26,9 @@ import (
"time"
"go.opentelemetry.io/otel/trace"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
commonv1 "d7y.io/api/pkg/apis/common/v1"
commonv2 "d7y.io/api/pkg/apis/common/v2"
@ -43,6 +45,7 @@ import (
"d7y.io/dragonfly/v2/pkg/types"
"d7y.io/dragonfly/v2/scheduler/config"
"d7y.io/dragonfly/v2/scheduler/metrics"
"d7y.io/dragonfly/v2/scheduler/networktopology"
"d7y.io/dragonfly/v2/scheduler/resource"
"d7y.io/dragonfly/v2/scheduler/scheduling"
"d7y.io/dragonfly/v2/scheduler/storage"
@ -64,6 +67,9 @@ type V1 struct {
// Storage interface.
storage storage.Storage
// Network topology interface.
networkTopology networktopology.NetworkTopology
}
// New v1 version of service instance.
@ -73,6 +79,7 @@ func NewV1(
scheduling scheduling.Scheduling,
dynconfig config.DynconfigInterface,
storage storage.Storage,
networktopology networktopology.NetworkTopology,
) *V1 {
return &V1{
resource: resource,
@ -80,6 +87,7 @@ func NewV1(
config: cfg,
dynconfig: dynconfig,
storage: storage,
networkTopology: networktopology,
}
}
@ -647,6 +655,111 @@ func (v *V1) LeaveHost(ctx context.Context, req *schedulerv1.LeaveHostRequest) e
return nil
}
// SyncProbes syncs probes of the host.
func (v *V1) SyncProbes(stream schedulerv1.Scheduler_SyncProbesServer) error {
if v.networkTopology == nil {
return status.Errorf(codes.Unimplemented, "network topology is not enabled")
}
for {
req, err := stream.Recv()
if err != nil {
if err == io.EOF {
return nil
}
logger.Errorf("receive error: %s", err.Error())
return err
}
logger := logger.WithHost(req.Host.Id, req.Host.Hostname, req.Host.Ip)
switch syncProbesRequest := req.GetRequest().(type) {
case *schedulerv1.SyncProbesRequest_ProbeStartedRequest:
// Find probed hosts in network topology. Based on the source host information,
// the best candidate hosts are selected to be probed.
logger.Info("receive SyncProbesRequest_ProbeStartedRequest")
probedHostIDs, err := v.networkTopology.FindProbedHostIDs(req.Host.Id)
if err != nil {
logger.Error(err)
return status.Error(codes.FailedPrecondition, err.Error())
}
var probedHosts []*commonv1.Host
for _, probedHostID := range probedHostIDs {
probedHost, loaded := v.resource.HostManager().Load(probedHostID)
if !loaded {
logger.Warnf("probed host %s not found", probedHostID)
continue
}
probedHosts = append(probedHosts, &commonv1.Host{
Id: probedHost.ID,
Ip: probedHost.IP,
Hostname: probedHost.Hostname,
Port: probedHost.Port,
DownloadPort: probedHost.DownloadPort,
Location: probedHost.Network.Location,
Idc: probedHost.Network.IDC,
})
}
if len(probedHosts) == 0 {
logger.Error("probed host not found")
return status.Error(codes.NotFound, "probed host not found")
}
logger.Infof("probe started: %#v", probedHosts)
if err := stream.Send(&schedulerv1.SyncProbesResponse{
Hosts: probedHosts,
ProbeInterval: durationpb.New(v.config.NetworkTopology.Probe.Interval),
}); err != nil {
logger.Error(err)
return err
}
case *schedulerv1.SyncProbesRequest_ProbeFinishedRequest:
// Store probes in network topology. First create the association between
// the source host and the destination host, and then enqueue the probe values.
logger.Info("receive SyncProbesRequest_ProbeFinishedRequest")
for _, probe := range syncProbesRequest.ProbeFinishedRequest.Probes {
probedHost, loaded := v.resource.HostManager().Load(probe.Host.Id)
if !loaded {
logger.Errorf("host %s not found", probe.Host.Id)
continue
}
if err := v.networkTopology.Store(req.Host.Id, probedHost.ID); err != nil {
logger.Errorf("store failed: %s", err.Error())
continue
}
if err := v.networkTopology.Probes(req.Host.Id, probe.Host.Id).Enqueue(&networktopology.Probe{
Host: probedHost,
RTT: probe.Rtt.AsDuration(),
CreatedAt: probe.CreatedAt.AsTime(),
}); err != nil {
logger.Errorf("enqueue failed: %s", err.Error())
continue
}
logger.Infof("probe finished: %#v", probe)
}
case *schedulerv1.SyncProbesRequest_ProbeFailedRequest:
// Log failed probes.
logger.Info("receive SyncProbesRequest_ProbeFailedRequest")
var failedProbedHostIDs []string
for _, failedProbe := range syncProbesRequest.ProbeFailedRequest.Probes {
failedProbedHostIDs = append(failedProbedHostIDs, failedProbe.Host.Id)
}
logger.Warnf("probe failed: %#v", failedProbedHostIDs)
default:
msg := fmt.Sprintf("receive unknow request: %#v", syncProbesRequest)
logger.Error(msg)
return status.Error(codes.FailedPrecondition, msg)
}
}
}
// triggerTask triggers the first download of the task.
func (v *V1) triggerTask(ctx context.Context, req *schedulerv1.PeerTaskRequest, task *resource.Task, host *resource.Host, peer *resource.Peer, dynconfig config.DynconfigInterface) error {
// If task has available peer, peer does not need to be triggered.

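To make the stream protocol concrete, here is a hedged sketch of a client driving this handler end to end: announce the probe start, receive candidate hosts and the probe interval, then report finished probes. The address, credentials, host fields, and fixed 30ms RTT are illustrative assumptions; the message types match the schedulerv1 API exercised above.

```go
package main

import (
	"context"
	"log"
	"time"

	commonv1 "d7y.io/api/pkg/apis/common/v1"
	schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/protobuf/types/known/durationpb"
	"google.golang.org/protobuf/types/known/timestamppb"
)

func main() {
	// Assumption: a scheduler listening on this address without TLS.
	conn, err := grpc.Dial("127.0.0.1:8002", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	stream, err := schedulerv1.NewSchedulerClient(conn).SyncProbes(context.Background())
	if err != nil {
		log.Fatal(err)
	}

	// Hypothetical source host; the handler keys everything off Host.Id.
	host := &commonv1.Host{Id: "example-host-id", Hostname: "example", Ip: "127.0.0.1"}

	// 1. Announce that probing has started.
	if err := stream.Send(&schedulerv1.SyncProbesRequest{
		Host: host,
		Request: &schedulerv1.SyncProbesRequest_ProbeStartedRequest{
			ProbeStartedRequest: &schedulerv1.ProbeStartedRequest{},
		},
	}); err != nil {
		log.Fatal(err)
	}

	// 2. Receive the candidate hosts to probe and the probe interval.
	resp, err := stream.Recv()
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("probing %d hosts every %s", len(resp.Hosts), resp.ProbeInterval.AsDuration())

	// 3. Probe each host (elided here) and report the results.
	var probes []*schedulerv1.Probe
	for _, h := range resp.Hosts {
		probes = append(probes, &schedulerv1.Probe{
			Host:      h,
			Rtt:       durationpb.New(30 * time.Millisecond), // placeholder RTT
			CreatedAt: timestamppb.Now(),
		})
	}
	if err := stream.Send(&schedulerv1.SyncProbesRequest{
		Host: host,
		Request: &schedulerv1.SyncProbesRequest_ProbeFinishedRequest{
			ProbeFinishedRequest: &schedulerv1.ProbeFinishedRequest{Probes: probes},
		},
	}); err != nil {
		log.Fatal(err)
	}

	// Closing the send side makes the server's Recv return io.EOF,
	// which the handler above treats as a clean stop.
	_ = stream.CloseSend()
}
```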
View File

@ -36,6 +36,8 @@ import (
"go.uber.org/atomic"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
commonv1 "d7y.io/api/pkg/apis/common/v1"
commonv2 "d7y.io/api/pkg/apis/common/v2"
@ -43,6 +45,7 @@ import (
managerv2 "d7y.io/api/pkg/apis/manager/v2"
schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1"
schedulerv1mocks "d7y.io/api/pkg/apis/scheduler/v1/mocks"
schedulerv2 "d7y.io/api/pkg/apis/scheduler/v2"
"d7y.io/dragonfly/v2/internal/dferrors"
"d7y.io/dragonfly/v2/manager/types"
@ -54,6 +57,8 @@ import (
pkgtypes "d7y.io/dragonfly/v2/pkg/types"
"d7y.io/dragonfly/v2/scheduler/config"
configmocks "d7y.io/dragonfly/v2/scheduler/config/mocks"
"d7y.io/dragonfly/v2/scheduler/networktopology"
networktopologymocks "d7y.io/dragonfly/v2/scheduler/networktopology/mocks"
"d7y.io/dragonfly/v2/scheduler/resource"
"d7y.io/dragonfly/v2/scheduler/scheduling"
"d7y.io/dragonfly/v2/scheduler/scheduling/mocks"
@ -69,6 +74,16 @@ var (
BackToSourceCount: int(mockTaskBackToSourceLimit),
}
mockNetworkTopologyConfig = config.NetworkTopologyConfig{
Enable: true,
CollectInterval: 2 * time.Hour,
Probe: config.ProbeConfig{
QueueLength: 5,
Interval: 15 * time.Minute,
Count: 10,
},
}
mockRawHost = resource.Host{
ID: mockHostID,
Type: pkgtypes.HostTypeNormal,
@ -205,6 +220,86 @@ var (
Cost: 1 * time.Minute,
CreatedAt: time.Now(),
}
mockV1Probe = &schedulerv1.Probe{
Host: &commonv1.Host{
Id: mockRawHost.ID,
Ip: mockRawHost.IP,
Hostname: mockRawHost.Hostname,
Port: mockRawHost.Port,
DownloadPort: mockRawHost.DownloadPort,
Location: mockRawHost.Network.Location,
Idc: mockRawHost.Network.IDC,
},
Rtt: durationpb.New(30 * time.Millisecond),
CreatedAt: timestamppb.Now(),
}
mockV2Probe = &schedulerv2.Probe{
Host: &commonv2.Host{
Id: mockHostID,
Type: uint32(pkgtypes.HostTypeNormal),
Hostname: "foo",
Ip: "127.0.0.1",
Port: 8003,
DownloadPort: 8001,
Os: "darwin",
Platform: "darwin",
PlatformFamily: "Standalone Workstation",
PlatformVersion: "11.1",
KernelVersion: "20.2.0",
Cpu: &commonv2.CPU{
LogicalCount: mockCPU.LogicalCount,
PhysicalCount: mockCPU.PhysicalCount,
Percent: mockCPU.Percent,
ProcessPercent: mockCPU.ProcessPercent,
Times: &commonv2.CPUTimes{
User: mockCPU.Times.User,
System: mockCPU.Times.System,
Idle: mockCPU.Times.Idle,
Nice: mockCPU.Times.Nice,
Iowait: mockCPU.Times.Iowait,
Irq: mockCPU.Times.Irq,
Softirq: mockCPU.Times.Softirq,
Steal: mockCPU.Times.Steal,
Guest: mockCPU.Times.Guest,
GuestNice: mockCPU.Times.GuestNice,
},
},
Memory: &commonv2.Memory{
Total: mockMemory.Total,
Available: mockMemory.Available,
Used: mockMemory.Used,
UsedPercent: mockMemory.UsedPercent,
ProcessUsedPercent: mockMemory.ProcessUsedPercent,
Free: mockMemory.Free,
},
Network: &commonv2.Network{
TcpConnectionCount: mockNetwork.TCPConnectionCount,
UploadTcpConnectionCount: mockNetwork.UploadTCPConnectionCount,
Location: mockNetwork.Location,
Idc: mockNetwork.IDC,
},
Disk: &commonv2.Disk{
Total: mockDisk.Total,
Free: mockDisk.Free,
Used: mockDisk.Used,
UsedPercent: mockDisk.UsedPercent,
InodesTotal: mockDisk.InodesTotal,
InodesUsed: mockDisk.InodesUsed,
InodesFree: mockDisk.InodesFree,
InodesUsedPercent: mockDisk.InodesUsedPercent,
},
Build: &commonv2.Build{
GitVersion: mockBuild.GitVersion,
GitCommit: mockBuild.GitCommit,
GoVersion: mockBuild.GoVersion,
Platform: mockBuild.Platform,
},
},
Rtt: durationpb.New(30 * time.Millisecond),
CreatedAt: timestamppb.Now(),
}
)
func TestService_NewV1(t *testing.T) {
@ -229,7 +324,9 @@ func TestService_NewV1(t *testing.T) {
resource := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
tc.expect(t, NewV1(&config.Config{Scheduler: mockSchedulerConfig}, resource, scheduling, dynconfig, storage))
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
tc.expect(t, NewV1(&config.Config{Scheduler: mockSchedulerConfig}, resource, scheduling, dynconfig, storage, networkTopology))
})
}
}
@ -891,10 +988,11 @@ func TestServiceV1_RegisterPeerTask(t *testing.T) {
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
hostManager := resource.NewMockHostManager(ctl)
taskManager := resource.NewMockTaskManager(ctl)
peerManager := resource.NewMockPeerManager(ctl)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
@ -1156,9 +1254,10 @@ func TestServiceV1_ReportPieceResult(t *testing.T) {
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
peerManager := resource.NewMockPeerManager(ctl)
stream := schedulerv1mocks.NewMockScheduler_ReportPieceResultServer(ctl)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
@ -1339,8 +1438,9 @@ func TestServiceV1_ReportPeerResult(t *testing.T) {
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
peerManager := resource.NewMockPeerManager(ctl)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
@ -1403,8 +1503,9 @@ func TestServiceV1_StatTask(t *testing.T) {
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
taskManager := resource.NewMockTaskManager(ctl)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
tc.mock(mockTask, taskManager, res.EXPECT(), taskManager.EXPECT())
@ -1697,10 +1798,11 @@ func TestServiceV1_AnnounceTask(t *testing.T) {
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
hostManager := resource.NewMockHostManager(ctl)
taskManager := resource.NewMockTaskManager(ctl)
peerManager := resource.NewMockPeerManager(ctl)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
@ -1897,13 +1999,14 @@ func TestServiceV1_LeaveTask(t *testing.T) {
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
peerManager := resource.NewMockPeerManager(ctl)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
peer := resource.NewPeer(mockSeedPeerID, mockTask, mockHost)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology)
tc.mock(peer, peerManager, scheduling.EXPECT(), res.EXPECT(), peerManager.EXPECT())
tc.expect(t, peer, svc.LeaveTask(context.Background(), &schedulerv1.PeerTarget{}))
@ -2327,11 +2430,12 @@ func TestServiceV1_AnnounceHost(t *testing.T) {
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
hostManager := resource.NewMockHostManager(ctl)
host := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology)
tc.run(t, svc, tc.req, host, hostManager, res.EXPECT(), hostManager.EXPECT(), dynconfig.EXPECT())
})
@ -2539,13 +2643,14 @@ func TestServiceV1_LeaveHost(t *testing.T) {
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
hostManager := resource.NewMockHostManager(ctl)
host := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
mockPeer := resource.NewPeer(mockSeedPeerID, mockTask, host)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology)
tc.mock(host, mockPeer, hostManager, scheduling.EXPECT(), res.EXPECT(), hostManager.EXPECT())
tc.expect(t, mockPeer, svc.LeaveHost(context.Background(), &schedulerv1.LeaveHostRequest{
@ -2555,6 +2660,425 @@ func TestServiceV1_LeaveHost(t *testing.T) {
}
}
func TestServiceV1_SyncProbes(t *testing.T) {
tests := []struct {
name string
mock func(svc *V1, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
ms *schedulerv1mocks.MockScheduler_SyncProbesServerMockRecorder)
expect func(t *testing.T, err error)
}{
{
name: "network topology is not enabled",
mock: func(svc *V1, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
ms *schedulerv1mocks.MockScheduler_SyncProbesServerMockRecorder) {
svc.networkTopology = nil
},
expect: func(t *testing.T, err error) {
assert := assert.New(t)
assert.EqualError(err, "rpc error: code = Unimplemented desc = network topology is not enabled")
},
},
{
name: "synchronize probes when receive ProbeStartedRequest",
mock: func(svc *V1, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
ms *schedulerv1mocks.MockScheduler_SyncProbesServerMockRecorder) {
gomock.InOrder(
ms.Recv().Return(&schedulerv1.SyncProbesRequest{
Host: &commonv1.Host{
Id: mockRawSeedHost.ID,
Ip: mockRawSeedHost.IP,
Hostname: mockRawSeedHost.Hostname,
Port: mockRawSeedHost.Port,
DownloadPort: mockRawSeedHost.DownloadPort,
Location: mockRawSeedHost.Network.Location,
Idc: mockRawSeedHost.Network.IDC,
},
Request: &schedulerv1.SyncProbesRequest_ProbeStartedRequest{
ProbeStartedRequest: &schedulerv1.ProbeStartedRequest{},
},
}, nil).Times(1),
mn.FindProbedHostIDs(gomock.Eq(mockRawSeedHost.ID)).Return([]string{mockRawHost.ID}, nil).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(mockRawHost.ID)).Return(&mockRawHost, true),
ms.Send(gomock.Eq(&schedulerv1.SyncProbesResponse{
Hosts: []*commonv1.Host{
{
Id: mockRawHost.ID,
Ip: mockRawHost.IP,
Hostname: mockRawHost.Hostname,
Port: mockRawHost.Port,
DownloadPort: mockRawHost.DownloadPort,
Location: mockRawHost.Network.Location,
Idc: mockRawHost.Network.IDC,
},
},
ProbeInterval: durationpb.New(mockNetworkTopologyConfig.Probe.Interval),
})).Return(nil).Times(1),
ms.Recv().Return(nil, io.EOF).Times(1),
)
},
expect: func(t *testing.T, err error) {
assert := assert.New(t)
assert.NoError(err)
},
},
{
name: "synchronize probes when receive ProbeFinishedRequest",
mock: func(svc *V1, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
ms *schedulerv1mocks.MockScheduler_SyncProbesServerMockRecorder) {
gomock.InOrder(
ms.Recv().Return(&schedulerv1.SyncProbesRequest{
Host: &commonv1.Host{
Id: mockRawSeedHost.ID,
Ip: mockRawSeedHost.IP,
Hostname: mockRawSeedHost.Hostname,
Port: mockRawSeedHost.Port,
DownloadPort: mockRawSeedHost.DownloadPort,
Location: mockRawSeedHost.Network.Location,
Idc: mockRawSeedHost.Network.IDC,
},
Request: &schedulerv1.SyncProbesRequest_ProbeFinishedRequest{
ProbeFinishedRequest: &schedulerv1.ProbeFinishedRequest{
Probes: []*schedulerv1.Probe{mockV1Probe},
},
},
}, nil).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(mockRawHost.ID)).Return(&mockRawHost, true),
mn.Store(gomock.Eq(mockRawSeedHost.ID), gomock.Eq(mockRawHost.ID)).Return(nil).Times(1),
mn.Probes(gomock.Eq(mockRawSeedHost.ID), gomock.Eq(mockRawHost.ID)).Return(probes).Times(1),
mp.Enqueue(gomock.Eq(&networktopology.Probe{
Host: &mockRawHost,
RTT: mockV1Probe.Rtt.AsDuration(),
CreatedAt: mockV1Probe.CreatedAt.AsTime(),
})).Return(nil).Times(1),
ms.Recv().Return(nil, io.EOF).Times(1),
)
},
expect: func(t *testing.T, err error) {
assert := assert.New(t)
assert.NoError(err)
},
},
{
name: "synchronize probes when receive ProbeFailedRequest",
mock: func(svc *V1, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
ms *schedulerv1mocks.MockScheduler_SyncProbesServerMockRecorder) {
gomock.InOrder(
ms.Recv().Return(&schedulerv1.SyncProbesRequest{
Host: &commonv1.Host{
Id: mockRawSeedHost.ID,
Ip: mockRawSeedHost.IP,
Hostname: mockRawSeedHost.Hostname,
Port: mockRawSeedHost.Port,
DownloadPort: mockRawSeedHost.DownloadPort,
Location: mockRawSeedHost.Network.Location,
Idc: mockRawSeedHost.Network.IDC,
},
Request: &schedulerv1.SyncProbesRequest_ProbeFailedRequest{
ProbeFailedRequest: &schedulerv1.ProbeFailedRequest{
Probes: []*schedulerv1.FailedProbe{
{
Host: &commonv1.Host{
Id: mockRawHost.ID,
Ip: mockRawHost.IP,
Hostname: mockRawHost.Hostname,
Port: mockRawHost.Port,
DownloadPort: mockRawHost.DownloadPort,
Location: mockRawHost.Network.Location,
Idc: mockRawHost.Network.IDC,
},
Description: "foo",
},
},
},
},
}, nil).Times(1),
ms.Recv().Return(nil, io.EOF).Times(1),
)
},
expect: func(t *testing.T, err error) {
assert := assert.New(t)
assert.NoError(err)
},
},
{
name: "synchronize probes when receive fail type request",
mock: func(svc *V1, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
ms *schedulerv1mocks.MockScheduler_SyncProbesServerMockRecorder) {
ms.Recv().Return(&schedulerv1.SyncProbesRequest{
Host: &commonv1.Host{
Id: mockRawSeedHost.ID,
Ip: mockRawSeedHost.IP,
Hostname: mockRawSeedHost.Hostname,
Port: mockRawSeedHost.Port,
DownloadPort: mockRawSeedHost.DownloadPort,
Location: mockRawSeedHost.Network.Location,
Idc: mockRawSeedHost.Network.IDC,
},
Request: nil,
}, nil).Times(1)
},
expect: func(t *testing.T, err error) {
assert := assert.New(t)
assert.EqualError(err, "rpc error: code = FailedPrecondition desc = receive unknow request: <nil>")
},
},
{
name: "receive error",
mock: func(svc *V1, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
ms *schedulerv1mocks.MockScheduler_SyncProbesServerMockRecorder) {
ms.Recv().Return(nil, errors.New("receive error")).Times(1)
},
expect: func(t *testing.T, err error) {
assert := assert.New(t)
assert.EqualError(err, "receive error")
},
},
{
name: "receive end of file",
mock: func(svc *V1, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
ms *schedulerv1mocks.MockScheduler_SyncProbesServerMockRecorder) {
ms.Recv().Return(nil, io.EOF).Times(1)
},
expect: func(t *testing.T, err error) {
assert := assert.New(t)
assert.NoError(err)
},
},
{
name: "find probed host ids error",
mock: func(svc *V1, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
ms *schedulerv1mocks.MockScheduler_SyncProbesServerMockRecorder) {
gomock.InOrder(
ms.Recv().Return(&schedulerv1.SyncProbesRequest{
Host: &commonv1.Host{
Id: mockRawSeedHost.ID,
Ip: mockRawSeedHost.IP,
Hostname: mockRawSeedHost.Hostname,
Port: mockRawSeedHost.Port,
DownloadPort: mockRawSeedHost.DownloadPort,
Location: mockRawSeedHost.Network.Location,
Idc: mockRawSeedHost.Network.IDC,
},
Request: &schedulerv1.SyncProbesRequest_ProbeStartedRequest{
ProbeStartedRequest: &schedulerv1.ProbeStartedRequest{},
},
}, nil).Times(1),
mn.FindProbedHostIDs(gomock.Eq(mockRawSeedHost.ID)).Return(nil, errors.New("find probed host ids error")).Times(1),
)
},
expect: func(t *testing.T, err error) {
assert := assert.New(t)
assert.EqualError(err, "rpc error: code = FailedPrecondition desc = find probed host ids error")
},
},
{
name: "load host error when receive ProbeStartedRequest",
mock: func(svc *V1, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
ms *schedulerv1mocks.MockScheduler_SyncProbesServerMockRecorder) {
gomock.InOrder(
ms.Recv().Return(&schedulerv1.SyncProbesRequest{
Host: &commonv1.Host{
Id: mockRawSeedHost.ID,
Ip: mockRawSeedHost.IP,
Hostname: mockRawSeedHost.Hostname,
Port: mockRawSeedHost.Port,
DownloadPort: mockRawSeedHost.DownloadPort,
Location: mockRawSeedHost.Network.Location,
Idc: mockRawSeedHost.Network.IDC,
},
Request: &schedulerv1.SyncProbesRequest_ProbeStartedRequest{
ProbeStartedRequest: &schedulerv1.ProbeStartedRequest{},
},
}, nil).Times(1),
mn.FindProbedHostIDs(gomock.Eq(mockRawSeedHost.ID)).Return([]string{mockRawHost.ID}, nil).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(mockRawHost.ID)).Return(nil, false),
)
},
expect: func(t *testing.T, err error) {
assert := assert.New(t)
assert.EqualError(err, "rpc error: code = NotFound desc = probed host not found")
},
},
{
name: "send synchronize probes response error",
mock: func(svc *V1, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
ms *schedulerv1mocks.MockScheduler_SyncProbesServerMockRecorder) {
gomock.InOrder(
ms.Recv().Return(&schedulerv1.SyncProbesRequest{
Host: &commonv1.Host{
Id: mockRawSeedHost.ID,
Ip: mockRawSeedHost.IP,
Hostname: mockRawSeedHost.Hostname,
Port: mockRawSeedHost.Port,
DownloadPort: mockRawSeedHost.DownloadPort,
Location: mockRawSeedHost.Network.Location,
Idc: mockRawSeedHost.Network.IDC,
},
Request: &schedulerv1.SyncProbesRequest_ProbeStartedRequest{
ProbeStartedRequest: &schedulerv1.ProbeStartedRequest{},
},
}, nil).Times(1),
mn.FindProbedHostIDs(gomock.Eq(mockRawSeedHost.ID)).Return([]string{mockRawHost.ID}, nil).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(mockRawHost.ID)).Return(&mockRawHost, true),
ms.Send(gomock.Eq(&schedulerv1.SyncProbesResponse{
Hosts: []*commonv1.Host{
{
Id: mockRawHost.ID,
Ip: mockRawHost.IP,
Hostname: mockRawHost.Hostname,
Port: mockRawHost.Port,
DownloadPort: mockRawHost.DownloadPort,
Location: mockRawHost.Network.Location,
Idc: mockRawHost.Network.IDC,
},
},
ProbeInterval: durationpb.New(mockNetworkTopologyConfig.Probe.Interval),
})).Return(errors.New("send synchronize probes response error")).Times(1),
)
},
expect: func(t *testing.T, err error) {
assert := assert.New(t)
assert.EqualError(err, "send synchronize probes response error")
},
},
{
name: "load host error when receive ProbeFinishedRequest",
mock: func(svc *V1, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
ms *schedulerv1mocks.MockScheduler_SyncProbesServerMockRecorder) {
gomock.InOrder(
ms.Recv().Return(&schedulerv1.SyncProbesRequest{
Host: &commonv1.Host{
Id: mockRawSeedHost.ID,
Ip: mockRawSeedHost.IP,
Hostname: mockRawSeedHost.Hostname,
Port: mockRawSeedHost.Port,
DownloadPort: mockRawSeedHost.DownloadPort,
Location: mockRawSeedHost.Network.Location,
Idc: mockRawSeedHost.Network.IDC,
},
Request: &schedulerv1.SyncProbesRequest_ProbeFinishedRequest{
ProbeFinishedRequest: &schedulerv1.ProbeFinishedRequest{
Probes: []*schedulerv1.Probe{mockV1Probe},
},
},
}, nil).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(mockRawHost.ID)).Return(nil, false),
ms.Recv().Return(nil, io.EOF).Times(1),
)
},
expect: func(t *testing.T, err error) {
assert := assert.New(t)
assert.NoError(err)
},
},
{
name: "store error when receive ProbeFinishedRequest",
mock: func(svc *V1, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
ms *schedulerv1mocks.MockScheduler_SyncProbesServerMockRecorder) {
gomock.InOrder(
ms.Recv().Return(&schedulerv1.SyncProbesRequest{
Host: &commonv1.Host{
Id: mockRawSeedHost.ID,
Ip: mockRawSeedHost.IP,
Hostname: mockRawSeedHost.Hostname,
Port: mockRawSeedHost.Port,
DownloadPort: mockRawSeedHost.DownloadPort,
Location: mockRawSeedHost.Network.Location,
Idc: mockRawSeedHost.Network.IDC,
},
Request: &schedulerv1.SyncProbesRequest_ProbeFinishedRequest{
ProbeFinishedRequest: &schedulerv1.ProbeFinishedRequest{
Probes: []*schedulerv1.Probe{mockV1Probe},
},
},
}, nil).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(mockRawHost.ID)).Return(&mockRawHost, true),
mn.Store(gomock.Eq(mockRawSeedHost.ID), gomock.Eq(mockRawHost.ID)).Return(errors.New("store error")).Times(1),
ms.Recv().Return(nil, io.EOF).Times(1),
)
},
expect: func(t *testing.T, err error) {
assert := assert.New(t)
assert.NoError(err)
},
},
{
name: "enqueue probe error when receive ProbeFinishedRequest",
mock: func(svc *V1, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
ms *schedulerv1mocks.MockScheduler_SyncProbesServerMockRecorder) {
gomock.InOrder(
ms.Recv().Return(&schedulerv1.SyncProbesRequest{
Host: &commonv1.Host{
Id: mockRawSeedHost.ID,
Ip: mockRawSeedHost.IP,
Hostname: mockRawSeedHost.Hostname,
Port: mockRawSeedHost.Port,
DownloadPort: mockRawSeedHost.DownloadPort,
Location: mockRawSeedHost.Network.Location,
Idc: mockRawSeedHost.Network.IDC,
},
Request: &schedulerv1.SyncProbesRequest_ProbeFinishedRequest{
ProbeFinishedRequest: &schedulerv1.ProbeFinishedRequest{
Probes: []*schedulerv1.Probe{mockV1Probe},
},
},
}, nil).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(mockRawHost.ID)).Return(&mockRawHost, true),
mn.Store(gomock.Eq(mockRawSeedHost.ID), gomock.Eq(mockRawHost.ID)).Return(nil).Times(1),
mn.Probes(gomock.Eq(mockRawSeedHost.ID), gomock.Eq(mockRawHost.ID)).Return(probes).Times(1),
mp.Enqueue(gomock.Any()).Return(errors.New("enqueue probe error")).Times(1),
ms.Recv().Return(nil, io.EOF).Times(1),
)
},
expect: func(t *testing.T, err error) {
assert := assert.New(t)
assert.NoError(err)
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
scheduling := mocks.NewMockScheduling(ctl)
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
probes := networktopologymocks.NewMockProbes(ctl)
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
hostManager := resource.NewMockHostManager(ctl)
stream := schedulerv1mocks.NewMockScheduler_SyncProbesServer(ctl)
svc := NewV1(&config.Config{NetworkTopology: mockNetworkTopologyConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology)
tc.mock(svc, res.EXPECT(), probes, probes.EXPECT(), networkTopology.EXPECT(), hostManager, hostManager.EXPECT(), stream.EXPECT())
tc.expect(t, svc.SyncProbes(stream))
})
}
}
func TestServiceV1_triggerTask(t *testing.T) {
tests := []struct {
name string
@ -3005,7 +3529,8 @@ func TestServiceV1_triggerTask(t *testing.T) {
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
svc := NewV1(tc.config, res, scheduling, dynconfig, storage)
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
svc := NewV1(tc.config, res, scheduling, dynconfig, storage, networkTopology)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
@ -3109,7 +3634,8 @@ func TestServiceV1_storeTask(t *testing.T) {
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage)
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
taskManager := resource.NewMockTaskManager(ctl)
tc.run(t, svc, taskManager, res.EXPECT(), taskManager.EXPECT())
})
@ -3187,7 +3713,8 @@ func TestServiceV1_storeHost(t *testing.T) {
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage)
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
hostManager := resource.NewMockHostManager(ctl)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
@ -3271,7 +3798,8 @@ func TestServiceV1_storePeer(t *testing.T) {
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage)
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
peerManager := resource.NewMockPeerManager(ctl)
tc.run(t, svc, peerManager, res.EXPECT(), peerManager.EXPECT())
@ -3330,13 +3858,14 @@ func TestServiceV1_triggerSeedPeerTask(t *testing.T) {
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
seedPeer := resource.NewMockSeedPeer(ctl)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
peer := resource.NewPeer(mockPeerID, task, mockHost)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
tc.mock(task, peer, seedPeer, res.EXPECT(), seedPeer.EXPECT())
svc.triggerSeedPeerTask(context.Background(), &mockPeerRange, task)
@ -3412,12 +3941,13 @@ func TestServiceV1_handleBeginOfPiece(t *testing.T) {
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
tc.mock(peer, scheduling.EXPECT())
svc.handleBeginOfPiece(context.Background(), peer)
@ -3551,8 +4081,9 @@ func TestServiceV1_handlePieceSuccess(t *testing.T) {
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
peerManager := resource.NewMockPeerManager(ctl)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology)
tc.mock(tc.peer, peerManager, res.EXPECT(), peerManager.EXPECT())
svc.handlePieceSuccess(context.Background(), tc.peer, tc.piece)
@ -3740,6 +4271,7 @@ func TestServiceV1_handlePieceFail(t *testing.T) {
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
peerManager := resource.NewMockPeerManager(ctl)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
@ -3748,7 +4280,7 @@ func TestServiceV1_handlePieceFail(t *testing.T) {
peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
parent := resource.NewPeer(mockSeedPeerID, mockTask, mockHost)
seedPeer := resource.NewMockSeedPeer(ctl)
svc := NewV1(tc.config, res, scheduling, dynconfig, storage)
svc := NewV1(tc.config, res, scheduling, dynconfig, storage, networkTopology)
tc.run(t, svc, peer, parent, tc.piece, peerManager, seedPeer, scheduling.EXPECT(), res.EXPECT(), peerManager.EXPECT(), seedPeer.EXPECT())
})
@ -3848,6 +4380,7 @@ func TestServiceV1_handlePeerSuccess(t *testing.T) {
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
url, err := url.Parse(s.URL)
if err != nil {
@ -3871,7 +4404,7 @@ func TestServiceV1_handlePeerSuccess(t *testing.T) {
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology)
tc.mock(peer)
svc.handlePeerSuccess(context.Background(), peer)
@ -3946,7 +4479,8 @@ func TestServiceV1_handlePeerFail(t *testing.T) {
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage)
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
@ -4032,7 +4566,8 @@ func TestServiceV1_handleTaskSuccess(t *testing.T) {
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage)
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology)
task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
tc.mock(task)
@ -4171,7 +4706,8 @@ func TestServiceV1_handleTaskFail(t *testing.T) {
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage)
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology)
task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
tc.mock(task)

View File

@ -37,6 +37,7 @@ import (
"d7y.io/dragonfly/v2/pkg/types"
"d7y.io/dragonfly/v2/scheduler/config"
"d7y.io/dragonfly/v2/scheduler/metrics"
"d7y.io/dragonfly/v2/scheduler/networktopology"
"d7y.io/dragonfly/v2/scheduler/resource"
"d7y.io/dragonfly/v2/scheduler/scheduling"
"d7y.io/dragonfly/v2/scheduler/storage"
@ -58,6 +59,9 @@ type V2 struct {
// Storage interface.
storage storage.Storage
// Network topology interface.
networkTopology networktopology.NetworkTopology
}
// NewV2 returns a new v2 version of the service instance.
@ -67,6 +71,7 @@ func NewV2(
scheduling scheduling.Scheduling,
dynconfig config.DynconfigInterface,
storage storage.Storage,
networkTopology networktopology.NetworkTopology,
) *V2 {
return &V2{
resource: resource,
@ -74,6 +79,7 @@ func NewV2(
config: cfg,
dynconfig: dynconfig,
storage: storage,
networkTopology: networkTopology,
}
}
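
As a usage note, callers now supply the network topology as the final constructor argument. A minimal wiring sketch, assuming already-constructed dependencies with hypothetical names:

	svc := service.NewV2(cfg, resource, scheduling, dynconfig, storage, networkTopology)

A nil networkTopology is tolerated: SyncProbes then rejects the stream with codes.Unimplemented, as shown below.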
@ -637,6 +643,163 @@ func (v *V2) LeaveHost(ctx context.Context, req *schedulerv2.LeaveHostRequest) e
return nil
}
// SyncProbes syncs probes of the host.
func (v *V2) SyncProbes(stream schedulerv2.Scheduler_SyncProbesServer) error {
if v.networkTopology == nil {
return status.Errorf(codes.Unimplemented, "network topology is not enabled")
}
for {
req, err := stream.Recv()
if err != nil {
if err == io.EOF {
return nil
}
logger.Errorf("receive error: %s", err.Error())
return err
}
logger := logger.WithHost(req.Host.Id, req.Host.Hostname, req.Host.Ip)
switch syncProbesRequest := req.GetRequest().(type) {
case *schedulerv2.SyncProbesRequest_ProbeStartedRequest:
// Find probed hosts in network topology. Based on the source host
// information, the most suitable candidate hosts are selected.
logger.Info("receive SyncProbesRequest_ProbeStartedRequest")
probedHostIDs, err := v.networkTopology.FindProbedHostIDs(req.Host.Id)
if err != nil {
logger.Error(err)
return status.Error(codes.FailedPrecondition, err.Error())
}
var probedHosts []*commonv2.Host
for _, probedHostID := range probedHostIDs {
probedHost, loaded := v.resource.HostManager().Load(probedHostID)
if !loaded {
logger.Warnf("probed host %s not found", probedHostID)
continue
}
probedHosts = append(probedHosts, &commonv2.Host{
Id: probedHost.ID,
Type: uint32(probedHost.Type),
Hostname: probedHost.Hostname,
Ip: probedHost.IP,
Port: probedHost.Port,
DownloadPort: probedHost.DownloadPort,
Os: probedHost.OS,
Platform: probedHost.Platform,
PlatformFamily: probedHost.PlatformFamily,
PlatformVersion: probedHost.PlatformVersion,
KernelVersion: probedHost.KernelVersion,
Cpu: &commonv2.CPU{
LogicalCount: probedHost.CPU.LogicalCount,
PhysicalCount: probedHost.CPU.PhysicalCount,
Percent: probedHost.CPU.Percent,
ProcessPercent: probedHost.CPU.ProcessPercent,
Times: &commonv2.CPUTimes{
User: probedHost.CPU.Times.User,
System: probedHost.CPU.Times.System,
Idle: probedHost.CPU.Times.Idle,
Nice: probedHost.CPU.Times.Nice,
Iowait: probedHost.CPU.Times.Iowait,
Irq: probedHost.CPU.Times.Irq,
Softirq: probedHost.CPU.Times.Softirq,
Steal: probedHost.CPU.Times.Steal,
Guest: probedHost.CPU.Times.Guest,
GuestNice: probedHost.CPU.Times.GuestNice,
},
},
Memory: &commonv2.Memory{
Total: probedHost.Memory.Total,
Available: probedHost.Memory.Available,
Used: probedHost.Memory.Used,
UsedPercent: probedHost.Memory.UsedPercent,
ProcessUsedPercent: probedHost.Memory.ProcessUsedPercent,
Free: probedHost.Memory.Free,
},
Network: &commonv2.Network{
TcpConnectionCount: probedHost.Network.TCPConnectionCount,
UploadTcpConnectionCount: probedHost.Network.UploadTCPConnectionCount,
Location: probedHost.Network.Location,
Idc: probedHost.Network.IDC,
},
Disk: &commonv2.Disk{
Total: probedHost.Disk.Total,
Free: probedHost.Disk.Free,
Used: probedHost.Disk.Used,
UsedPercent: probedHost.Disk.UsedPercent,
InodesTotal: probedHost.Disk.InodesTotal,
InodesUsed: probedHost.Disk.InodesUsed,
InodesFree: probedHost.Disk.InodesFree,
InodesUsedPercent: probedHost.Disk.InodesUsedPercent,
},
Build: &commonv2.Build{
GitVersion: probedHost.Build.GitVersion,
GitCommit: probedHost.Build.GitCommit,
GoVersion: probedHost.Build.GoVersion,
Platform: probedHost.Build.Platform,
},
})
}
if len(probedHosts) == 0 {
logger.Error("probed host not found")
return status.Error(codes.NotFound, "probed host not found")
}
logger.Infof("probe started: %#v", probedHosts)
if err := stream.Send(&schedulerv2.SyncProbesResponse{
Hosts: probedHosts,
ProbeInterval: durationpb.New(v.config.NetworkTopology.Probe.Interval),
}); err != nil {
logger.Error(err)
return err
}
case *schedulerv2.SyncProbesRequest_ProbeFinishedRequest:
// Store probes in network topology. First create the association between
// the source host and the destination host, then store the probe value.
logger.Info("receive SyncProbesRequest_ProbeFinishedRequest")
for _, probe := range syncProbesRequest.ProbeFinishedRequest.Probes {
probedHost, loaded := v.resource.HostManager().Load(probe.Host.Id)
if !loaded {
logger.Errorf("host %s not found", probe.Host.Id)
continue
}
if err := v.networkTopology.Store(req.Host.Id, probedHost.ID); err != nil {
logger.Errorf("store failed: %s", err.Error())
continue
}
if err := v.networkTopology.Probes(req.Host.Id, probe.Host.Id).Enqueue(&networktopology.Probe{
Host: probedHost,
RTT: probe.Rtt.AsDuration(),
CreatedAt: probe.CreatedAt.AsTime(),
}); err != nil {
logger.Errorf("enqueue failed: %s", err.Error())
continue
}
logger.Infof("probe finished: %#v", probe)
}
case *schedulerv2.SyncProbesRequest_ProbeFailedRequest:
// Log failed probes.
logger.Info("receive SyncProbesRequest_ProbeFailedRequest")
var failedProbedHostIDs []string
for _, failedProbe := range syncProbesRequest.ProbeFailedRequest.Probes {
failedProbedHostIDs = append(failedProbedHostIDs, failedProbe.Host.Id)
}
logger.Warnf("probe failed: %#v", failedProbedHostIDs)
default:
msg := fmt.Sprintf("receive unknow request: %#v", syncProbesRequest)
logger.Error(msg)
return status.Error(codes.FailedPrecondition, msg)
}
}
}
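
For context, a client-side sketch of this exchange, assuming a connected schedulerv2.SchedulerClient named client and a populated *commonv2.Host named host (both hypothetical; the actual probing logic is elided):

	stream, err := client.SyncProbes(ctx)
	if err != nil {
		return err
	}

	// Ask the scheduler which hosts should be probed.
	if err := stream.Send(&schedulerv2.SyncProbesRequest{
		Host: host,
		Request: &schedulerv2.SyncProbesRequest_ProbeStartedRequest{
			ProbeStartedRequest: &schedulerv2.ProbeStartedRequest{},
		},
	}); err != nil {
		return err
	}

	// Receive the candidate hosts and the probe interval.
	resp, err := stream.Recv()
	if err != nil {
		return err
	}

	// Probe resp.Hosts at resp.ProbeInterval, then report the results with a
	// ProbeFinishedRequest, or a ProbeFailedRequest for the hosts that failed.

Note that the server keeps the stream open in a loop, so started, finished, and failed requests can be sent repeatedly on one stream until the client closes it (io.EOF).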
// handleRegisterPeerRequest handles RegisterPeerRequest of AnnouncePeerRequest.
func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.Scheduler_AnnouncePeerServer, hostID, taskID, peerID string, req *schedulerv2.RegisterPeerRequest) error {
// Handle resource included host, task, and peer.


@ -19,6 +19,7 @@ package service
import (
"context"
"errors"
"io"
"net"
"net/http"
"net/http/httptest"
@ -46,7 +47,10 @@ import (
pkgtypes "d7y.io/dragonfly/v2/pkg/types"
"d7y.io/dragonfly/v2/scheduler/config"
configmocks "d7y.io/dragonfly/v2/scheduler/config/mocks"
"d7y.io/dragonfly/v2/scheduler/networktopology"
networktopologymocks "d7y.io/dragonfly/v2/scheduler/networktopology/mocks"
"d7y.io/dragonfly/v2/scheduler/resource"
"d7y.io/dragonfly/v2/scheduler/scheduling/mocks"
schedulingmocks "d7y.io/dragonfly/v2/scheduler/scheduling/mocks"
storagemocks "d7y.io/dragonfly/v2/scheduler/storage/mocks"
)
@ -73,7 +77,9 @@ func TestService_NewV2(t *testing.T) {
resource := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
tc.expect(t, NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, scheduling, dynconfig, storage))
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
tc.expect(t, NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, scheduling, dynconfig, storage, networkTopology))
})
}
}
@ -237,13 +243,14 @@ func TestServiceV2_StatPeer(t *testing.T) {
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
peerManager := resource.NewMockPeerManager(ctl)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
peer := resource.NewPeer(mockSeedPeerID, mockTask, mockHost, resource.WithRange(mockPeerRange))
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology)
tc.mock(peer, peerManager, res.EXPECT(), peerManager.EXPECT())
resp, err := svc.StatPeer(context.Background(), &schedulerv2.StatPeerRequest{TaskId: mockTaskID, PeerId: mockPeerID})
@ -308,13 +315,14 @@ func TestServiceV2_LeavePeer(t *testing.T) {
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
peerManager := resource.NewMockPeerManager(ctl)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
peer := resource.NewPeer(mockSeedPeerID, mockTask, mockHost, resource.WithRange(mockPeerRange))
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology)
tc.mock(peer, peerManager, res.EXPECT(), peerManager.EXPECT())
tc.expect(t, svc.LeavePeer(context.Background(), &schedulerv2.LeavePeerRequest{TaskId: mockTaskID, PeerId: mockPeerID}))
@ -394,9 +402,10 @@ func TestServiceV2_StatTask(t *testing.T) {
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
taskManager := resource.NewMockTaskManager(ctl)
task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology)
tc.mock(task, taskManager, res.EXPECT(), taskManager.EXPECT())
resp, err := svc.StatTask(context.Background(), &schedulerv2.StatTaskRequest{Id: mockTaskID})
@ -829,11 +838,12 @@ func TestServiceV2_AnnounceHost(t *testing.T) {
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
hostManager := resource.NewMockHostManager(ctl)
host := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology)
tc.run(t, svc, tc.req, host, hostManager, res.EXPECT(), hostManager.EXPECT(), dynconfig.EXPECT())
})
@ -898,13 +908,14 @@ func TestServiceV2_LeaveHost(t *testing.T) {
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
hostManager := resource.NewMockHostManager(ctl)
host := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
mockPeer := resource.NewPeer(mockSeedPeerID, mockTask, host)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology)
tc.mock(host, mockPeer, hostManager, res.EXPECT(), hostManager.EXPECT())
tc.expect(t, mockPeer, svc.LeaveHost(context.Background(), &schedulerv2.LeaveHostRequest{Id: mockHostID}))
@ -912,6 +923,542 @@ func TestServiceV2_LeaveHost(t *testing.T) {
}
}
func TestServiceV2_SyncProbes(t *testing.T) {
tests := []struct {
name string
mock func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder)
expect func(t *testing.T, err error)
}{
{
name: "network topology is not enabled",
mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) {
svc.networkTopology = nil
},
expect: func(t *testing.T, err error) {
assert := assert.New(t)
assert.EqualError(err, "rpc error: code = Unimplemented desc = network topology is not enabled")
},
},
{
name: "synchronize probes when receive ProbeStartedRequest",
mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) {
gomock.InOrder(
ms.Recv().Return(&schedulerv2.SyncProbesRequest{
Host: &commonv2.Host{
Id: mockSeedHostID,
Type: uint32(pkgtypes.HostTypeSuperSeed),
Hostname: "bar",
Ip: "127.0.0.1",
Port: 8003,
DownloadPort: 8001,
Os: "darwin",
Platform: "darwin",
PlatformFamily: "Standalone Workstation",
PlatformVersion: "11.1",
KernelVersion: "20.2.0",
Cpu: mockV2Probe.Host.Cpu,
Memory: mockV2Probe.Host.Memory,
Network: mockV2Probe.Host.Network,
Disk: mockV2Probe.Host.Disk,
Build: mockV2Probe.Host.Build,
},
Request: &schedulerv2.SyncProbesRequest_ProbeStartedRequest{
ProbeStartedRequest: &schedulerv2.ProbeStartedRequest{},
},
}, nil).Times(1),
mn.FindProbedHostIDs(gomock.Eq(mockRawSeedHost.ID)).Return([]string{mockRawHost.ID}, nil).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(mockRawHost.ID)).Return(&mockRawHost, true),
ms.Send(gomock.Eq(&schedulerv2.SyncProbesResponse{
Hosts: []*commonv2.Host{
{
Id: mockRawHost.ID,
Type: uint32(mockRawHost.Type),
Hostname: mockRawHost.Hostname,
Ip: mockRawHost.IP,
Port: mockRawHost.Port,
DownloadPort: mockRawHost.DownloadPort,
Os: mockRawHost.OS,
Platform: mockRawHost.Platform,
PlatformFamily: mockRawHost.PlatformFamily,
PlatformVersion: mockRawHost.PlatformVersion,
KernelVersion: mockRawHost.KernelVersion,
Cpu: mockV2Probe.Host.Cpu,
Memory: mockV2Probe.Host.Memory,
Network: mockV2Probe.Host.Network,
Disk: mockV2Probe.Host.Disk,
Build: mockV2Probe.Host.Build,
},
},
ProbeInterval: durationpb.New(mockNetworkTopologyConfig.Probe.Interval),
})).Return(nil).Times(1),
ms.Recv().Return(nil, io.EOF).Times(1),
)
},
expect: func(t *testing.T, err error) {
assert := assert.New(t)
assert.NoError(err)
},
},
{
name: "synchronize probes when receive ProbeFinishedRequest",
mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) {
gomock.InOrder(
ms.Recv().Return(&schedulerv2.SyncProbesRequest{
Host: &commonv2.Host{
Id: mockSeedHostID,
Type: uint32(pkgtypes.HostTypeSuperSeed),
Hostname: "bar",
Ip: "127.0.0.1",
Port: 8003,
DownloadPort: 8001,
Os: "darwin",
Platform: "darwin",
PlatformFamily: "Standalone Workstation",
PlatformVersion: "11.1",
KernelVersion: "20.2.0",
Cpu: mockV2Probe.Host.Cpu,
Memory: mockV2Probe.Host.Memory,
Network: mockV2Probe.Host.Network,
Disk: mockV2Probe.Host.Disk,
Build: mockV2Probe.Host.Build,
},
Request: &schedulerv2.SyncProbesRequest_ProbeFinishedRequest{
ProbeFinishedRequest: &schedulerv2.ProbeFinishedRequest{
Probes: []*schedulerv2.Probe{mockV2Probe},
},
},
}, nil).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(mockRawHost.ID)).Return(&mockRawHost, true),
mn.Store(gomock.Eq(mockRawSeedHost.ID), gomock.Eq(mockRawHost.ID)).Return(nil).Times(1),
mn.Probes(gomock.Eq(mockRawSeedHost.ID), gomock.Eq(mockRawHost.ID)).Return(probes).Times(1),
mp.Enqueue(gomock.Eq(&networktopology.Probe{
Host: &mockRawHost,
RTT: mockV2Probe.Rtt.AsDuration(),
CreatedAt: mockV2Probe.CreatedAt.AsTime(),
})).Return(nil).Times(1),
ms.Recv().Return(nil, io.EOF).Times(1),
)
},
expect: func(t *testing.T, err error) {
assert := assert.New(t)
assert.NoError(err)
},
},
{
name: "synchronize probes when receive ProbeFailedRequest",
mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) {
gomock.InOrder(
ms.Recv().Return(&schedulerv2.SyncProbesRequest{
Host: &commonv2.Host{
Id: mockSeedHostID,
Type: uint32(pkgtypes.HostTypeSuperSeed),
Hostname: "bar",
Ip: "127.0.0.1",
Port: 8003,
DownloadPort: 8001,
Os: "darwin",
Platform: "darwin",
PlatformFamily: "Standalone Workstation",
PlatformVersion: "11.1",
KernelVersion: "20.2.0",
Cpu: mockV2Probe.Host.Cpu,
Memory: mockV2Probe.Host.Memory,
Network: mockV2Probe.Host.Network,
Disk: mockV2Probe.Host.Disk,
Build: mockV2Probe.Host.Build,
},
Request: &schedulerv2.SyncProbesRequest_ProbeFailedRequest{
ProbeFailedRequest: &schedulerv2.ProbeFailedRequest{
Probes: []*schedulerv2.FailedProbe{
{
Host: &commonv2.Host{
Id: mockRawHost.ID,
Type: uint32(mockRawHost.Type),
Hostname: mockRawHost.Hostname,
Ip: mockRawHost.IP,
Port: mockRawHost.Port,
DownloadPort: mockRawHost.DownloadPort,
Os: mockRawHost.OS,
Platform: mockRawHost.Platform,
PlatformFamily: mockRawHost.PlatformFamily,
PlatformVersion: mockRawHost.PlatformVersion,
KernelVersion: mockRawHost.KernelVersion,
Cpu: mockV2Probe.Host.Cpu,
Memory: mockV2Probe.Host.Memory,
Network: mockV2Probe.Host.Network,
Disk: mockV2Probe.Host.Disk,
Build: mockV2Probe.Host.Build,
},
Description: "foo",
},
},
},
},
}, nil).Times(1),
ms.Recv().Return(nil, io.EOF).Times(1),
)
},
expect: func(t *testing.T, err error) {
assert := assert.New(t)
assert.NoError(err)
},
},
{
name: "synchronize probes when receive fail type request",
mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) {
ms.Recv().Return(&schedulerv2.SyncProbesRequest{
Host: &commonv2.Host{
Id: mockSeedHostID,
Type: uint32(pkgtypes.HostTypeSuperSeed),
Hostname: "bar",
Ip: "127.0.0.1",
Port: 8003,
DownloadPort: 8001,
Os: "darwin",
Platform: "darwin",
PlatformFamily: "Standalone Workstation",
PlatformVersion: "11.1",
KernelVersion: "20.2.0",
Cpu: mockV2Probe.Host.Cpu,
Memory: mockV2Probe.Host.Memory,
Network: mockV2Probe.Host.Network,
Disk: mockV2Probe.Host.Disk,
Build: mockV2Probe.Host.Build,
},
Request: nil,
}, nil).Times(1)
},
expect: func(t *testing.T, err error) {
assert := assert.New(t)
assert.EqualError(err, "rpc error: code = FailedPrecondition desc = receive unknow request: <nil>")
},
},
{
name: "receive error",
mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) {
ms.Recv().Return(nil, errors.New("receive error")).Times(1)
},
expect: func(t *testing.T, err error) {
assert := assert.New(t)
assert.EqualError(err, "receive error")
},
},
{
name: "receive end of file",
mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) {
ms.Recv().Return(nil, io.EOF).Times(1)
},
expect: func(t *testing.T, err error) {
assert := assert.New(t)
assert.NoError(err)
},
},
{
name: "find probed host ids error",
mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) {
gomock.InOrder(
ms.Recv().Return(&schedulerv2.SyncProbesRequest{
Host: &commonv2.Host{
Id: mockSeedHostID,
Type: uint32(pkgtypes.HostTypeSuperSeed),
Hostname: "bar",
Ip: "127.0.0.1",
Port: 8003,
DownloadPort: 8001,
Os: "darwin",
Platform: "darwin",
PlatformFamily: "Standalone Workstation",
PlatformVersion: "11.1",
KernelVersion: "20.2.0",
Cpu: mockV2Probe.Host.Cpu,
Memory: mockV2Probe.Host.Memory,
Network: mockV2Probe.Host.Network,
Disk: mockV2Probe.Host.Disk,
Build: mockV2Probe.Host.Build,
},
Request: &schedulerv2.SyncProbesRequest_ProbeStartedRequest{
ProbeStartedRequest: &schedulerv2.ProbeStartedRequest{},
},
}, nil).Times(1),
mn.FindProbedHostIDs(gomock.Eq(mockRawSeedHost.ID)).Return(nil, errors.New("find probed host ids error")).Times(1),
)
},
expect: func(t *testing.T, err error) {
assert := assert.New(t)
assert.EqualError(err, "rpc error: code = FailedPrecondition desc = find probed host ids error")
},
},
{
name: "load host error when receive ProbeStartedRequest",
mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) {
gomock.InOrder(
ms.Recv().Return(&schedulerv2.SyncProbesRequest{
Host: &commonv2.Host{
Id: mockSeedHostID,
Type: uint32(pkgtypes.HostTypeSuperSeed),
Hostname: "bar",
Ip: "127.0.0.1",
Port: 8003,
DownloadPort: 8001,
Os: "darwin",
Platform: "darwin",
PlatformFamily: "Standalone Workstation",
PlatformVersion: "11.1",
KernelVersion: "20.2.0",
Cpu: mockV2Probe.Host.Cpu,
Memory: mockV2Probe.Host.Memory,
Network: mockV2Probe.Host.Network,
Disk: mockV2Probe.Host.Disk,
Build: mockV2Probe.Host.Build,
},
Request: &schedulerv2.SyncProbesRequest_ProbeStartedRequest{
ProbeStartedRequest: &schedulerv2.ProbeStartedRequest{},
},
}, nil).Times(1),
mn.FindProbedHostIDs(gomock.Eq(mockRawSeedHost.ID)).Return([]string{mockRawHost.ID}, nil).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(mockRawHost.ID)).Return(nil, false),
)
},
expect: func(t *testing.T, err error) {
assert := assert.New(t)
assert.EqualError(err, "rpc error: code = NotFound desc = probed host not found")
},
},
{
name: "send synchronize probes response error",
mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) {
gomock.InOrder(
ms.Recv().Return(&schedulerv2.SyncProbesRequest{
Host: &commonv2.Host{
Id: mockSeedHostID,
Type: uint32(pkgtypes.HostTypeSuperSeed),
Hostname: "bar",
Ip: "127.0.0.1",
Port: 8003,
DownloadPort: 8001,
Os: "darwin",
Platform: "darwin",
PlatformFamily: "Standalone Workstation",
PlatformVersion: "11.1",
KernelVersion: "20.2.0",
Cpu: mockV2Probe.Host.Cpu,
Memory: mockV2Probe.Host.Memory,
Network: mockV2Probe.Host.Network,
Disk: mockV2Probe.Host.Disk,
Build: mockV2Probe.Host.Build,
},
Request: &schedulerv2.SyncProbesRequest_ProbeStartedRequest{
ProbeStartedRequest: &schedulerv2.ProbeStartedRequest{},
},
}, nil).Times(1),
mn.FindProbedHostIDs(gomock.Eq(mockRawSeedHost.ID)).Return([]string{mockRawHost.ID}, nil).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(mockRawHost.ID)).Return(&mockRawHost, true),
ms.Send(gomock.Eq(&schedulerv2.SyncProbesResponse{
Hosts: []*commonv2.Host{
{
Id: mockRawHost.ID,
Type: uint32(mockRawHost.Type),
Hostname: mockRawHost.Hostname,
Ip: mockRawHost.IP,
Port: mockRawHost.Port,
DownloadPort: mockRawHost.DownloadPort,
Os: mockRawHost.OS,
Platform: mockRawHost.Platform,
PlatformFamily: mockRawHost.PlatformFamily,
PlatformVersion: mockRawHost.PlatformVersion,
KernelVersion: mockRawHost.KernelVersion,
Cpu: mockV2Probe.Host.Cpu,
Memory: mockV2Probe.Host.Memory,
Network: mockV2Probe.Host.Network,
Disk: mockV2Probe.Host.Disk,
Build: mockV2Probe.Host.Build,
},
},
ProbeInterval: durationpb.New(mockNetworkTopologyConfig.Probe.Interval),
})).Return(errors.New("send synchronize probes response error")).Times(1),
)
},
expect: func(t *testing.T, err error) {
assert := assert.New(t)
assert.EqualError(err, "send synchronize probes response error")
},
},
{
name: "load host error when receive ProbeFinishedRequest",
mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) {
gomock.InOrder(
ms.Recv().Return(&schedulerv2.SyncProbesRequest{
Host: &commonv2.Host{
Id: mockSeedHostID,
Type: uint32(pkgtypes.HostTypeSuperSeed),
Hostname: "bar",
Ip: "127.0.0.1",
Port: 8003,
DownloadPort: 8001,
Os: "darwin",
Platform: "darwin",
PlatformFamily: "Standalone Workstation",
PlatformVersion: "11.1",
KernelVersion: "20.2.0",
Cpu: mockV2Probe.Host.Cpu,
Memory: mockV2Probe.Host.Memory,
Network: mockV2Probe.Host.Network,
Disk: mockV2Probe.Host.Disk,
Build: mockV2Probe.Host.Build,
},
Request: &schedulerv2.SyncProbesRequest_ProbeFinishedRequest{
ProbeFinishedRequest: &schedulerv2.ProbeFinishedRequest{
Probes: []*schedulerv2.Probe{mockV2Probe},
},
},
}, nil).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(mockRawHost.ID)).Return(nil, false),
ms.Recv().Return(nil, io.EOF).Times(1),
)
},
expect: func(t *testing.T, err error) {
assert := assert.New(t)
assert.NoError(err)
},
},
{
name: "store error when receive ProbeFinishedRequest",
mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) {
gomock.InOrder(
ms.Recv().Return(&schedulerv2.SyncProbesRequest{
Host: &commonv2.Host{
Id: mockSeedHostID,
Type: uint32(pkgtypes.HostTypeSuperSeed),
Hostname: "bar",
Ip: "127.0.0.1",
Port: 8003,
DownloadPort: 8001,
Os: "darwin",
Platform: "darwin",
PlatformFamily: "Standalone Workstation",
PlatformVersion: "11.1",
KernelVersion: "20.2.0",
Cpu: mockV2Probe.Host.Cpu,
Memory: mockV2Probe.Host.Memory,
Network: mockV2Probe.Host.Network,
Disk: mockV2Probe.Host.Disk,
Build: mockV2Probe.Host.Build,
},
Request: &schedulerv2.SyncProbesRequest_ProbeFinishedRequest{
ProbeFinishedRequest: &schedulerv2.ProbeFinishedRequest{
Probes: []*schedulerv2.Probe{mockV2Probe},
},
},
}, nil).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(mockRawHost.ID)).Return(&mockRawHost, true),
mn.Store(gomock.Eq(mockRawSeedHost.ID), gomock.Eq(mockRawHost.ID)).Return(errors.New("store error")).Times(1),
ms.Recv().Return(nil, io.EOF).Times(1),
)
},
expect: func(t *testing.T, err error) {
assert := assert.New(t)
assert.NoError(err)
},
},
{
name: "enqueue probe error when receive ProbeFinishedRequest",
mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) {
gomock.InOrder(
ms.Recv().Return(&schedulerv2.SyncProbesRequest{
Host: &commonv2.Host{
Id: mockSeedHostID,
Type: uint32(pkgtypes.HostTypeSuperSeed),
Hostname: "bar",
Ip: "127.0.0.1",
Port: 8003,
DownloadPort: 8001,
Os: "darwin",
Platform: "darwin",
PlatformFamily: "Standalone Workstation",
PlatformVersion: "11.1",
KernelVersion: "20.2.0",
Cpu: mockV2Probe.Host.Cpu,
Memory: mockV2Probe.Host.Memory,
Network: mockV2Probe.Host.Network,
Disk: mockV2Probe.Host.Disk,
Build: mockV2Probe.Host.Build,
},
Request: &schedulerv2.SyncProbesRequest_ProbeFinishedRequest{
ProbeFinishedRequest: &schedulerv2.ProbeFinishedRequest{
Probes: []*schedulerv2.Probe{mockV2Probe},
},
},
}, nil).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(mockRawHost.ID)).Return(&mockRawHost, true),
mn.Store(gomock.Eq(mockRawSeedHost.ID), gomock.Eq(mockRawHost.ID)).Return(nil).Times(1),
mn.Probes(gomock.Eq(mockRawSeedHost.ID), gomock.Eq(mockRawHost.ID)).Return(probes).Times(1),
mp.Enqueue(gomock.Any()).Return(errors.New("enqueue probe error")).Times(1),
ms.Recv().Return(nil, io.EOF).Times(1),
)
},
expect: func(t *testing.T, err error) {
assert := assert.New(t)
assert.NoError(err)
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
scheduling := mocks.NewMockScheduling(ctl)
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
probes := networktopologymocks.NewMockProbes(ctl)
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
hostManager := resource.NewMockHostManager(ctl)
stream := schedulerv2mocks.NewMockScheduler_SyncProbesServer(ctl)
svc := NewV2(&config.Config{NetworkTopology: mockNetworkTopologyConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology)
tc.mock(svc, res.EXPECT(), probes, probes.EXPECT(), networkTopology.EXPECT(), hostManager, hostManager.EXPECT(), stream.EXPECT())
tc.expect(t, svc.SyncProbes(stream))
})
}
}
func TestServiceV2_handleRegisterPeerRequest(t *testing.T) {
tests := []struct {
name string
@ -1401,6 +1948,7 @@ func TestServiceV2_handleRegisterPeerRequest(t *testing.T) {
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
hostManager := resource.NewMockHostManager(ctl)
peerManager := resource.NewMockPeerManager(ctl)
taskManager := resource.NewMockTaskManager(ctl)
@ -1412,7 +1960,7 @@ func TestServiceV2_handleRegisterPeerRequest(t *testing.T) {
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
seedPeer := resource.NewPeer(mockSeedPeerID, mockTask, mockHost)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
tc.run(t, svc, tc.req, peer, seedPeer, hostManager, taskManager, peerManager, stream, res.EXPECT(), hostManager.EXPECT(), taskManager.EXPECT(), peerManager.EXPECT(), stream.EXPECT(), scheduling.EXPECT())
})
@ -1910,6 +2458,7 @@ func TestServiceV2_handleRegisterSeedPeerRequest(t *testing.T) {
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
hostManager := resource.NewMockHostManager(ctl)
peerManager := resource.NewMockPeerManager(ctl)
taskManager := resource.NewMockTaskManager(ctl)
@ -1921,7 +2470,7 @@ func TestServiceV2_handleRegisterSeedPeerRequest(t *testing.T) {
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
seedPeer := resource.NewPeer(mockSeedPeerID, mockTask, mockHost)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
tc.run(t, svc, tc.req, peer, seedPeer, hostManager, taskManager, peerManager, stream, res.EXPECT(), hostManager.EXPECT(), taskManager.EXPECT(), peerManager.EXPECT(), stream.EXPECT(), scheduling.EXPECT())
})
@ -2006,6 +2555,7 @@ func TestServiceV2_handleDownloadPeerStartedRequest(t *testing.T) {
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
peerManager := resource.NewMockPeerManager(ctl)
mockHost := resource.NewHost(
@ -2013,7 +2563,7 @@ func TestServiceV2_handleDownloadPeerStartedRequest(t *testing.T) {
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
tc.run(t, svc, peer, peerManager, res.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT())
})
@ -2098,6 +2648,7 @@ func TestServiceV2_handleDownloadPeerBackToSourceStartedRequest(t *testing.T) {
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
peerManager := resource.NewMockPeerManager(ctl)
mockHost := resource.NewHost(
@ -2105,7 +2656,7 @@ func TestServiceV2_handleDownloadPeerBackToSourceStartedRequest(t *testing.T) {
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
tc.run(t, svc, peer, peerManager, res.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT())
})
@ -2171,6 +2722,7 @@ func TestServiceV2_handleDownloadPeerFinishedRequest(t *testing.T) {
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
peerManager := resource.NewMockPeerManager(ctl)
mockHost := resource.NewHost(
@ -2178,7 +2730,7 @@ func TestServiceV2_handleDownloadPeerFinishedRequest(t *testing.T) {
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
tc.run(t, svc, peer, peerManager, res.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT())
})
@ -2432,6 +2984,7 @@ func TestServiceV2_handleDownloadPeerBackToSourceFinishedRequest(t *testing.T) {
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
peerManager := resource.NewMockPeerManager(ctl)
url, err := url.Parse(s.URL)
@ -2457,7 +3010,7 @@ func TestServiceV2_handleDownloadPeerBackToSourceFinishedRequest(t *testing.T) {
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
tc.run(t, svc, tc.req, peer, peerManager, res.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT())
})
@ -2522,6 +3075,7 @@ func TestServiceV2_handleDownloadPeerFailedRequest(t *testing.T) {
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
peerManager := resource.NewMockPeerManager(ctl)
mockHost := resource.NewHost(
@ -2529,7 +3083,7 @@ func TestServiceV2_handleDownloadPeerFailedRequest(t *testing.T) {
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
tc.run(t, svc, peer, peerManager, res.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT())
})
@ -2641,6 +3195,7 @@ func TestServiceV2_handleDownloadPeerBackToSourceFailedRequest(t *testing.T) {
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
peerManager := resource.NewMockPeerManager(ctl)
mockHost := resource.NewHost(
@ -2648,7 +3203,7 @@ func TestServiceV2_handleDownloadPeerBackToSourceFailedRequest(t *testing.T) {
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
tc.run(t, svc, peer, peerManager, res.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT())
})
@ -2799,6 +3354,7 @@ func TestServiceV2_handleDownloadPieceFinishedRequest(t *testing.T) {
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
peerManager := resource.NewMockPeerManager(ctl)
mockHost := resource.NewHost(
@ -2806,7 +3362,7 @@ func TestServiceV2_handleDownloadPieceFinishedRequest(t *testing.T) {
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
tc.run(t, svc, tc.req, peer, peerManager, res.EXPECT(), peerManager.EXPECT())
})
@ -2924,6 +3480,7 @@ func TestServiceV2_handleDownloadPieceBackToSourceFinishedRequest(t *testing.T)
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
peerManager := resource.NewMockPeerManager(ctl)
mockHost := resource.NewHost(
@ -2931,7 +3488,7 @@ func TestServiceV2_handleDownloadPieceBackToSourceFinishedRequest(t *testing.T)
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
tc.run(t, svc, tc.req, peer, peerManager, res.EXPECT(), peerManager.EXPECT())
})
@ -3066,6 +3623,7 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) {
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
peerManager := resource.NewMockPeerManager(ctl)
mockHost := resource.NewHost(
@ -3073,7 +3631,7 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) {
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
tc.run(t, svc, tc.req, peer, peerManager, res.EXPECT(), peerManager.EXPECT(), scheduling.EXPECT())
})
@ -3129,6 +3687,7 @@ func TestServiceV2_handleDownloadPieceBackToSourceFailedRequest(t *testing.T) {
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
peerManager := resource.NewMockPeerManager(ctl)
mockHost := resource.NewHost(
@ -3136,7 +3695,7 @@ func TestServiceV2_handleDownloadPieceBackToSourceFailedRequest(t *testing.T) {
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
tc.run(t, svc, tc.req, peer, peerManager, res.EXPECT(), peerManager.EXPECT())
})
@ -3336,6 +3895,7 @@ func TestServiceV2_handleResource(t *testing.T) {
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
hostManager := resource.NewMockHostManager(ctl)
taskManager := resource.NewMockTaskManager(ctl)
peerManager := resource.NewMockPeerManager(ctl)
@ -3346,7 +3906,7 @@ func TestServiceV2_handleResource(t *testing.T) {
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
mockPeer := resource.NewPeer(mockPeerID, mockTask, mockHost)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage, networkTopology)
tc.run(t, svc, tc.download, stream, mockHost, mockTask, mockPeer, hostManager, taskManager, peerManager, res.EXPECT(), hostManager.EXPECT(), taskManager.EXPECT(), peerManager.EXPECT())
})
@ -3615,6 +4175,7 @@ func TestServiceV2_downloadTaskBySeedPeer(t *testing.T) {
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
networkTopology := networktopologymocks.NewMockNetworkTopology(ctl)
seedPeerClient := resource.NewMockSeedPeer(ctl)
mockHost := resource.NewHost(
@ -3622,7 +4183,7 @@ func TestServiceV2_downloadTaskBySeedPeer(t *testing.T) {
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
svc := NewV2(&tc.config, res, scheduling, dynconfig, storage)
svc := NewV2(&tc.config, res, scheduling, dynconfig, storage, networkTopology)
tc.run(t, svc, peer, seedPeerClient, res.EXPECT(), seedPeerClient.EXPECT())
})