From 670c89289819ae1ba6f767e87de4da8ab3f974df Mon Sep 17 00:00:00 2001 From: Gaius Date: Tue, 28 Feb 2023 15:48:06 +0800 Subject: [PATCH] feat: implement v2 version of scheduler service (#2125) Implement StatTask, AnnounceHost and LeaveHost api in scheduler service v2. Signed-off-by: Gaius --- client/daemon/peer/peertask_manager_mock.go | 22 +- client/daemon/peer/piece_manager_mock.go | 8 +- pkg/types/types.go | 4 +- scheduler/metrics/metrics.go | 19 +- scheduler/resource/seed_peer_mock.go | 6 +- scheduler/rpcserver/scheduler_server_v1.go | 13 +- scheduler/rpcserver/scheduler_server_v2.go | 45 +- scheduler/service/service_v1.go | 9 +- scheduler/service/service_v1_test.go | 431 ++++++++++++++ scheduler/service/service_v2.go | 258 +++++++- scheduler/service/service_v2_test.go | 617 +++++++++++++++++++- 11 files changed, 1373 insertions(+), 59 deletions(-) diff --git a/client/daemon/peer/peertask_manager_mock.go b/client/daemon/peer/peertask_manager_mock.go index d727fccc0..382a6bae1 100644 --- a/client/daemon/peer/peertask_manager_mock.go +++ b/client/daemon/peer/peertask_manager_mock.go @@ -9,10 +9,10 @@ import ( io "io" reflect "reflect" - common "d7y.io/api/pkg/apis/common/v1" - scheduler "d7y.io/api/pkg/apis/scheduler/v1" + v1 "d7y.io/api/pkg/apis/common/v1" + v10 "d7y.io/api/pkg/apis/scheduler/v1" storage "d7y.io/dragonfly/v2/client/daemon/storage" - logger "d7y.io/dragonfly/v2/internal/dflog" + dflog "d7y.io/dragonfly/v2/internal/dflog" gomock "github.com/golang/mock/gomock" status "google.golang.org/grpc/status" ) @@ -41,7 +41,7 @@ func (m *MockTaskManager) EXPECT() *MockTaskManagerMockRecorder { } // AnnouncePeerTask mocks base method. -func (m *MockTaskManager) AnnouncePeerTask(ctx context.Context, meta storage.PeerTaskMetadata, url string, taskType common.TaskType, urlMeta *common.UrlMeta) error { +func (m *MockTaskManager) AnnouncePeerTask(ctx context.Context, meta storage.PeerTaskMetadata, url string, taskType v1.TaskType, urlMeta *v1.UrlMeta) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "AnnouncePeerTask", ctx, meta, url, taskType, urlMeta) ret0, _ := ret[0].(error) @@ -131,10 +131,10 @@ func (mr *MockTaskManagerMockRecorder) StartStreamTask(ctx, req interface{}) *go } // StatTask mocks base method. -func (m *MockTaskManager) StatTask(ctx context.Context, taskID string) (*scheduler.Task, error) { +func (m *MockTaskManager) StatTask(ctx context.Context, taskID string) (*v10.Task, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "StatTask", ctx, taskID) - ret0, _ := ret[0].(*scheduler.Task) + ret0, _ := ret[0].(*v10.Task) ret1, _ := ret[1].(error) return ret0, ret1 } @@ -160,7 +160,7 @@ func (mr *MockTaskManagerMockRecorder) Stop(ctx interface{}) *gomock.Call { } // Subscribe mocks base method. -func (m *MockTaskManager) Subscribe(request *common.PieceTaskRequest) (*SubscribeResponse, bool) { +func (m *MockTaskManager) Subscribe(request *v1.PieceTaskRequest) (*SubscribeResponse, bool) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Subscribe", request) ret0, _ := ret[0].(*SubscribeResponse) @@ -322,10 +322,10 @@ func (mr *MockTaskMockRecorder) GetTraffic() *gomock.Call { } // Log mocks base method. -func (m *MockTask) Log() *logger.SugaredLoggerOnWith { +func (m *MockTask) Log() *dflog.SugaredLoggerOnWith { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Log") - ret0, _ := ret[0].(*logger.SugaredLoggerOnWith) + ret0, _ := ret[0].(*dflog.SugaredLoggerOnWith) return ret0 } @@ -431,10 +431,10 @@ func (m *MockLogger) EXPECT() *MockLoggerMockRecorder { } // Log mocks base method. 
-func (m *MockLogger) Log() *logger.SugaredLoggerOnWith { +func (m *MockLogger) Log() *dflog.SugaredLoggerOnWith { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "Log") - ret0, _ := ret[0].(*logger.SugaredLoggerOnWith) + ret0, _ := ret[0].(*dflog.SugaredLoggerOnWith) return ret0 } diff --git a/client/daemon/peer/piece_manager_mock.go b/client/daemon/peer/piece_manager_mock.go index fb0469564..1411507fe 100644 --- a/client/daemon/peer/piece_manager_mock.go +++ b/client/daemon/peer/piece_manager_mock.go @@ -9,8 +9,8 @@ import ( io "io" reflect "reflect" - dfdaemon "d7y.io/api/pkg/apis/dfdaemon/v1" - scheduler "d7y.io/api/pkg/apis/scheduler/v1" + v1 "d7y.io/api/pkg/apis/dfdaemon/v1" + v10 "d7y.io/api/pkg/apis/scheduler/v1" storage "d7y.io/dragonfly/v2/client/daemon/storage" http "d7y.io/dragonfly/v2/pkg/net/http" gomock "github.com/golang/mock/gomock" @@ -55,7 +55,7 @@ func (mr *MockPieceManagerMockRecorder) DownloadPiece(ctx, request interface{}) } // DownloadSource mocks base method. -func (m *MockPieceManager) DownloadSource(ctx context.Context, pt Task, request *scheduler.PeerTaskRequest, parsedRange *http.Range) error { +func (m *MockPieceManager) DownloadSource(ctx context.Context, pt Task, request *v10.PeerTaskRequest, parsedRange *http.Range) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "DownloadSource", ctx, pt, request, parsedRange) ret0, _ := ret[0].(error) @@ -83,7 +83,7 @@ func (mr *MockPieceManagerMockRecorder) Import(ctx, ptm, tsd, contentLength, rea } // ImportFile mocks base method. -func (m *MockPieceManager) ImportFile(ctx context.Context, ptm storage.PeerTaskMetadata, tsd storage.TaskStorageDriver, req *dfdaemon.ImportTaskRequest) error { +func (m *MockPieceManager) ImportFile(ctx context.Context, ptm storage.PeerTaskMetadata, tsd storage.TaskStorageDriver, req *v1.ImportTaskRequest) error { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "ImportFile", ctx, ptm, tsd, req) ret0, _ := ret[0].(error) diff --git a/pkg/types/types.go b/pkg/types/types.go index 1fdcf96ac..91d3e200c 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -109,8 +109,8 @@ const ( ) // Name returns the name of host type. 
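// A value receiver lets Name be called directly on HostType constants, e.g. HostTypeNormal.Name().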
-func (h *HostType) Name() string { - switch *h { +func (h HostType) Name() string { + switch h { case HostTypeSuperSeed: return HostTypeSuperSeedName case HostTypeStrongSeed: diff --git a/scheduler/metrics/metrics.go b/scheduler/metrics/metrics.go index acb80741b..23503e679 100644 --- a/scheduler/metrics/metrics.go +++ b/scheduler/metrics/metrics.go @@ -88,6 +88,20 @@ var ( Help: "Counter of the number of failed of the leaving peer.", }) + ExchangePeerCount = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: types.MetricsNamespace, + Subsystem: types.SchedulerMetricsName, + Name: "exchange_peer_total", + Help: "Counter of the number of the exchanging peer.", + }) + + ExchangePeerFailureCount = promauto.NewCounter(prometheus.CounterOpts{ + Namespace: types.MetricsNamespace, + Subsystem: types.SchedulerMetricsName, + Name: "exchange_peer_failure_total", + Help: "Counter of the number of failed of the exchanging peer.", + }) + RegisterTaskCount = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: types.MetricsNamespace, Subsystem: types.SchedulerMetricsName, @@ -152,12 +166,13 @@ var ( }, []string{"os", "platform", "platform_family", "platform_version", "kernel_version", "git_version", "git_commit", "go_version", "build_platform"}) - AnnounceHostFailureCount = promauto.NewCounter(prometheus.CounterOpts{ + AnnounceHostFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: types.MetricsNamespace, Subsystem: types.SchedulerMetricsName, Name: "announce_host_failure_total", Help: "Counter of the number of failed of the announce host.", - }) + }, []string{"os", "platform", "platform_family", "platform_version", + "kernel_version", "git_version", "git_commit", "go_version", "build_platform"}) LeaveTaskCount = promauto.NewCounterVec(prometheus.CounterOpts{ Namespace: types.MetricsNamespace, Subsystem: types.SchedulerMetricsName, diff --git a/scheduler/resource/seed_peer_mock.go b/scheduler/resource/seed_peer_mock.go index cf7fba16d..85ecb4d50 100644 --- a/scheduler/resource/seed_peer_mock.go +++ b/scheduler/resource/seed_peer_mock.go @@ -8,7 +8,7 @@ import ( context "context" reflect "reflect" - scheduler "d7y.io/api/pkg/apis/scheduler/v1" + v1 "d7y.io/api/pkg/apis/scheduler/v1" http "d7y.io/dragonfly/v2/pkg/net/http" gomock "github.com/golang/mock/gomock" ) @@ -79,11 +79,11 @@ func (mr *MockSeedPeerMockRecorder) Stop() *gomock.Call { } // TriggerTask mocks base method.
-func (m *MockSeedPeer) TriggerTask(arg0 context.Context, arg1 *http.Range, arg2 *Task) (*Peer, *scheduler.PeerResult, error) { +func (m *MockSeedPeer) TriggerTask(arg0 context.Context, arg1 *http.Range, arg2 *Task) (*Peer, *v1.PeerResult, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "TriggerTask", arg0, arg1, arg2) ret0, _ := ret[0].(*Peer) - ret1, _ := ret[1].(*scheduler.PeerResult) + ret1, _ := ret[1].(*v1.PeerResult) ret2, _ := ret[2].(error) return ret0, ret1, ret2 } diff --git a/scheduler/rpcserver/scheduler_server_v1.go b/scheduler/rpcserver/scheduler_server_v1.go index 3ed1133f7..a5477a1fc 100644 --- a/scheduler/rpcserver/scheduler_server_v1.go +++ b/scheduler/rpcserver/scheduler_server_v1.go @@ -86,7 +86,7 @@ func (s *schedulerServerV1) AnnounceTask(ctx context.Context, req *schedulerv1.A metrics.AnnounceTaskCount.Inc() if err := s.service.AnnounceTask(ctx, req); err != nil { metrics.AnnounceTaskFailureCount.Inc() - return new(emptypb.Empty), err + return nil, err } return new(emptypb.Empty), nil @@ -95,13 +95,13 @@ func (s *schedulerServerV1) AnnounceTask(ctx context.Context, req *schedulerv1.A // StatTask checks if the given task exists. func (s *schedulerServerV1) StatTask(ctx context.Context, req *schedulerv1.StatTaskRequest) (*schedulerv1.Task, error) { metrics.StatTaskCount.Inc() - task, err := s.service.StatTask(ctx, req) + resp, err := s.service.StatTask(ctx, req) if err != nil { metrics.StatTaskFailureCount.Inc() return nil, err } - return task, nil + return resp, nil } // LeaveTask makes the peer unschedulable. @@ -114,8 +114,9 @@ func (s *schedulerServerV1) AnnounceHost(ctx context.Context, req *schedulerv1.A metrics.AnnounceHostCount.WithLabelValues(req.Os, req.Platform, req.PlatformFamily, req.PlatformVersion, req.KernelVersion, req.Build.GitVersion, req.Build.GitCommit, req.Build.GoVersion, req.Build.Platform).Inc() if err := s.service.AnnounceHost(ctx, req); err != nil { - metrics.AnnounceHostFailureCount.Inc() - return new(emptypb.Empty), err + metrics.AnnounceHostFailureCount.WithLabelValues(req.Os, req.Platform, req.PlatformFamily, req.PlatformVersion, + req.KernelVersion, req.Build.GitVersion, req.Build.GitCommit, req.Build.GoVersion, req.Build.Platform).Inc() + return nil, err } return new(emptypb.Empty), nil @@ -126,7 +127,7 @@ func (s *schedulerServerV1) LeaveHost(ctx context.Context, req *schedulerv1.Leav metrics.LeaveHostCount.Inc() if err := s.service.LeaveHost(ctx, req); err != nil { metrics.LeaveHostFailureCount.Inc() - return new(emptypb.Empty), err + return nil, err } return new(emptypb.Empty), nil diff --git a/scheduler/rpcserver/scheduler_server_v2.go b/scheduler/rpcserver/scheduler_server_v2.go index 81697470e..4a751ac0a 100644 --- a/scheduler/rpcserver/scheduler_server_v2.go +++ b/scheduler/rpcserver/scheduler_server_v2.go @@ -58,13 +58,13 @@ func (s *schedulerServerV2) AnnouncePeer(stream schedulerv2.Scheduler_AnnouncePe // StatPeer checks information of peer. func (s *schedulerServerV2) StatPeer(ctx context.Context, req *schedulerv2.StatPeerRequest) (*commonv2.Peer, error) { metrics.StatPeerCount.Inc() - peer, err := s.service.StatPeer(ctx, req) + resp, err := s.service.StatPeer(ctx, req) if err != nil { metrics.StatPeerFailureCount.Inc() return nil, err } - return peer, nil + return resp, nil } // LeavePeer releases peer in scheduler. 
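// It returns codes.NotFound when the peer is unknown and codes.FailedPrecondition when the peer cannot transition to the leave state. // A minimal sketch of a call, assuming the generated schedulerv2 gRPC client: // _, err := client.LeavePeer(ctx, &schedulerv2.LeavePeerRequest{TaskId: taskID, PeerId: peerID})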
@@ -72,31 +72,58 @@ func (s *schedulerServerV2) LeavePeer(ctx context.Context, req *schedulerv2.Leav metrics.LeavePeerCount.Inc() if err := s.service.LeavePeer(ctx, req); err != nil { metrics.LeavePeerFailureCount.Inc() - return new(emptypb.Empty), err + return nil, err } return new(emptypb.Empty), nil } -// TODO exchange peer api definition. // ExchangePeer exchanges peer information. func (s *schedulerServerV2) ExchangePeer(ctx context.Context, req *schedulerv2.ExchangePeerRequest) (*schedulerv2.ExchangePeerResponse, error) { - return nil, nil + metrics.ExchangePeerCount.Inc() + resp, err := s.service.ExchangePeer(ctx, req) + if err != nil { + metrics.ExchangePeerFailureCount.Inc() + return nil, err + } + + return resp, nil } -// Checks information of task. +// StatTask checks information of task. func (s *schedulerServerV2) StatTask(ctx context.Context, req *schedulerv2.StatTaskRequest) (*commonv2.Task, error) { - return nil, nil + metrics.StatTaskCount.Inc() + resp, err := s.service.StatTask(ctx, req) + if err != nil { + metrics.StatTaskFailureCount.Inc() + return nil, err + } + + return resp, nil } // AnnounceHost announces host to scheduler. func (s *schedulerServerV2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequest) (*emptypb.Empty, error) { - return nil, nil + metrics.AnnounceHostCount.WithLabelValues(req.Host.Os, req.Host.Platform, req.Host.PlatformFamily, req.Host.PlatformVersion, + req.Host.KernelVersion, req.Host.Build.GitVersion, req.Host.Build.GitCommit, req.Host.Build.GoVersion, req.Host.Build.Platform).Inc() + if err := s.service.AnnounceHost(ctx, req); err != nil { + metrics.AnnounceHostFailureCount.WithLabelValues(req.Host.Os, req.Host.Platform, req.Host.PlatformFamily, req.Host.PlatformVersion, + req.Host.KernelVersion, req.Host.Build.GitVersion, req.Host.Build.GitCommit, req.Host.Build.GoVersion, req.Host.Build.Platform).Inc() + return nil, err + } + + return new(emptypb.Empty), nil } // LeaveHost releases host in scheduler. func (s *schedulerServerV2) LeaveHost(ctx context.Context, req *schedulerv2.LeaveHostRequest) (*emptypb.Empty, error) { - return nil, nil + metrics.LeaveHostCount.Inc() + if err := s.service.LeaveHost(ctx, req); err != nil { + metrics.LeaveHostFailureCount.Inc() + return nil, err + } + + return new(emptypb.Empty), nil } // SyncProbes sync probes of the host. diff --git a/scheduler/service/service_v1.go b/scheduler/service/service_v1.go index 4cb8053ba..3d367f450 100644 --- a/scheduler/service/service_v1.go +++ b/scheduler/service/service_v1.go @@ -538,6 +538,7 @@ func (v *V1) AnnounceHost(ctx context.Context, req *schedulerv1.AnnounceHostRequ return nil } + // Host already exists and updates properties. 
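+ // Only the announced properties below are refreshed; runtime state such as the peer set, upload counters and CreatedAt is left untouched.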
host.Port = req.Port host.DownloadPort = req.DownloadPort host.Type = types.ParseHostType(req.Type) @@ -548,6 +549,10 @@ func (v *V1) AnnounceHost(ctx context.Context, req *schedulerv1.AnnounceHostRequ host.KernelVersion = req.KernelVersion host.UpdatedAt.Store(time.Now()) + if concurrentUploadLimit > 0 { + host.ConcurrentUploadLimit.Store(concurrentUploadLimit) + } + if req.Cpu != nil { host.CPU = resource.CPU{ LogicalCount: req.Cpu.LogicalCount, @@ -612,10 +617,6 @@ func (v *V1) AnnounceHost(ctx context.Context, req *schedulerv1.AnnounceHostRequ } } - if concurrentUploadLimit > 0 { - host.ConcurrentUploadLimit.Store(concurrentUploadLimit) - } - return nil } diff --git a/scheduler/service/service_v1_test.go b/scheduler/service/service_v1_test.go index b05fec126..455be1758 100644 --- a/scheduler/service/service_v1_test.go +++ b/scheduler/service/service_v1_test.go @@ -1930,6 +1930,437 @@ func TestServiceV1_LeaveTask(t *testing.T) { } } +func TestServiceV1_AnnounceHost(t *testing.T) { + tests := []struct { + name string + req *schedulerv1.AnnounceHostRequest + run func(t *testing.T, svc *V1, req *schedulerv1.AnnounceHostRequest, host *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) + }{ + { + name: "host not found", + req: &schedulerv1.AnnounceHostRequest{ + Id: mockHostID, + Type: pkgtypes.HostTypeNormal.Name(), + Hostname: "hostname", + Ip: "127.0.0.1", + Port: 8003, + DownloadPort: 8001, + Os: "darwin", + Platform: "darwin", + PlatformFamily: "Standalone Workstation", + PlatformVersion: "11.1", + KernelVersion: "20.2.0", + Cpu: &schedulerv1.CPU{ + LogicalCount: mockCPU.LogicalCount, + PhysicalCount: mockCPU.PhysicalCount, + Percent: mockCPU.Percent, + ProcessPercent: mockCPU.ProcessPercent, + Times: &schedulerv1.CPUTimes{ + User: mockCPU.Times.User, + System: mockCPU.Times.System, + Idle: mockCPU.Times.Idle, + Nice: mockCPU.Times.Nice, + Iowait: mockCPU.Times.Iowait, + Irq: mockCPU.Times.Irq, + Softirq: mockCPU.Times.Softirq, + Steal: mockCPU.Times.Steal, + Guest: mockCPU.Times.Guest, + GuestNice: mockCPU.Times.GuestNice, + }, + }, + Memory: &schedulerv1.Memory{ + Total: mockMemory.Total, + Available: mockMemory.Available, + Used: mockMemory.Used, + UsedPercent: mockMemory.UsedPercent, + ProcessUsedPercent: mockMemory.ProcessUsedPercent, + Free: mockMemory.Free, + }, + Network: &schedulerv1.Network{ + TcpConnectionCount: mockNetwork.TCPConnectionCount, + UploadTcpConnectionCount: mockNetwork.UploadTCPConnectionCount, + SecurityDomain: mockNetwork.SecurityDomain, + Location: mockNetwork.Location, + Idc: mockNetwork.IDC, + }, + Disk: &schedulerv1.Disk{ + Total: mockDisk.Total, + Free: mockDisk.Free, + Used: mockDisk.Used, + UsedPercent: mockDisk.UsedPercent, + InodesTotal: mockDisk.InodesTotal, + InodesUsed: mockDisk.InodesUsed, + InodesFree: mockDisk.InodesFree, + InodesUsedPercent: mockDisk.InodesUsedPercent, + }, + Build: &schedulerv1.Build{ + GitVersion: mockBuild.GitVersion, + GitCommit: mockBuild.GitCommit, + GoVersion: mockBuild.GoVersion, + Platform: mockBuild.Platform, + }, + }, + run: func(t *testing.T, svc *V1, req *schedulerv1.AnnounceHostRequest, host *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { + gomock.InOrder( + md.GetSchedulerClusterClientConfig().Return(types.SchedulerClusterClientConfig{LoadLimit: 10}, 
nil).Times(1), + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Any()).Return(nil, false).Times(1), + mr.HostManager().Return(hostManager).Times(1), + mh.Store(gomock.Any()).Do(func(host *resource.Host) { + assert := assert.New(t) + assert.Equal(host.ID, req.Id) + assert.Equal(host.Type, pkgtypes.ParseHostType(req.Type)) + assert.Equal(host.Hostname, req.Hostname) + assert.Equal(host.IP, req.Ip) + assert.Equal(host.Port, req.Port) + assert.Equal(host.DownloadPort, req.DownloadPort) + assert.Equal(host.OS, req.Os) + assert.Equal(host.Platform, req.Platform) + assert.Equal(host.PlatformVersion, req.PlatformVersion) + assert.Equal(host.KernelVersion, req.KernelVersion) + assert.EqualValues(host.CPU, mockCPU) + assert.EqualValues(host.Memory, mockMemory) + assert.EqualValues(host.Network, mockNetwork) + assert.EqualValues(host.Disk, mockDisk) + assert.EqualValues(host.Build, mockBuild) + assert.Equal(host.ConcurrentUploadLimit.Load(), int32(10)) + assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) + assert.Equal(host.UploadCount.Load(), int64(0)) + assert.Equal(host.UploadFailedCount.Load(), int64(0)) + assert.NotNil(host.Peers) + assert.Equal(host.PeerCount.Load(), int32(0)) + assert.NotEqual(host.CreatedAt.Load().Nanosecond(), 0) + assert.NotEqual(host.UpdatedAt.Load().Nanosecond(), 0) + assert.NotNil(host.Log) + }).Return().Times(1), + ) + + assert := assert.New(t) + assert.NoError(svc.AnnounceHost(context.Background(), req)) + }, + }, + { + name: "host not found and dynconfig returns error", + req: &schedulerv1.AnnounceHostRequest{ + Id: mockHostID, + Type: pkgtypes.HostTypeNormal.Name(), + Hostname: "hostname", + Ip: "127.0.0.1", + Port: 8003, + DownloadPort: 8001, + Os: "darwin", + Platform: "darwin", + PlatformFamily: "Standalone Workstation", + PlatformVersion: "11.1", + KernelVersion: "20.2.0", + Cpu: &schedulerv1.CPU{ + LogicalCount: mockCPU.LogicalCount, + PhysicalCount: mockCPU.PhysicalCount, + Percent: mockCPU.Percent, + ProcessPercent: mockCPU.ProcessPercent, + Times: &schedulerv1.CPUTimes{ + User: mockCPU.Times.User, + System: mockCPU.Times.System, + Idle: mockCPU.Times.Idle, + Nice: mockCPU.Times.Nice, + Iowait: mockCPU.Times.Iowait, + Irq: mockCPU.Times.Irq, + Softirq: mockCPU.Times.Softirq, + Steal: mockCPU.Times.Steal, + Guest: mockCPU.Times.Guest, + GuestNice: mockCPU.Times.GuestNice, + }, + }, + Memory: &schedulerv1.Memory{ + Total: mockMemory.Total, + Available: mockMemory.Available, + Used: mockMemory.Used, + UsedPercent: mockMemory.UsedPercent, + ProcessUsedPercent: mockMemory.ProcessUsedPercent, + Free: mockMemory.Free, + }, + Network: &schedulerv1.Network{ + TcpConnectionCount: mockNetwork.TCPConnectionCount, + UploadTcpConnectionCount: mockNetwork.UploadTCPConnectionCount, + SecurityDomain: mockNetwork.SecurityDomain, + Location: mockNetwork.Location, + Idc: mockNetwork.IDC, + }, + Disk: &schedulerv1.Disk{ + Total: mockDisk.Total, + Free: mockDisk.Free, + Used: mockDisk.Used, + UsedPercent: mockDisk.UsedPercent, + InodesTotal: mockDisk.InodesTotal, + InodesUsed: mockDisk.InodesUsed, + InodesFree: mockDisk.InodesFree, + InodesUsedPercent: mockDisk.InodesUsedPercent, + }, + Build: &schedulerv1.Build{ + GitVersion: mockBuild.GitVersion, + GitCommit: mockBuild.GitCommit, + GoVersion: mockBuild.GoVersion, + Platform: mockBuild.Platform, + }, + }, + run: func(t *testing.T, svc *V1, req *schedulerv1.AnnounceHostRequest, host *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh 
*resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { + gomock.InOrder( + md.GetSchedulerClusterClientConfig().Return(types.SchedulerClusterClientConfig{}, errors.New("foo")).Times(1), + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Any()).Return(nil, false).Times(1), + mr.HostManager().Return(hostManager).Times(1), + mh.Store(gomock.Any()).Do(func(host *resource.Host) { + assert := assert.New(t) + assert.Equal(host.ID, req.Id) + assert.Equal(host.Type, pkgtypes.ParseHostType(req.Type)) + assert.Equal(host.Hostname, req.Hostname) + assert.Equal(host.IP, req.Ip) + assert.Equal(host.Port, req.Port) + assert.Equal(host.DownloadPort, req.DownloadPort) + assert.Equal(host.OS, req.Os) + assert.Equal(host.Platform, req.Platform) + assert.Equal(host.PlatformVersion, req.PlatformVersion) + assert.Equal(host.KernelVersion, req.KernelVersion) + assert.EqualValues(host.CPU, mockCPU) + assert.EqualValues(host.Memory, mockMemory) + assert.EqualValues(host.Network, mockNetwork) + assert.EqualValues(host.Disk, mockDisk) + assert.EqualValues(host.Build, mockBuild) + assert.Equal(host.ConcurrentUploadLimit.Load(), int32(50)) + assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) + assert.Equal(host.UploadCount.Load(), int64(0)) + assert.Equal(host.UploadFailedCount.Load(), int64(0)) + assert.NotNil(host.Peers) + assert.Equal(host.PeerCount.Load(), int32(0)) + assert.NotEqual(host.CreatedAt.Load().Nanosecond(), 0) + assert.NotEqual(host.UpdatedAt.Load().Nanosecond(), 0) + assert.NotNil(host.Log) + }).Return().Times(1), + ) + + assert := assert.New(t) + assert.NoError(svc.AnnounceHost(context.Background(), req)) + }, + }, + { + name: "host already exists", + req: &schedulerv1.AnnounceHostRequest{ + Id: mockHostID, + Type: pkgtypes.HostTypeNormal.Name(), + Hostname: "hostname", + Ip: "127.0.0.1", + Port: 8003, + DownloadPort: 8001, + Os: "darwin", + Platform: "darwin", + PlatformFamily: "Standalone Workstation", + PlatformVersion: "11.1", + KernelVersion: "20.2.0", + Cpu: &schedulerv1.CPU{ + LogicalCount: mockCPU.LogicalCount, + PhysicalCount: mockCPU.PhysicalCount, + Percent: mockCPU.Percent, + ProcessPercent: mockCPU.ProcessPercent, + Times: &schedulerv1.CPUTimes{ + User: mockCPU.Times.User, + System: mockCPU.Times.System, + Idle: mockCPU.Times.Idle, + Nice: mockCPU.Times.Nice, + Iowait: mockCPU.Times.Iowait, + Irq: mockCPU.Times.Irq, + Softirq: mockCPU.Times.Softirq, + Steal: mockCPU.Times.Steal, + Guest: mockCPU.Times.Guest, + GuestNice: mockCPU.Times.GuestNice, + }, + }, + Memory: &schedulerv1.Memory{ + Total: mockMemory.Total, + Available: mockMemory.Available, + Used: mockMemory.Used, + UsedPercent: mockMemory.UsedPercent, + ProcessUsedPercent: mockMemory.ProcessUsedPercent, + Free: mockMemory.Free, + }, + Network: &schedulerv1.Network{ + TcpConnectionCount: mockNetwork.TCPConnectionCount, + UploadTcpConnectionCount: mockNetwork.UploadTCPConnectionCount, + SecurityDomain: mockNetwork.SecurityDomain, + Location: mockNetwork.Location, + Idc: mockNetwork.IDC, + }, + Disk: &schedulerv1.Disk{ + Total: mockDisk.Total, + Free: mockDisk.Free, + Used: mockDisk.Used, + UsedPercent: mockDisk.UsedPercent, + InodesTotal: mockDisk.InodesTotal, + InodesUsed: mockDisk.InodesUsed, + InodesFree: mockDisk.InodesFree, + InodesUsedPercent: mockDisk.InodesUsedPercent, + }, + Build: &schedulerv1.Build{ + GitVersion: mockBuild.GitVersion, + GitCommit: mockBuild.GitCommit, + GoVersion: mockBuild.GoVersion, + Platform: mockBuild.Platform, + }, + }, + run: func(t 
*testing.T, svc *V1, req *schedulerv1.AnnounceHostRequest, host *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { + gomock.InOrder( + md.GetSchedulerClusterClientConfig().Return(types.SchedulerClusterClientConfig{LoadLimit: 10}, nil).Times(1), + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Any()).Return(host, true).Times(1), + ) + + assert := assert.New(t) + assert.NoError(svc.AnnounceHost(context.Background(), req)) + assert.Equal(host.ID, req.Id) + assert.Equal(host.Type, pkgtypes.ParseHostType(req.Type)) + assert.Equal(host.Hostname, req.Hostname) + assert.Equal(host.IP, req.Ip) + assert.Equal(host.Port, req.Port) + assert.Equal(host.DownloadPort, req.DownloadPort) + assert.Equal(host.OS, req.Os) + assert.Equal(host.Platform, req.Platform) + assert.Equal(host.PlatformVersion, req.PlatformVersion) + assert.Equal(host.KernelVersion, req.KernelVersion) + assert.EqualValues(host.CPU, mockCPU) + assert.EqualValues(host.Memory, mockMemory) + assert.EqualValues(host.Network, mockNetwork) + assert.EqualValues(host.Disk, mockDisk) + assert.EqualValues(host.Build, mockBuild) + assert.Equal(host.ConcurrentUploadLimit.Load(), int32(10)) + assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) + assert.Equal(host.UploadCount.Load(), int64(0)) + assert.Equal(host.UploadFailedCount.Load(), int64(0)) + assert.NotNil(host.Peers) + assert.Equal(host.PeerCount.Load(), int32(0)) + assert.NotEqual(host.CreatedAt.Load().Nanosecond(), 0) + assert.NotEqual(host.UpdatedAt.Load().Nanosecond(), 0) + assert.NotNil(host.Log) + }, + }, + { + name: "host already exists and dynconfig returns error", + req: &schedulerv1.AnnounceHostRequest{ + Id: mockHostID, + Type: pkgtypes.HostTypeNormal.Name(), + Hostname: "hostname", + Ip: "127.0.0.1", + Port: 8003, + DownloadPort: 8001, + Os: "darwin", + Platform: "darwin", + PlatformFamily: "Standalone Workstation", + PlatformVersion: "11.1", + KernelVersion: "20.2.0", + Cpu: &schedulerv1.CPU{ + LogicalCount: mockCPU.LogicalCount, + PhysicalCount: mockCPU.PhysicalCount, + Percent: mockCPU.Percent, + ProcessPercent: mockCPU.ProcessPercent, + Times: &schedulerv1.CPUTimes{ + User: mockCPU.Times.User, + System: mockCPU.Times.System, + Idle: mockCPU.Times.Idle, + Nice: mockCPU.Times.Nice, + Iowait: mockCPU.Times.Iowait, + Irq: mockCPU.Times.Irq, + Softirq: mockCPU.Times.Softirq, + Steal: mockCPU.Times.Steal, + Guest: mockCPU.Times.Guest, + GuestNice: mockCPU.Times.GuestNice, + }, + }, + Memory: &schedulerv1.Memory{ + Total: mockMemory.Total, + Available: mockMemory.Available, + Used: mockMemory.Used, + UsedPercent: mockMemory.UsedPercent, + ProcessUsedPercent: mockMemory.ProcessUsedPercent, + Free: mockMemory.Free, + }, + Network: &schedulerv1.Network{ + TcpConnectionCount: mockNetwork.TCPConnectionCount, + UploadTcpConnectionCount: mockNetwork.UploadTCPConnectionCount, + SecurityDomain: mockNetwork.SecurityDomain, + Location: mockNetwork.Location, + Idc: mockNetwork.IDC, + }, + Disk: &schedulerv1.Disk{ + Total: mockDisk.Total, + Free: mockDisk.Free, + Used: mockDisk.Used, + UsedPercent: mockDisk.UsedPercent, + InodesTotal: mockDisk.InodesTotal, + InodesUsed: mockDisk.InodesUsed, + InodesFree: mockDisk.InodesFree, + InodesUsedPercent: mockDisk.InodesUsedPercent, + }, + Build: &schedulerv1.Build{ + GitVersion: mockBuild.GitVersion, + GitCommit: mockBuild.GitCommit, + GoVersion: mockBuild.GoVersion, + Platform: mockBuild.Platform, + 
}, + }, + run: func(t *testing.T, svc *V1, req *schedulerv1.AnnounceHostRequest, host *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { + gomock.InOrder( + md.GetSchedulerClusterClientConfig().Return(types.SchedulerClusterClientConfig{}, errors.New("foo")).Times(1), + mr.HostManager().Return(hostManager).Times(1), + mh.Load(gomock.Any()).Return(host, true).Times(1), + ) + + assert := assert.New(t) + assert.NoError(svc.AnnounceHost(context.Background(), req)) + assert.Equal(host.ID, req.Id) + assert.Equal(host.Type, pkgtypes.ParseHostType(req.Type)) + assert.Equal(host.Hostname, req.Hostname) + assert.Equal(host.IP, req.Ip) + assert.Equal(host.Port, req.Port) + assert.Equal(host.DownloadPort, req.DownloadPort) + assert.Equal(host.OS, req.Os) + assert.Equal(host.Platform, req.Platform) + assert.Equal(host.PlatformVersion, req.PlatformVersion) + assert.Equal(host.KernelVersion, req.KernelVersion) + assert.EqualValues(host.CPU, mockCPU) + assert.EqualValues(host.Memory, mockMemory) + assert.EqualValues(host.Network, mockNetwork) + assert.EqualValues(host.Disk, mockDisk) + assert.EqualValues(host.Build, mockBuild) + assert.Equal(host.ConcurrentUploadLimit.Load(), int32(50)) + assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) + assert.Equal(host.UploadCount.Load(), int64(0)) + assert.Equal(host.UploadFailedCount.Load(), int64(0)) + assert.NotNil(host.Peers) + assert.Equal(host.PeerCount.Load(), int32(0)) + assert.NotEqual(host.CreatedAt.Load().Nanosecond(), 0) + assert.NotEqual(host.UpdatedAt.Load().Nanosecond(), 0) + assert.NotNil(host.Log) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + scheduling := mocks.NewMockScheduling(ctl) + res := resource.NewMockResource(ctl) + dynconfig := configmocks.NewMockDynconfigInterface(ctl) + storage := storagemocks.NewMockStorage(ctl) + hostManager := resource.NewMockHostManager(ctl) + host := resource.NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) + svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage) + + tc.run(t, svc, tc.req, host, hostManager, res.EXPECT(), hostManager.EXPECT(), dynconfig.EXPECT()) + }) + } +} + func TestServiceV1_LeaveHost(t *testing.T) { tests := []struct { name string diff --git a/scheduler/service/service_v2.go b/scheduler/service/service_v2.go index f8c7585aa..2336bc294 100644 --- a/scheduler/service/service_v2.go +++ b/scheduler/service/service_v2.go @@ -19,6 +19,7 @@ package service import ( "context" "fmt" + "time" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -29,8 +30,8 @@ import ( schedulerv2 "d7y.io/api/pkg/apis/scheduler/v2" logger "d7y.io/dragonfly/v2/internal/dflog" + "d7y.io/dragonfly/v2/pkg/types" "d7y.io/dragonfly/v2/scheduler/config" - "d7y.io/dragonfly/v2/scheduler/metrics" "d7y.io/dragonfly/v2/scheduler/resource" "d7y.io/dragonfly/v2/scheduler/scheduling" "d7y.io/dragonfly/v2/scheduler/storage" @@ -149,12 +150,12 @@ func (v *V2) StatPeer(ctx context.Context, req *schedulerv2.StatPeerRequest) (*c UpdatedAt: timestamppb.New(peer.Task.UpdatedAt.Load()), } - // Set digest to response. + // Set digest to task response. 
if peer.Task.Digest != nil { resp.Task.Digest = peer.Task.Digest.String() } - // Set pieces to response. + // Set pieces to task response. peer.Task.Pieces.Range(func(key, value any) bool { piece, ok := value.(*resource.Piece) if !ok { @@ -253,15 +254,12 @@ func (v *V2) LeavePeer(ctx context.Context, req *schedulerv2.LeavePeerRequest) e peer, loaded := v.resource.PeerManager().Load(req.PeerId) if !loaded { - metrics.LeavePeerFailureCount.Inc() msg := fmt.Sprintf("peer %s not found", req.PeerId) logger.Error(msg) return status.Error(codes.NotFound, msg) } - metrics.LeavePeerCount.Inc() if err := peer.FSM.Event(ctx, resource.PeerEventLeave); err != nil { - metrics.LeavePeerFailureCount.Inc() msg := fmt.Sprintf("peer fsm event failed: %s", err.Error()) peer.Log.Error(msg) return status.Error(codes.FailedPrecondition, msg) @@ -276,17 +274,261 @@ func (v *V2) ExchangePeer(ctx context.Context, req *schedulerv2.ExchangePeerRequ return nil, nil } -// Checks information of task. +// StatTask checks information of task. func (v *V2) StatTask(ctx context.Context, req *schedulerv2.StatTaskRequest) (*commonv2.Task, error) { - return nil, nil + logger.WithTaskID(req.Id).Infof("stat task request: %#v", req) + + task, loaded := v.resource.TaskManager().Load(req.Id) + if !loaded { + msg := fmt.Sprintf("task %s not found", req.Id) + logger.Error(msg) + return nil, status.Error(codes.NotFound, msg) + } + + resp := &commonv2.Task{ + Id: task.ID, + Type: task.Type, + Url: task.URL, + Tag: task.Tag, + Application: task.Application, + Filters: task.Filters, + Header: task.Header, + PieceLength: task.PieceLength, + ContentLength: task.ContentLength.Load(), + PieceCount: task.TotalPieceCount.Load(), + SizeScope: task.SizeScope(), + State: task.FSM.Current(), + PeerCount: int32(task.PeerCount()), + CreatedAt: timestamppb.New(task.CreatedAt.Load()), + UpdatedAt: timestamppb.New(task.UpdatedAt.Load()), + } + + // Set digest to response. + if task.Digest != nil { + resp.Digest = task.Digest.String() + } + + // Set pieces to response. + task.Pieces.Range(func(key, value any) bool { + piece, ok := value.(*resource.Piece) + if !ok { + task.Log.Errorf("invalid piece %s %#v", key, value) + return true + } + + respPiece := &commonv2.Piece{ + Number: piece.Number, + ParentId: piece.ParentID, + Offset: piece.Offset, + Length: piece.Length, + TrafficType: piece.TrafficType, + Cost: durationpb.New(piece.Cost), + CreatedAt: timestamppb.New(piece.CreatedAt), + } + + if piece.Digest != nil { + respPiece.Digest = piece.Digest.String() + } + + resp.Pieces = append(resp.Pieces, respPiece) + return true + }) + + return resp, nil } // AnnounceHost announces host to scheduler. func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequest) error { + logger.WithHostID(req.Host.Id).Infof("announce host request: %#v", req.Host) + + // Get scheduler cluster client config by manager. 
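+ // A dynconfig failure is non-fatal here: concurrentUploadLimit stays zero and the guards below leave the host's default or previously stored limit in place.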
+ var concurrentUploadLimit int32 + if clientConfig, err := v.dynconfig.GetSchedulerClusterClientConfig(); err == nil { + concurrentUploadLimit = int32(clientConfig.LoadLimit) + } + + host, loaded := v.resource.HostManager().Load(req.Host.Id) + if !loaded { + options := []resource.HostOption{ + resource.WithOS(req.Host.Os), + resource.WithPlatform(req.Host.Platform), + resource.WithPlatformFamily(req.Host.PlatformFamily), + resource.WithPlatformVersion(req.Host.PlatformVersion), + resource.WithKernelVersion(req.Host.KernelVersion), + } + + if concurrentUploadLimit > 0 { + options = append(options, resource.WithConcurrentUploadLimit(concurrentUploadLimit)) + } + + if req.Host.Cpu != nil { + options = append(options, resource.WithCPU(resource.CPU{ + LogicalCount: req.Host.Cpu.LogicalCount, + PhysicalCount: req.Host.Cpu.PhysicalCount, + Percent: req.Host.Cpu.Percent, + ProcessPercent: req.Host.Cpu.ProcessPercent, + Times: resource.CPUTimes{ + User: req.Host.Cpu.Times.User, + System: req.Host.Cpu.Times.System, + Idle: req.Host.Cpu.Times.Idle, + Nice: req.Host.Cpu.Times.Nice, + Iowait: req.Host.Cpu.Times.Iowait, + Irq: req.Host.Cpu.Times.Irq, + Softirq: req.Host.Cpu.Times.Softirq, + Steal: req.Host.Cpu.Times.Steal, + Guest: req.Host.Cpu.Times.Guest, + GuestNice: req.Host.Cpu.Times.GuestNice, + }, + })) + } + + if req.Host.Memory != nil { + options = append(options, resource.WithMemory(resource.Memory{ + Total: req.Host.Memory.Total, + Available: req.Host.Memory.Available, + Used: req.Host.Memory.Used, + UsedPercent: req.Host.Memory.UsedPercent, + ProcessUsedPercent: req.Host.Memory.ProcessUsedPercent, + Free: req.Host.Memory.Free, + })) + } + + if req.Host.Network != nil { + options = append(options, resource.WithNetwork(resource.Network{ + TCPConnectionCount: req.Host.Network.TcpConnectionCount, + UploadTCPConnectionCount: req.Host.Network.UploadTcpConnectionCount, + SecurityDomain: req.Host.Network.SecurityDomain, + Location: req.Host.Network.Location, + IDC: req.Host.Network.Idc, + })) + } + + if req.Host.Disk != nil { + options = append(options, resource.WithDisk(resource.Disk{ + Total: req.Host.Disk.Total, + Free: req.Host.Disk.Free, + Used: req.Host.Disk.Used, + UsedPercent: req.Host.Disk.UsedPercent, + InodesTotal: req.Host.Disk.InodesTotal, + InodesUsed: req.Host.Disk.InodesUsed, + InodesFree: req.Host.Disk.InodesFree, + InodesUsedPercent: req.Host.Disk.InodesUsedPercent, + })) + } + + if req.Host.Build != nil { + options = append(options, resource.WithBuild(resource.Build{ + GitVersion: req.Host.Build.GitVersion, + GitCommit: req.Host.Build.GitCommit, + GoVersion: req.Host.Build.GoVersion, + Platform: req.Host.Build.Platform, + })) + } + + host = resource.NewHost( + req.Host.Id, req.Host.Ip, req.Host.Hostname, + req.Host.Port, req.Host.DownloadPort, types.HostType(req.Host.Type), + options..., + ) + + v.resource.HostManager().Store(host) + host.Log.Infof("announce new host: %#v", req) + return nil + } + + // Host already exists and updates properties. 
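+ // Identity fields (ID, IP and Hostname) are set once when the host is first stored and are not overwritten here.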
+ host.Port = req.Host.Port + host.DownloadPort = req.Host.DownloadPort + host.Type = types.HostType(req.Host.Type) + host.OS = req.Host.Os + host.Platform = req.Host.Platform + host.PlatformFamily = req.Host.PlatformFamily + host.PlatformVersion = req.Host.PlatformVersion + host.KernelVersion = req.Host.KernelVersion + host.UpdatedAt.Store(time.Now()) + + if concurrentUploadLimit > 0 { + host.ConcurrentUploadLimit.Store(concurrentUploadLimit) + } + + if req.Host.Cpu != nil { + host.CPU = resource.CPU{ + LogicalCount: req.Host.Cpu.LogicalCount, + PhysicalCount: req.Host.Cpu.PhysicalCount, + Percent: req.Host.Cpu.Percent, + ProcessPercent: req.Host.Cpu.ProcessPercent, + Times: resource.CPUTimes{ + User: req.Host.Cpu.Times.User, + System: req.Host.Cpu.Times.System, + Idle: req.Host.Cpu.Times.Idle, + Nice: req.Host.Cpu.Times.Nice, + Iowait: req.Host.Cpu.Times.Iowait, + Irq: req.Host.Cpu.Times.Irq, + Softirq: req.Host.Cpu.Times.Softirq, + Steal: req.Host.Cpu.Times.Steal, + Guest: req.Host.Cpu.Times.Guest, + GuestNice: req.Host.Cpu.Times.GuestNice, + }, + } + } + + if req.Host.Memory != nil { + host.Memory = resource.Memory{ + Total: req.Host.Memory.Total, + Available: req.Host.Memory.Available, + Used: req.Host.Memory.Used, + UsedPercent: req.Host.Memory.UsedPercent, + ProcessUsedPercent: req.Host.Memory.ProcessUsedPercent, + Free: req.Host.Memory.Free, + } + } + + if req.Host.Network != nil { + host.Network = resource.Network{ + TCPConnectionCount: req.Host.Network.TcpConnectionCount, + UploadTCPConnectionCount: req.Host.Network.UploadTcpConnectionCount, + SecurityDomain: req.Host.Network.SecurityDomain, + Location: req.Host.Network.Location, + IDC: req.Host.Network.Idc, + } + } + + if req.Host.Disk != nil { + host.Disk = resource.Disk{ + Total: req.Host.Disk.Total, + Free: req.Host.Disk.Free, + Used: req.Host.Disk.Used, + UsedPercent: req.Host.Disk.UsedPercent, + InodesTotal: req.Host.Disk.InodesTotal, + InodesUsed: req.Host.Disk.InodesUsed, + InodesFree: req.Host.Disk.InodesFree, + InodesUsedPercent: req.Host.Disk.InodesUsedPercent, + } + } + + if req.Host.Build != nil { + host.Build = resource.Build{ + GitVersion: req.Host.Build.GitVersion, + GitCommit: req.Host.Build.GitCommit, + GoVersion: req.Host.Build.GoVersion, + Platform: req.Host.Build.Platform, + } + } + return nil } // LeaveHost releases host in scheduler. 
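// It releases all of the host's peers via host.LeavePeers and returns codes.NotFound when the host was never announced. // A minimal sketch of a call, assuming the generated schedulerv2 gRPC client: // _, err := client.LeaveHost(ctx, &schedulerv2.LeaveHostRequest{Id: hostID})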
func (v *V2) LeaveHost(ctx context.Context, req *schedulerv2.LeaveHostRequest) error { + logger.WithHostID(req.Id).Infof("leave host request: %#v", req) + + host, loaded := v.resource.HostManager().Load(req.Id) + if !loaded { + msg := fmt.Sprintf("host %s not found", req.Id) + logger.Error(msg) + return status.Error(codes.NotFound, msg) + } + + host.LeavePeers() return nil } diff --git a/scheduler/service/service_v2_test.go b/scheduler/service/service_v2_test.go index 4a4287820..3e5af185c 100644 --- a/scheduler/service/service_v2_test.go +++ b/scheduler/service/service_v2_test.go @@ -18,6 +18,7 @@ package service import ( "context" + "errors" "reflect" "testing" @@ -31,6 +32,8 @@ import ( commonv2 "d7y.io/api/pkg/apis/common/v2" schedulerv2 "d7y.io/api/pkg/apis/scheduler/v2" + managertypes "d7y.io/dragonfly/v2/manager/types" + pkgtypes "d7y.io/dragonfly/v2/pkg/types" "d7y.io/dragonfly/v2/scheduler/config" configmocks "d7y.io/dragonfly/v2/scheduler/config/mocks" "d7y.io/dragonfly/v2/scheduler/resource" @@ -68,12 +71,12 @@ func TestService_NewV2(t *testing.T) { func TestServiceV2_StatPeer(t *testing.T) { tests := []struct { name string - mock func(peer *resource.Peer, peerManager resource.PeerManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) + mock func(peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) expect func(t *testing.T, peer *resource.Peer, resp *commonv2.Peer, err error) }{ { name: "peer not found", - mock: func(peer *resource.Peer, peerManager resource.PeerManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) { + mock: func(peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) { gomock.InOrder( mr.PeerManager().Return(peerManager).Times(1), mp.Load(gomock.Any()).Return(nil, false).Times(1), @@ -86,7 +89,7 @@ func TestServiceV2_StatPeer(t *testing.T) { }, { name: "peer has been loaded", - mock: func(peer *resource.Peer, peerManager resource.PeerManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) { + mock: func(peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) { peer.StorePiece(&mockPiece) peer.Task.StorePiece(&mockPiece) gomock.InOrder( @@ -233,22 +236,22 @@ func TestServiceV2_StatPeer(t *testing.T) { peer := resource.NewPeer(mockSeedPeerID, mockTask, mockHost, resource.WithRange(mockPeerRange)) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage) - tc.mock(peer, peerManager, scheduling.EXPECT(), res.EXPECT(), peerManager.EXPECT()) + tc.mock(peer, peerManager, res.EXPECT(), peerManager.EXPECT()) resp, err := svc.StatPeer(context.Background(), &schedulerv2.StatPeerRequest{TaskId: mockTaskID, PeerId: mockPeerID}) tc.expect(t, peer, resp, err) }) } } -func TestService_LeavePeer(t *testing.T) { +func TestServiceV2_LeavePeer(t *testing.T) { tests := []struct { name string - mock func(peer *resource.Peer, peerManager resource.PeerManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) + mock func(peer *resource.Peer, peerManager resource.PeerManager, mr 
*resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) expect func(t *testing.T, err error) }{ { name: "peer not found", - mock: func(peer *resource.Peer, peerManager resource.PeerManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) { + mock: func(peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) { gomock.InOrder( mr.PeerManager().Return(peerManager).Times(1), mp.Load(gomock.Any()).Return(nil, false).Times(1), @@ -261,7 +264,7 @@ func TestService_LeavePeer(t *testing.T) { }, { name: "peer fsm event failed", - mock: func(peer *resource.Peer, peerManager resource.PeerManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) { + mock: func(peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) { peer.FSM.SetState(resource.PeerStateLeave) gomock.InOrder( mr.PeerManager().Return(peerManager).Times(1), @@ -275,7 +278,7 @@ func TestService_LeavePeer(t *testing.T) { }, { name: "peer leaves succeeded", - mock: func(peer *resource.Peer, peerManager resource.PeerManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) { + mock: func(peer *resource.Peer, peerManager resource.PeerManager, mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder) { gomock.InOrder( mr.PeerManager().Return(peerManager).Times(1), mp.Load(gomock.Any()).Return(peer, true).Times(1), @@ -304,8 +307,602 @@ func TestService_LeavePeer(t *testing.T) { peer := resource.NewPeer(mockSeedPeerID, mockTask, mockHost, resource.WithRange(mockPeerRange)) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage) - tc.mock(peer, peerManager, scheduling.EXPECT(), res.EXPECT(), peerManager.EXPECT()) + tc.mock(peer, peerManager, res.EXPECT(), peerManager.EXPECT()) tc.expect(t, svc.LeavePeer(context.Background(), &schedulerv2.LeavePeerRequest{TaskId: mockTaskID, PeerId: mockPeerID})) }) } } + +func TestServiceV2_StatTask(t *testing.T) { + tests := []struct { + name string + mock func(task *resource.Task, taskManager resource.TaskManager, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder) + expect func(t *testing.T, task *resource.Task, resp *commonv2.Task, err error) + }{ + { + name: "task not found", + mock: func(task *resource.Task, taskManager resource.TaskManager, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder) { + gomock.InOrder( + mr.TaskManager().Return(taskManager).Times(1), + mt.Load(gomock.Any()).Return(nil, false).Times(1), + ) + }, + expect: func(t *testing.T, task *resource.Task, resp *commonv2.Task, err error) { + assert := assert.New(t) + assert.ErrorIs(err, status.Errorf(codes.NotFound, "task %s not found", mockTaskID)) + }, + }, + { + name: "task has been loaded", + mock: func(task *resource.Task, taskManager resource.TaskManager, mr *resource.MockResourceMockRecorder, mt *resource.MockTaskManagerMockRecorder) { + task.StorePiece(&mockPiece) + gomock.InOrder( + mr.TaskManager().Return(taskManager).Times(1), + mt.Load(gomock.Any()).Return(task, true).Times(1), + ) + }, + expect: func(t *testing.T, task *resource.Task, resp *commonv2.Task, err error) { + assert 
:= assert.New(t) + assert.EqualValues(resp, &commonv2.Task{ + Id: task.ID, + Type: task.Type, + Url: task.URL, + Digest: task.Digest.String(), + Tag: task.Tag, + Application: task.Application, + Filters: task.Filters, + Header: task.Header, + PieceLength: task.PieceLength, + ContentLength: task.ContentLength.Load(), + PieceCount: task.TotalPieceCount.Load(), + SizeScope: task.SizeScope(), + Pieces: []*commonv2.Piece{ + { + Number: mockPiece.Number, + ParentId: mockPiece.ParentID, + Offset: mockPiece.Offset, + Length: mockPiece.Length, + Digest: mockPiece.Digest.String(), + TrafficType: mockPiece.TrafficType, + Cost: durationpb.New(mockPiece.Cost), + CreatedAt: timestamppb.New(mockPiece.CreatedAt), + }, + }, + State: task.FSM.Current(), + PeerCount: int32(task.PeerCount()), + CreatedAt: timestamppb.New(task.CreatedAt.Load()), + UpdatedAt: timestamppb.New(task.UpdatedAt.Load()), + }) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + scheduling := mocks.NewMockScheduling(ctl) + res := resource.NewMockResource(ctl) + dynconfig := configmocks.NewMockDynconfigInterface(ctl) + storage := storagemocks.NewMockStorage(ctl) + taskManager := resource.NewMockTaskManager(ctl) + task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage) + + tc.mock(task, taskManager, res.EXPECT(), taskManager.EXPECT()) + resp, err := svc.StatTask(context.Background(), &schedulerv2.StatTaskRequest{Id: mockTaskID}) + tc.expect(t, task, resp, err) + }) + } +} + +func TestServiceV2_AnnounceHost(t *testing.T) { + tests := []struct { + name string + req *schedulerv2.AnnounceHostRequest + run func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) + }{ + { + name: "host not found", + req: &schedulerv2.AnnounceHostRequest{ + Host: &commonv2.Host{ + Id: mockHostID, + Type: uint32(pkgtypes.HostTypeNormal), + Hostname: "hostname", + Ip: "127.0.0.1", + Port: 8003, + DownloadPort: 8001, + Os: "darwin", + Platform: "darwin", + PlatformFamily: "Standalone Workstation", + PlatformVersion: "11.1", + KernelVersion: "20.2.0", + Cpu: &commonv2.CPU{ + LogicalCount: mockCPU.LogicalCount, + PhysicalCount: mockCPU.PhysicalCount, + Percent: mockCPU.Percent, + ProcessPercent: mockCPU.ProcessPercent, + Times: &commonv2.CPUTimes{ + User: mockCPU.Times.User, + System: mockCPU.Times.System, + Idle: mockCPU.Times.Idle, + Nice: mockCPU.Times.Nice, + Iowait: mockCPU.Times.Iowait, + Irq: mockCPU.Times.Irq, + Softirq: mockCPU.Times.Softirq, + Steal: mockCPU.Times.Steal, + Guest: mockCPU.Times.Guest, + GuestNice: mockCPU.Times.GuestNice, + }, + }, + Memory: &commonv2.Memory{ + Total: mockMemory.Total, + Available: mockMemory.Available, + Used: mockMemory.Used, + UsedPercent: mockMemory.UsedPercent, + ProcessUsedPercent: mockMemory.ProcessUsedPercent, + Free: mockMemory.Free, + }, + Network: &commonv2.Network{ + TcpConnectionCount: mockNetwork.TCPConnectionCount, + UploadTcpConnectionCount: mockNetwork.UploadTCPConnectionCount, + 
+						SecurityDomain:           mockNetwork.SecurityDomain,
+						Location:                 mockNetwork.Location,
+						Idc:                      mockNetwork.IDC,
+					},
+					Disk: &commonv2.Disk{
+						Total:             mockDisk.Total,
+						Free:              mockDisk.Free,
+						Used:              mockDisk.Used,
+						UsedPercent:       mockDisk.UsedPercent,
+						InodesTotal:       mockDisk.InodesTotal,
+						InodesUsed:        mockDisk.InodesUsed,
+						InodesFree:        mockDisk.InodesFree,
+						InodesUsedPercent: mockDisk.InodesUsedPercent,
+					},
+					Build: &commonv2.Build{
+						GitVersion: mockBuild.GitVersion,
+						GitCommit:  mockBuild.GitCommit,
+						GoVersion:  mockBuild.GoVersion,
+						Platform:   mockBuild.Platform,
+					},
+				},
+			},
+			run: func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
+				gomock.InOrder(
+					md.GetSchedulerClusterClientConfig().Return(managertypes.SchedulerClusterClientConfig{LoadLimit: 10}, nil).Times(1),
+					mr.HostManager().Return(hostManager).Times(1),
+					mh.Load(gomock.Any()).Return(nil, false).Times(1),
+					mr.HostManager().Return(hostManager).Times(1),
+					mh.Store(gomock.Any()).Do(func(host *resource.Host) {
+						assert := assert.New(t)
+						assert.Equal(host.ID, req.Host.Id)
+						assert.Equal(host.Type, pkgtypes.HostType(req.Host.Type))
+						assert.Equal(host.Hostname, req.Host.Hostname)
+						assert.Equal(host.IP, req.Host.Ip)
+						assert.Equal(host.Port, req.Host.Port)
+						assert.Equal(host.DownloadPort, req.Host.DownloadPort)
+						assert.Equal(host.OS, req.Host.Os)
+						assert.Equal(host.Platform, req.Host.Platform)
+						assert.Equal(host.PlatformVersion, req.Host.PlatformVersion)
+						assert.Equal(host.KernelVersion, req.Host.KernelVersion)
+						assert.EqualValues(host.CPU, mockCPU)
+						assert.EqualValues(host.Memory, mockMemory)
+						assert.EqualValues(host.Network, mockNetwork)
+						assert.EqualValues(host.Disk, mockDisk)
+						assert.EqualValues(host.Build, mockBuild)
+						assert.Equal(host.ConcurrentUploadLimit.Load(), int32(10))
+						assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
+						assert.Equal(host.UploadCount.Load(), int64(0))
+						assert.Equal(host.UploadFailedCount.Load(), int64(0))
+						assert.NotNil(host.Peers)
+						assert.Equal(host.PeerCount.Load(), int32(0))
+						assert.NotEqual(host.CreatedAt.Load().Nanosecond(), 0)
+						assert.NotEqual(host.UpdatedAt.Load().Nanosecond(), 0)
+						assert.NotNil(host.Log)
+					}).Return().Times(1),
+				)
+
+				assert := assert.New(t)
+				assert.NoError(svc.AnnounceHost(context.Background(), req))
+			},
+		},
+		{
+			name: "host not found and dynconfig returns error",
+			req: &schedulerv2.AnnounceHostRequest{
+				Host: &commonv2.Host{
+					Id:              mockHostID,
+					Type:            uint32(pkgtypes.HostTypeNormal),
+					Hostname:        "hostname",
+					Ip:              "127.0.0.1",
+					Port:            8003,
+					DownloadPort:    8001,
+					Os:              "darwin",
+					Platform:        "darwin",
+					PlatformFamily:  "Standalone Workstation",
+					PlatformVersion: "11.1",
+					KernelVersion:   "20.2.0",
+					Cpu: &commonv2.CPU{
+						LogicalCount:   mockCPU.LogicalCount,
+						PhysicalCount:  mockCPU.PhysicalCount,
+						Percent:        mockCPU.Percent,
+						ProcessPercent: mockCPU.ProcessPercent,
+						Times: &commonv2.CPUTimes{
+							User:      mockCPU.Times.User,
+							System:    mockCPU.Times.System,
+							Idle:      mockCPU.Times.Idle,
+							Nice:      mockCPU.Times.Nice,
+							Iowait:    mockCPU.Times.Iowait,
+							Irq:       mockCPU.Times.Irq,
+							Softirq:   mockCPU.Times.Softirq,
+							Steal:     mockCPU.Times.Steal,
+							Guest:     mockCPU.Times.Guest,
+							GuestNice: mockCPU.Times.GuestNice,
+						},
+					},
+					Memory: &commonv2.Memory{
+						Total:              mockMemory.Total,
+						Available:          mockMemory.Available,
+						Used:               mockMemory.Used,
+						UsedPercent:        mockMemory.UsedPercent,
+						ProcessUsedPercent: mockMemory.ProcessUsedPercent,
+						Free:               mockMemory.Free,
+					},
+					Network: &commonv2.Network{
+						TcpConnectionCount:       mockNetwork.TCPConnectionCount,
+						UploadTcpConnectionCount: mockNetwork.UploadTCPConnectionCount,
+						SecurityDomain:           mockNetwork.SecurityDomain,
+						Location:                 mockNetwork.Location,
+						Idc:                      mockNetwork.IDC,
+					},
+					Disk: &commonv2.Disk{
+						Total:             mockDisk.Total,
+						Free:              mockDisk.Free,
+						Used:              mockDisk.Used,
+						UsedPercent:       mockDisk.UsedPercent,
+						InodesTotal:       mockDisk.InodesTotal,
+						InodesUsed:        mockDisk.InodesUsed,
+						InodesFree:        mockDisk.InodesFree,
+						InodesUsedPercent: mockDisk.InodesUsedPercent,
+					},
+					Build: &commonv2.Build{
+						GitVersion: mockBuild.GitVersion,
+						GitCommit:  mockBuild.GitCommit,
+						GoVersion:  mockBuild.GoVersion,
+						Platform:   mockBuild.Platform,
+					},
+				},
+			},
+			run: func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
+				gomock.InOrder(
+					md.GetSchedulerClusterClientConfig().Return(managertypes.SchedulerClusterClientConfig{}, errors.New("foo")).Times(1),
+					mr.HostManager().Return(hostManager).Times(1),
+					mh.Load(gomock.Any()).Return(nil, false).Times(1),
+					mr.HostManager().Return(hostManager).Times(1),
+					mh.Store(gomock.Any()).Do(func(host *resource.Host) {
+						assert := assert.New(t)
+						assert.Equal(host.ID, req.Host.Id)
+						assert.Equal(host.Type, pkgtypes.HostType(req.Host.Type))
+						assert.Equal(host.Hostname, req.Host.Hostname)
+						assert.Equal(host.IP, req.Host.Ip)
+						assert.Equal(host.Port, req.Host.Port)
+						assert.Equal(host.DownloadPort, req.Host.DownloadPort)
+						assert.Equal(host.OS, req.Host.Os)
+						assert.Equal(host.Platform, req.Host.Platform)
+						assert.Equal(host.PlatformVersion, req.Host.PlatformVersion)
+						assert.Equal(host.KernelVersion, req.Host.KernelVersion)
+						assert.EqualValues(host.CPU, mockCPU)
+						assert.EqualValues(host.Memory, mockMemory)
+						assert.EqualValues(host.Network, mockNetwork)
+						assert.EqualValues(host.Disk, mockDisk)
+						assert.EqualValues(host.Build, mockBuild)
+						assert.Equal(host.ConcurrentUploadLimit.Load(), int32(50))
+						assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
+						assert.Equal(host.UploadCount.Load(), int64(0))
+						assert.Equal(host.UploadFailedCount.Load(), int64(0))
+						assert.NotNil(host.Peers)
+						assert.Equal(host.PeerCount.Load(), int32(0))
+						assert.NotEqual(host.CreatedAt.Load().Nanosecond(), 0)
+						assert.NotEqual(host.UpdatedAt.Load().Nanosecond(), 0)
+						assert.NotNil(host.Log)
+					}).Return().Times(1),
+				)
+
+				assert := assert.New(t)
+				assert.NoError(svc.AnnounceHost(context.Background(), req))
+			},
+		},
+		{
+			name: "host already exists",
+			req: &schedulerv2.AnnounceHostRequest{
+				Host: &commonv2.Host{
+					Id:              mockHostID,
+					Type:            uint32(pkgtypes.HostTypeNormal),
+					Hostname:        "hostname",
+					Ip:              "127.0.0.1",
+					Port:            8003,
+					DownloadPort:    8001,
+					Os:              "darwin",
+					Platform:        "darwin",
+					PlatformFamily:  "Standalone Workstation",
+					PlatformVersion: "11.1",
+					KernelVersion:   "20.2.0",
+					Cpu: &commonv2.CPU{
+						LogicalCount:   mockCPU.LogicalCount,
+						PhysicalCount:  mockCPU.PhysicalCount,
+						Percent:        mockCPU.Percent,
+						ProcessPercent: mockCPU.ProcessPercent,
+						Times: &commonv2.CPUTimes{
+							User:      mockCPU.Times.User,
+							System:    mockCPU.Times.System,
+							Idle:      mockCPU.Times.Idle,
+							Nice:      mockCPU.Times.Nice,
+							Iowait:    mockCPU.Times.Iowait,
+							Irq:       mockCPU.Times.Irq,
+							Softirq:   mockCPU.Times.Softirq,
+							Steal:     mockCPU.Times.Steal,
+							Guest:     mockCPU.Times.Guest,
+							GuestNice: mockCPU.Times.GuestNice,
+						},
+					},
+					Memory: &commonv2.Memory{
+						Total:              mockMemory.Total,
+						Available:          mockMemory.Available,
+						Used:               mockMemory.Used,
+						UsedPercent:        mockMemory.UsedPercent,
+						ProcessUsedPercent: mockMemory.ProcessUsedPercent,
+						Free:               mockMemory.Free,
+					},
+					Network: &commonv2.Network{
+						TcpConnectionCount:       mockNetwork.TCPConnectionCount,
+						UploadTcpConnectionCount: mockNetwork.UploadTCPConnectionCount,
+						SecurityDomain:           mockNetwork.SecurityDomain,
+						Location:                 mockNetwork.Location,
+						Idc:                      mockNetwork.IDC,
+					},
+					Disk: &commonv2.Disk{
+						Total:             mockDisk.Total,
+						Free:              mockDisk.Free,
+						Used:              mockDisk.Used,
+						UsedPercent:       mockDisk.UsedPercent,
+						InodesTotal:       mockDisk.InodesTotal,
+						InodesUsed:        mockDisk.InodesUsed,
+						InodesFree:        mockDisk.InodesFree,
+						InodesUsedPercent: mockDisk.InodesUsedPercent,
+					},
+					Build: &commonv2.Build{
+						GitVersion: mockBuild.GitVersion,
+						GitCommit:  mockBuild.GitCommit,
+						GoVersion:  mockBuild.GoVersion,
+						Platform:   mockBuild.Platform,
+					},
+				},
+			},
+			run: func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
+				gomock.InOrder(
+					md.GetSchedulerClusterClientConfig().Return(managertypes.SchedulerClusterClientConfig{LoadLimit: 10}, nil).Times(1),
+					mr.HostManager().Return(hostManager).Times(1),
+					mh.Load(gomock.Any()).Return(host, true).Times(1),
+				)
+
+				assert := assert.New(t)
+				assert.NoError(svc.AnnounceHost(context.Background(), req))
+				assert.Equal(host.ID, req.Host.Id)
+				assert.Equal(host.Type, pkgtypes.HostType(req.Host.Type))
+				assert.Equal(host.Hostname, req.Host.Hostname)
+				assert.Equal(host.IP, req.Host.Ip)
+				assert.Equal(host.Port, req.Host.Port)
+				assert.Equal(host.DownloadPort, req.Host.DownloadPort)
+				assert.Equal(host.OS, req.Host.Os)
+				assert.Equal(host.Platform, req.Host.Platform)
+				assert.Equal(host.PlatformVersion, req.Host.PlatformVersion)
+				assert.Equal(host.KernelVersion, req.Host.KernelVersion)
+				assert.EqualValues(host.CPU, mockCPU)
+				assert.EqualValues(host.Memory, mockMemory)
+				assert.EqualValues(host.Network, mockNetwork)
+				assert.EqualValues(host.Disk, mockDisk)
+				assert.EqualValues(host.Build, mockBuild)
+				assert.Equal(host.ConcurrentUploadLimit.Load(), int32(10))
+				assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
+				assert.Equal(host.UploadCount.Load(), int64(0))
+				assert.Equal(host.UploadFailedCount.Load(), int64(0))
+				assert.NotNil(host.Peers)
+				assert.Equal(host.PeerCount.Load(), int32(0))
+				assert.NotEqual(host.CreatedAt.Load().Nanosecond(), 0)
+				assert.NotEqual(host.UpdatedAt.Load().Nanosecond(), 0)
+				assert.NotNil(host.Log)
+			},
+		},
+		{
+			name: "host already exists and dynconfig returns error",
+			req: &schedulerv2.AnnounceHostRequest{
+				Host: &commonv2.Host{
+					Id:              mockHostID,
+					Type:            uint32(pkgtypes.HostTypeNormal),
+					Hostname:        "hostname",
+					Ip:              "127.0.0.1",
+					Port:            8003,
+					DownloadPort:    8001,
+					Os:              "darwin",
+					Platform:        "darwin",
+					PlatformFamily:  "Standalone Workstation",
+					PlatformVersion: "11.1",
+					KernelVersion:   "20.2.0",
+					Cpu: &commonv2.CPU{
+						LogicalCount:   mockCPU.LogicalCount,
+						PhysicalCount:  mockCPU.PhysicalCount,
+						Percent:        mockCPU.Percent,
+						ProcessPercent: mockCPU.ProcessPercent,
+						Times: &commonv2.CPUTimes{
+							User:      mockCPU.Times.User,
+							System:    mockCPU.Times.System,
+							Idle:      mockCPU.Times.Idle,
+							Nice:      mockCPU.Times.Nice,
+							Iowait:    mockCPU.Times.Iowait,
+							Irq:       mockCPU.Times.Irq,
+							Softirq:   mockCPU.Times.Softirq,
+							Steal:     mockCPU.Times.Steal,
+							Guest:     mockCPU.Times.Guest,
+							GuestNice: mockCPU.Times.GuestNice,
+						},
+					},
+					Memory: &commonv2.Memory{
+						Total:              mockMemory.Total,
+						Available:          mockMemory.Available,
+						Used:               mockMemory.Used,
+						UsedPercent:        mockMemory.UsedPercent,
+						ProcessUsedPercent: mockMemory.ProcessUsedPercent,
+						Free:               mockMemory.Free,
+					},
+					Network: &commonv2.Network{
+						TcpConnectionCount:       mockNetwork.TCPConnectionCount,
+						UploadTcpConnectionCount: mockNetwork.UploadTCPConnectionCount,
+						SecurityDomain:           mockNetwork.SecurityDomain,
+						Location:                 mockNetwork.Location,
+						Idc:                      mockNetwork.IDC,
+					},
+					Disk: &commonv2.Disk{
+						Total:             mockDisk.Total,
+						Free:              mockDisk.Free,
+						Used:              mockDisk.Used,
+						UsedPercent:       mockDisk.UsedPercent,
+						InodesTotal:       mockDisk.InodesTotal,
+						InodesUsed:        mockDisk.InodesUsed,
+						InodesFree:        mockDisk.InodesFree,
+						InodesUsedPercent: mockDisk.InodesUsedPercent,
+					},
+					Build: &commonv2.Build{
+						GitVersion: mockBuild.GitVersion,
+						GitCommit:  mockBuild.GitCommit,
+						GoVersion:  mockBuild.GoVersion,
+						Platform:   mockBuild.Platform,
+					},
+				},
+			},
+			run: func(t *testing.T, svc *V2, req *schedulerv2.AnnounceHostRequest, host *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
+				gomock.InOrder(
+					md.GetSchedulerClusterClientConfig().Return(managertypes.SchedulerClusterClientConfig{}, errors.New("foo")).Times(1),
+					mr.HostManager().Return(hostManager).Times(1),
+					mh.Load(gomock.Any()).Return(host, true).Times(1),
+				)
+
+				assert := assert.New(t)
+				assert.NoError(svc.AnnounceHost(context.Background(), req))
+				assert.Equal(host.ID, req.Host.Id)
+				assert.Equal(host.Type, pkgtypes.HostType(req.Host.Type))
+				assert.Equal(host.Hostname, req.Host.Hostname)
+				assert.Equal(host.IP, req.Host.Ip)
+				assert.Equal(host.Port, req.Host.Port)
+				assert.Equal(host.DownloadPort, req.Host.DownloadPort)
+				assert.Equal(host.OS, req.Host.Os)
+				assert.Equal(host.Platform, req.Host.Platform)
+				assert.Equal(host.PlatformVersion, req.Host.PlatformVersion)
+				assert.Equal(host.KernelVersion, req.Host.KernelVersion)
+				assert.EqualValues(host.CPU, mockCPU)
+				assert.EqualValues(host.Memory, mockMemory)
+				assert.EqualValues(host.Network, mockNetwork)
+				assert.EqualValues(host.Disk, mockDisk)
+				assert.EqualValues(host.Build, mockBuild)
+				assert.Equal(host.ConcurrentUploadLimit.Load(), int32(50))
+				assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
+				assert.Equal(host.UploadCount.Load(), int64(0))
+				assert.Equal(host.UploadFailedCount.Load(), int64(0))
+				assert.NotNil(host.Peers)
+				assert.Equal(host.PeerCount.Load(), int32(0))
+				assert.NotEqual(host.CreatedAt.Load().Nanosecond(), 0)
+				assert.NotEqual(host.UpdatedAt.Load().Nanosecond(), 0)
+				assert.NotNil(host.Log)
+			},
+		},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			ctl := gomock.NewController(t)
+			defer ctl.Finish()
+			scheduling := mocks.NewMockScheduling(ctl)
+			res := resource.NewMockResource(ctl)
+			dynconfig := configmocks.NewMockDynconfigInterface(ctl)
+			storage := storagemocks.NewMockStorage(ctl)
+			hostManager := resource.NewMockHostManager(ctl)
+			host := resource.NewHost(
+				mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
+				mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
+			svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage)
+
+			tc.run(t, svc, tc.req, host, hostManager, res.EXPECT(), hostManager.EXPECT(), dynconfig.EXPECT())
+		})
+	}
+}
+
+func TestServiceV2_LeaveHost(t *testing.T) {
+	tests := []struct {
+		name   string
+		mock   func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder)
+		expect func(t *testing.T, peer *resource.Peer, err error)
+	}{
+		{
+			name: "host not found",
+			mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder) {
+				gomock.InOrder(
+					mr.HostManager().Return(hostManager).Times(1),
+					mh.Load(gomock.Any()).Return(nil, false).Times(1),
+				)
+			},
+			expect: func(t *testing.T, peer *resource.Peer, err error) {
+				assert := assert.New(t)
+				assert.Error(err)
+			},
+		},
+		{
+			name: "host has no peers",
+			mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder) {
+				gomock.InOrder(
+					mr.HostManager().Return(hostManager).Times(1),
+					mh.Load(gomock.Any()).Return(host, true).Times(1),
+				)
+			},
+			expect: func(t *testing.T, peer *resource.Peer, err error) {
+				assert := assert.New(t)
+				assert.NoError(err)
+			},
+		},
+		{
+			name: "peer leaves successfully",
+			mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder) {
+				host.Peers.Store(mockPeer.ID, mockPeer)
+				mockPeer.FSM.SetState(resource.PeerStatePending)
+				gomock.InOrder(
+					mr.HostManager().Return(hostManager).Times(1),
+					mh.Load(gomock.Any()).Return(host, true).Times(1),
+				)
+			},
+			expect: func(t *testing.T, peer *resource.Peer, err error) {
+				assert := assert.New(t)
+				assert.NoError(err)
+				assert.Equal(peer.FSM.Current(), resource.PeerStateLeave)
+			},
+		},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			ctl := gomock.NewController(t)
+			defer ctl.Finish()
+			scheduling := mocks.NewMockScheduling(ctl)
+			res := resource.NewMockResource(ctl)
+			dynconfig := configmocks.NewMockDynconfigInterface(ctl)
+			storage := storagemocks.NewMockStorage(ctl)
+			hostManager := resource.NewMockHostManager(ctl)
+			host := resource.NewHost(
+				mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
+				mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
+			mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_DFDAEMON, mockTaskFilters, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
+			mockPeer := resource.NewPeer(mockSeedPeerID, mockTask, host)
+			svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage)
+
+			tc.mock(host, mockPeer, hostManager, res.EXPECT(), hostManager.EXPECT())
+			tc.expect(t, mockPeer, svc.LeaveHost(context.Background(), &schedulerv2.LeaveHostRequest{Id: mockHostID}))
+		})
+	}
+}