diff --git a/pkg/slices/slices.go b/pkg/slices/slices.go index c10516f54..bb886a69d 100644 --- a/pkg/slices/slices.go +++ b/pkg/slices/slices.go @@ -56,6 +56,11 @@ func RemoveDuplicates[T comparable](s []T) []T { return result } +// Remove removes an element from a collection. +func Remove[T comparable](s []T, i int) []T { + return append(s[:i], s[i+1:]...) +} + // Reverse reverses elements in a collection. func Reverse[S ~[]T, T any](s S) { for i, j := 0, len(s)-1; i < j; i, j = i+1, j-1 { diff --git a/pkg/slices/slices_test.go b/pkg/slices/slices_test.go index ae7b74971..32cdffab5 100644 --- a/pkg/slices/slices_test.go +++ b/pkg/slices/slices_test.go @@ -114,6 +114,43 @@ func TestRemoveDuplicates(t *testing.T) { } } +func TestRemove(t *testing.T) { + tests := []struct { + name string + input []int + index int + expected []int + }{ + { + name: "remove any element", + input: []int{1, 2, 3}, + index: 1, + expected: []int{1, 3}, + }, + { + name: "remove the first element", + input: []int{1, 2, 3}, + index: 0, + expected: []int{2, 3}, + }, + { + name: "remove the last element", + input: []int{1, 2, 3}, + index: 2, + expected: []int{1, 2}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + result := Remove(tt.input, tt.index) + if !reflect.DeepEqual(result, tt.expected) { + t.Errorf("expected %v, but got %v", tt.expected, result) + } + }) + } +} + func TestReverse(t *testing.T) { tests := []struct { name string diff --git a/scheduler/networktopology/mocks/network_topology_mock.go b/scheduler/networktopology/mocks/network_topology_mock.go index 0b9c28d0f..841b637dc 100644 --- a/scheduler/networktopology/mocks/network_topology_mock.go +++ b/scheduler/networktopology/mocks/network_topology_mock.go @@ -9,6 +9,7 @@ import ( time "time" networktopology "d7y.io/dragonfly/v2/scheduler/networktopology" + resource "d7y.io/dragonfly/v2/scheduler/resource" gomock "github.com/golang/mock/gomock" ) @@ -49,19 +50,19 @@ func (mr *MockNetworkTopologyMockRecorder) DeleteHost(arg0 interface{}) *gomock. return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteHost", reflect.TypeOf((*MockNetworkTopology)(nil).DeleteHost), arg0) } -// FindProbedHostIDs mocks base method. -func (m *MockNetworkTopology) FindProbedHostIDs(arg0 string) ([]string, error) { +// FindProbedHosts mocks base method. +func (m *MockNetworkTopology) FindProbedHosts(arg0 string) ([]*resource.Host, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "FindProbedHostIDs", arg0) - ret0, _ := ret[0].([]string) + ret := m.ctrl.Call(m, "FindProbedHosts", arg0) + ret0, _ := ret[0].([]*resource.Host) ret1, _ := ret[1].(error) return ret0, ret1 } -// FindProbedHostIDs indicates an expected call of FindProbedHostIDs. -func (mr *MockNetworkTopologyMockRecorder) FindProbedHostIDs(arg0 interface{}) *gomock.Call { +// FindProbedHosts indicates an expected call of FindProbedHosts. +func (mr *MockNetworkTopologyMockRecorder) FindProbedHosts(arg0 interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindProbedHostIDs", reflect.TypeOf((*MockNetworkTopology)(nil).FindProbedHostIDs), arg0) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindProbedHosts", reflect.TypeOf((*MockNetworkTopology)(nil).FindProbedHosts), arg0) } // Has mocks base method. diff --git a/scheduler/networktopology/network_topology.go b/scheduler/networktopology/network_topology.go index c25d3df6d..743aa3703 100644 --- a/scheduler/networktopology/network_topology.go +++ b/scheduler/networktopology/network_topology.go @@ -20,13 +20,16 @@ package networktopology import ( "context" + "errors" "math" + "sort" "time" "github.com/go-redis/redis/v8" "github.com/google/uuid" logger "d7y.io/dragonfly/v2/internal/dflog" + "d7y.io/dragonfly/v2/pkg/container/set" pkgredis "d7y.io/dragonfly/v2/pkg/redis" "d7y.io/dragonfly/v2/scheduler/config" "d7y.io/dragonfly/v2/scheduler/resource" @@ -39,6 +42,9 @@ const ( // snapshotContextTimeout is the timeout of snapshot network topology. snapshotContextTimeout = 20 * time.Minute + + // findProbedCandidateHostsLimit is the limit of find probed candidate hosts. + findProbedCandidateHostsLimit = 50 ) // NetworkTopology is an interface for network topology. @@ -55,9 +61,9 @@ type NetworkTopology interface { // Store stores source host and destination host. Store(string, string) error - // TODO Implement function. - // FindProbedHostIDs finds the most candidate destination host to be probed. - FindProbedHostIDs(string) ([]string, error) + // FindProbedHosts finds the most candidate destination host to be probed, randomly find a range of hosts, + // and then return the host with a smaller probed count. + FindProbedHosts(string) ([]*resource.Host, error) // DeleteHost deletes source host and all destination host connected to source host. DeleteHost(string) error @@ -161,10 +167,50 @@ func (nt *networkTopology) Store(srcHostID string, destHostID string) error { return nil } -// TODO Implement function. -// FindProbedHostIDs finds the most candidate destination host to be probed. -func (nt *networkTopology) FindProbedHostIDs(hostID string) ([]string, error) { - return nil, nil +// FindProbedHosts finds the most candidate destination host to be probed, randomly find a range of hosts, +// and then return the host with a smaller probed count. +func (nt *networkTopology) FindProbedHosts(hostID string) ([]*resource.Host, error) { + ctx, cancel := context.WithTimeout(context.Background(), contextTimeout) + defer cancel() + + blocklist := set.NewSafeSet[string]() + blocklist.Add(hostID) + candidateHosts := nt.resource.HostManager().LoadRandomHosts(findProbedCandidateHostsLimit, blocklist) + if len(candidateHosts) == 0 { + return nil, errors.New("probed hosts not found") + } + + if len(candidateHosts) <= nt.config.Probe.Count { + return candidateHosts, nil + } + + var probedCountKeys []string + for _, candidateHost := range candidateHosts { + probedCountKeys = append(probedCountKeys, pkgredis.MakeProbedCountKeyInScheduler(candidateHost.ID)) + } + + rawProbedCounts, err := nt.rdb.MGet(ctx, probedCountKeys...).Result() + if err != nil { + return nil, err + } + + // Filter invalid probed count. If probed key not exist, the probed count is nil. + var probedCounts []uint64 + for _, rawProbedCount := range rawProbedCounts { + probeCount, ok := rawProbedCount.(uint64) + if !ok { + return nil, errors.New("invalid probed count") + } + + probedCounts = append(probedCounts, probeCount) + } + + // Sort candidate hosts by probed count. + sort.Slice(candidateHosts, func(i, j int) bool { + return probedCounts[i] < probedCounts[j] + }) + + return candidateHosts[:nt.config.Probe.Count], nil } // DeleteHost deletes source host and all destination host connected to source host. diff --git a/scheduler/networktopology/network_topology_test.go b/scheduler/networktopology/network_topology_test.go index aa77bd8c1..a5279fc2c 100644 --- a/scheduler/networktopology/network_topology_test.go +++ b/scheduler/networktopology/network_topology_test.go @@ -28,6 +28,7 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" + "d7y.io/dragonfly/v2/pkg/container/set" pkgredis "d7y.io/dragonfly/v2/pkg/redis" "d7y.io/dragonfly/v2/scheduler/resource" storagemocks "d7y.io/dragonfly/v2/scheduler/storage/mocks" @@ -295,6 +296,182 @@ func TestNetworkTopology_Store(t *testing.T) { } } +func TestNetworkTopology_FindProbedHosts(t *testing.T) { + tests := []struct { + name string + hosts []*resource.Host + mock func(mockRDBClient redismock.ClientMock, mr *resource.MockResourceMockRecorder, hostManager resource.HostManager, + mh *resource.MockHostManagerMockRecorder, hosts []*resource.Host) + expect func(t *testing.T, networkTopology NetworkTopology, err error, hosts []*resource.Host) + }{ + { + name: "find probed hosts", + hosts: []*resource.Host{ + mockHost, {ID: "foo"}, {ID: "bar"}, {ID: "baz"}, {ID: "bav"}, {ID: "bac"}, + }, + mock: func(mockRDBClient redismock.ClientMock, mr *resource.MockResourceMockRecorder, hostManager resource.HostManager, + mh *resource.MockHostManagerMockRecorder, hosts []*resource.Host) { + mockRDBClient.MatchExpectationsInOrder(true) + blocklist := set.NewSafeSet[string]() + blocklist.Add(mockSeedHost.ID) + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.LoadRandomHosts(gomock.Eq(findProbedCandidateHostsLimit), gomock.Eq(blocklist)).Return(hosts).Times(1), + ) + + var probedCountKeys []string + for _, host := range hosts { + probedCountKeys = append(probedCountKeys, pkgredis.MakeProbedCountKeyInScheduler(host.ID)) + } + + mockRDBClient.ExpectMGet(probedCountKeys...).SetVal([]interface{}{uint64(6), uint64(5), uint64(4), uint64(3), uint64(2), uint64(1)}) + }, + expect: func(t *testing.T, networkTopology NetworkTopology, err error, hosts []*resource.Host) { + assert := assert.New(t) + assert.NoError(err) + probedHosts, err := networkTopology.FindProbedHosts(mockSeedHost.ID) + assert.NoError(err) + assert.Equal(len(probedHosts), 5) + assert.EqualValues(probedHosts[0].ID, "bac") + assert.EqualValues(probedHosts[1].ID, "bav") + assert.EqualValues(probedHosts[2].ID, "baz") + assert.EqualValues(probedHosts[3].ID, "bar") + assert.EqualValues(probedHosts[4].ID, "foo") + }, + }, + { + name: "find probed hosts when map is insufficient", + hosts: []*resource.Host{mockHost}, + mock: func(mockRDBClient redismock.ClientMock, mr *resource.MockResourceMockRecorder, hostManager resource.HostManager, + mh *resource.MockHostManagerMockRecorder, hosts []*resource.Host) { + mockRDBClient.MatchExpectationsInOrder(true) + blocklist := set.NewSafeSet[string]() + blocklist.Add(mockSeedHost.ID) + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.LoadRandomHosts(gomock.Eq(findProbedCandidateHostsLimit), gomock.Eq(blocklist)).Return(hosts).Times(1), + ) + + var probedCountKeys []string + for _, host := range hosts { + probedCountKeys = append(probedCountKeys, pkgredis.MakeProbedCountKeyInScheduler(host.ID)) + } + + mockRDBClient.ExpectMGet(probedCountKeys...).SetVal([]interface{}{uint64(1)}) + }, + expect: func(t *testing.T, networkTopology NetworkTopology, err error, hosts []*resource.Host) { + assert := assert.New(t) + assert.NoError(err) + probedHosts, err := networkTopology.FindProbedHosts(mockSeedHost.ID) + assert.NoError(err) + assert.Equal(len(probedHosts), 1) + assert.EqualValues(probedHosts[0].ID, mockHost.ID) + }, + }, + { + name: "get probed count error", + hosts: []*resource.Host{ + mockHost, {ID: "foo"}, {ID: "bar"}, {ID: "baz"}, {ID: "bav"}, {ID: "bac"}, + }, + mock: func(mockRDBClient redismock.ClientMock, mr *resource.MockResourceMockRecorder, hostManager resource.HostManager, + mh *resource.MockHostManagerMockRecorder, hosts []*resource.Host) { + mockRDBClient.MatchExpectationsInOrder(true) + blocklist := set.NewSafeSet[string]() + blocklist.Add(mockSeedHost.ID) + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.LoadRandomHosts(gomock.Eq(findProbedCandidateHostsLimit), gomock.Eq(blocklist)).Return(hosts).Times(1), + ) + + var probedCountKeys []string + for _, host := range hosts { + probedCountKeys = append(probedCountKeys, pkgredis.MakeProbedCountKeyInScheduler(host.ID)) + } + + mockRDBClient.ExpectMGet(probedCountKeys...).SetErr(errors.New("get probed count error")) + }, + expect: func(t *testing.T, networkTopology NetworkTopology, err error, hosts []*resource.Host) { + assert := assert.New(t) + assert.NoError(err) + + probedHosts, err := networkTopology.FindProbedHosts(mockSeedHost.ID) + assert.Equal(len(probedHosts), 0) + assert.EqualError(err, "get probed count error") + }, + }, + { + name: "probed hosts not found", + hosts: []*resource.Host{}, + mock: func(mockRDBClient redismock.ClientMock, mr *resource.MockResourceMockRecorder, hostManager resource.HostManager, + mh *resource.MockHostManagerMockRecorder, hosts []*resource.Host) { + mockRDBClient.MatchExpectationsInOrder(true) + blocklist := set.NewSafeSet[string]() + blocklist.Add(mockSeedHost.ID) + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.LoadRandomHosts(gomock.Eq(findProbedCandidateHostsLimit), gomock.Eq(blocklist)).Return(hosts).Times(1), + ) + }, + expect: func(t *testing.T, networkTopology NetworkTopology, err error, hosts []*resource.Host) { + assert := assert.New(t) + assert.NoError(err) + + probedHosts, err := networkTopology.FindProbedHosts(mockSeedHost.ID) + assert.Equal(len(probedHosts), 0) + assert.EqualError(err, "probed hosts not found") + }, + }, + { + name: "invalid probed count", + hosts: []*resource.Host{ + mockHost, {ID: "foo"}, {ID: "bar"}, {ID: "baz"}, {ID: "bav"}, {ID: "bac"}, + }, + mock: func(mockRDBClient redismock.ClientMock, mr *resource.MockResourceMockRecorder, hostManager resource.HostManager, + mh *resource.MockHostManagerMockRecorder, hosts []*resource.Host) { + mockRDBClient.MatchExpectationsInOrder(true) + blocklist := set.NewSafeSet[string]() + blocklist.Add(mockSeedHost.ID) + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.LoadRandomHosts(gomock.Eq(findProbedCandidateHostsLimit), gomock.Eq(blocklist)).Return(hosts).Times(1), + ) + + var probedCountKeys []string + for _, host := range hosts { + probedCountKeys = append(probedCountKeys, pkgredis.MakeProbedCountKeyInScheduler(host.ID)) + } + + mockRDBClient.ExpectMGet(probedCountKeys...).SetVal([]interface{}{"foo", uint64(5), uint64(4), uint64(3), uint64(2), uint64(1)}) + }, + expect: func(t *testing.T, networkTopology NetworkTopology, err error, hosts []*resource.Host) { + assert := assert.New(t) + assert.NoError(err) + probedHosts, err := networkTopology.FindProbedHosts(mockSeedHost.ID) + assert.Equal(len(probedHosts), 0) + assert.EqualError(err, "invalid probed count") + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + + rdb, mockRDBClient := redismock.NewClientMock() + res := resource.NewMockResource(ctl) + storage := storagemocks.NewMockStorage(ctl) + hostManager := resource.NewMockHostManager(ctl) + tc.mock(mockRDBClient, res.EXPECT(), hostManager, hostManager.EXPECT(), tc.hosts) + + mockNetworkTopologyConfig.Probe.Count = 5 + networkTopology, err := NewNetworkTopology(mockNetworkTopologyConfig, rdb, res, storage) + tc.expect(t, networkTopology, err, tc.hosts) + mockRDBClient.ClearExpect() + }) + } +} + func TestNetworkTopology_DeleteHost(t *testing.T) { tests := []struct { name string diff --git a/scheduler/resource/host_manager.go b/scheduler/resource/host_manager.go index ee066b42e..ecd428689 100644 --- a/scheduler/resource/host_manager.go +++ b/scheduler/resource/host_manager.go @@ -21,6 +21,7 @@ package resource import ( "sync" + "d7y.io/dragonfly/v2/pkg/container/set" pkggc "d7y.io/dragonfly/v2/pkg/gc" "d7y.io/dragonfly/v2/pkg/types" "d7y.io/dragonfly/v2/scheduler/config" @@ -47,6 +48,9 @@ type HostManager interface { // Delete deletes host for a key. Delete(string) + // LoadRandomHosts loads host randomly through the Range of sync.Map. + LoadRandomHosts(int, set.SafeSet[string]) []*Host + // Try to reclaim host. RunGC() error } @@ -103,6 +107,31 @@ func (h *hostManager) Delete(key string) { h.Map.Delete(key) } +// LoadRandomHosts loads host randomly through the Range of sync.Map. +func (h *hostManager) LoadRandomHosts(n int, blocklist set.SafeSet[string]) []*Host { + hosts := make([]*Host, 0, n) + h.Map.Range(func(key, value any) bool { + if len(hosts) >= n { + return false + } + + host, ok := value.(*Host) + if !ok { + host.Log.Error("invalid host") + return true + } + + if blocklist.Contains(host.ID) { + return true + } + + hosts = append(hosts, host) + return true + }) + + return hosts +} + // Try to reclaim host. func (h *hostManager) RunGC() error { h.Map.Range(func(_, value any) bool { diff --git a/scheduler/resource/host_manager_mock.go b/scheduler/resource/host_manager_mock.go index 16ce04055..89948df8e 100644 --- a/scheduler/resource/host_manager_mock.go +++ b/scheduler/resource/host_manager_mock.go @@ -7,6 +7,7 @@ package resource import ( reflect "reflect" + set "d7y.io/dragonfly/v2/pkg/container/set" gomock "github.com/golang/mock/gomock" ) @@ -75,6 +76,20 @@ func (mr *MockHostManagerMockRecorder) LoadOrStore(arg0 interface{}) *gomock.Cal return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadOrStore", reflect.TypeOf((*MockHostManager)(nil).LoadOrStore), arg0) } +// LoadRandomHosts mocks base method. +func (m *MockHostManager) LoadRandomHosts(arg0 int, arg1 set.SafeSet[string]) []*Host { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "LoadRandomHosts", arg0, arg1) + ret0, _ := ret[0].([]*Host) + return ret0 +} + +// LoadRandomHosts indicates an expected call of LoadRandomHosts. +func (mr *MockHostManagerMockRecorder) LoadRandomHosts(arg0, arg1 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadRandomHosts", reflect.TypeOf((*MockHostManager)(nil).LoadRandomHosts), arg0, arg1) +} + // RunGC mocks base method. func (m *MockHostManager) RunGC() error { m.ctrl.T.Helper() diff --git a/scheduler/resource/host_manager_test.go b/scheduler/resource/host_manager_test.go index 0f6d4f521..a8bb2ef72 100644 --- a/scheduler/resource/host_manager_test.go +++ b/scheduler/resource/host_manager_test.go @@ -27,6 +27,7 @@ import ( commonv2 "d7y.io/api/pkg/apis/common/v2" + "d7y.io/dragonfly/v2/pkg/container/set" "d7y.io/dragonfly/v2/pkg/gc" "d7y.io/dragonfly/v2/scheduler/config" ) @@ -307,6 +308,125 @@ func TestHostManager_Delete(t *testing.T) { } } +func TestHostManager_LoadRandomHosts(t *testing.T) { + tests := []struct { + name string + hosts []*Host + mock func(m *gc.MockGCMockRecorder) + expect func(t *testing.T, hm HostManager, hosts []*Host) + }{ + { + name: "load random hosts", + hosts: []*Host{ + NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type), + NewHost( + mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, + mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type), + }, + mock: func(m *gc.MockGCMockRecorder) { + m.Add(gomock.Any()).Return(nil).Times(1) + }, + expect: func(t *testing.T, hm HostManager, hosts []*Host) { + assert := assert.New(t) + for _, host := range hosts { + hm.Store(host) + } + + blocklist := set.NewSafeSet[string]() + blocklist.Add(mockRawSeedHost.ID) + h := hm.LoadRandomHosts(2, blocklist) + assert.Equal(len(h), 1) + }, + }, + { + name: "load random hosts when the load number is 0", + hosts: []*Host{ + NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type), + NewHost( + mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, + mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type), + }, + mock: func(m *gc.MockGCMockRecorder) { + m.Add(gomock.Any()).Return(nil).Times(1) + }, + expect: func(t *testing.T, hm HostManager, hosts []*Host) { + assert := assert.New(t) + for _, host := range hosts { + hm.Store(host) + } + + blocklist := set.NewSafeSet[string]() + blocklist.Add(mockRawSeedHost.ID) + h := hm.LoadRandomHosts(0, blocklist) + assert.Equal(len(h), 0) + }, + }, + { + name: "map is empty", + hosts: []*Host{}, + mock: func(m *gc.MockGCMockRecorder) { + m.Add(gomock.Any()).Return(nil).Times(1) + }, + expect: func(t *testing.T, hm HostManager, hosts []*Host) { + assert := assert.New(t) + for _, host := range hosts { + hm.Store(host) + } + + blocklist := set.NewSafeSet[string]() + blocklist.Add(mockRawSeedHost.ID) + h := hm.LoadRandomHosts(1, blocklist) + assert.Equal(len(h), 0) + }, + }, + { + name: "the number of hosts in the map is insufficient", + hosts: []*Host{ + NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type), + NewHost( + mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, + mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type), + }, + mock: func(m *gc.MockGCMockRecorder) { + m.Add(gomock.Any()).Return(nil).Times(1) + }, + expect: func(t *testing.T, hm HostManager, hosts []*Host) { + assert := assert.New(t) + for _, host := range hosts { + hm.Store(host) + } + + blocklist := set.NewSafeSet[string]() + blocklist.Add(mockRawSeedHost.ID) + h := hm.LoadRandomHosts(3, blocklist) + assert.Equal(len(h), 1) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + ctl := gomock.NewController(t) + defer ctl.Finish() + gc := gc.NewMockGC(ctl) + tc.mock(gc.EXPECT()) + + hm, err := newHostManager(mockHostGCConfig, gc) + if err != nil { + t.Fatal(err) + } + + tc.expect(t, hm, tc.hosts) + }) + } +} + func TestHostManager_RunGC(t *testing.T) { tests := []struct { name string diff --git a/scheduler/service/service_v1.go b/scheduler/service/service_v1.go index 29b6b14f5..8e31e6d57 100644 --- a/scheduler/service/service_v1.go +++ b/scheduler/service/service_v1.go @@ -677,36 +677,25 @@ func (v *V1) SyncProbes(stream schedulerv1.Scheduler_SyncProbesServer) error { // Find probed hosts in network topology. Based on the source host information, // the most candidate hosts will be evaluated. logger.Info("receive SyncProbesRequest_ProbeStartedRequest") - probedHostIDs, err := v.networkTopology.FindProbedHostIDs(req.Host.Id) + hosts, err := v.networkTopology.FindProbedHosts(req.Host.Id) if err != nil { logger.Error(err) return status.Error(codes.FailedPrecondition, err.Error()) } var probedHosts []*commonv1.Host - for _, probedHostID := range probedHostIDs { - probedHost, loaded := v.resource.HostManager().Load(probedHostID) - if !loaded { - logger.Warnf("probed host %s not found", probedHostID) - continue - } - + for _, host := range hosts { probedHosts = append(probedHosts, &commonv1.Host{ - Id: probedHost.ID, - Ip: probedHost.IP, - Hostname: probedHost.Hostname, - Port: probedHost.Port, - DownloadPort: probedHost.DownloadPort, - Location: probedHost.Network.Location, - Idc: probedHost.Network.IDC, + Id: host.ID, + Ip: host.IP, + Hostname: host.Hostname, + Port: host.Port, + DownloadPort: host.DownloadPort, + Location: host.Network.Location, + Idc: host.Network.IDC, }) } - if len(probedHosts) == 0 { - logger.Error("probed host not found") - return status.Error(codes.NotFound, "probed host not found") - } - logger.Infof("probe started: %#v", probedHosts) if err := stream.Send(&schedulerv1.SyncProbesResponse{ Hosts: probedHosts, diff --git a/scheduler/service/service_v1_test.go b/scheduler/service/service_v1_test.go index 032ad43ba..4632ec9c4 100644 --- a/scheduler/service/service_v1_test.go +++ b/scheduler/service/service_v1_test.go @@ -2699,9 +2699,7 @@ func TestServiceV1_SyncProbes(t *testing.T) { ProbeStartedRequest: &schedulerv1.ProbeStartedRequest{}, }, }, nil).Times(1), - mn.FindProbedHostIDs(gomock.Eq(mockRawSeedHost.ID)).Return([]string{mockRawHost.ID}, nil).Times(1), - mr.HostManager().Return(hostManager).Times(1), - mh.Load(gomock.Eq(mockRawHost.ID)).Return(&mockRawHost, true), + mn.FindProbedHosts(gomock.Eq(mockRawSeedHost.ID)).Return([]*resource.Host{&mockRawHost}, nil).Times(1), ms.Send(gomock.Eq(&schedulerv1.SyncProbesResponse{ Hosts: []*commonv1.Host{ { @@ -2872,7 +2870,7 @@ func TestServiceV1_SyncProbes(t *testing.T) { ProbeStartedRequest: &schedulerv1.ProbeStartedRequest{}, }, }, nil).Times(1), - mn.FindProbedHostIDs(gomock.Eq(mockRawSeedHost.ID)).Return(nil, errors.New("find probed host ids error")).Times(1), + mn.FindProbedHosts(gomock.Eq(mockRawSeedHost.ID)).Return(nil, errors.New("find probed host ids error")).Times(1), ) }, expect: func(t *testing.T, err error) { @@ -2880,36 +2878,6 @@ func TestServiceV1_SyncProbes(t *testing.T) { assert.EqualError(err, "rpc error: code = FailedPrecondition desc = find probed host ids error") }, }, - { - name: "load host error when receive ProbeStartedRequest", - mock: func(svc *V1, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder, - mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder, - ms *schedulerv1mocks.MockScheduler_SyncProbesServerMockRecorder) { - gomock.InOrder( - ms.Recv().Return(&schedulerv1.SyncProbesRequest{ - Host: &commonv1.Host{ - Id: mockRawSeedHost.ID, - Ip: mockRawSeedHost.IP, - Hostname: mockRawSeedHost.Hostname, - Port: mockRawSeedHost.Port, - DownloadPort: mockRawSeedHost.DownloadPort, - Location: mockRawSeedHost.Network.Location, - Idc: mockRawSeedHost.Network.IDC, - }, - Request: &schedulerv1.SyncProbesRequest_ProbeStartedRequest{ - ProbeStartedRequest: &schedulerv1.ProbeStartedRequest{}, - }, - }, nil).Times(1), - mn.FindProbedHostIDs(gomock.Eq(mockRawSeedHost.ID)).Return([]string{mockRawHost.ID}, nil).Times(1), - mr.HostManager().Return(hostManager).Times(1), - mh.Load(gomock.Eq(mockRawHost.ID)).Return(nil, false), - ) - }, - expect: func(t *testing.T, err error) { - assert := assert.New(t) - assert.EqualError(err, "rpc error: code = NotFound desc = probed host not found") - }, - }, { name: "send synchronize probes response error", mock: func(svc *V1, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder, @@ -2930,9 +2898,7 @@ func TestServiceV1_SyncProbes(t *testing.T) { ProbeStartedRequest: &schedulerv1.ProbeStartedRequest{}, }, }, nil).Times(1), - mn.FindProbedHostIDs(gomock.Eq(mockRawSeedHost.ID)).Return([]string{mockRawHost.ID}, nil).Times(1), - mr.HostManager().Return(hostManager).Times(1), - mh.Load(gomock.Eq(mockRawHost.ID)).Return(&mockRawHost, true), + mn.FindProbedHosts(gomock.Eq(mockRawSeedHost.ID)).Return([]*resource.Host{&mockRawHost}, nil).Times(1), ms.Send(gomock.Eq(&schedulerv1.SyncProbesResponse{ Hosts: []*commonv1.Host{ { diff --git a/scheduler/service/service_v2.go b/scheduler/service/service_v2.go index 282c6bcd2..27f000e32 100644 --- a/scheduler/service/service_v2.go +++ b/scheduler/service/service_v2.go @@ -666,88 +666,77 @@ func (v *V2) SyncProbes(stream schedulerv2.Scheduler_SyncProbesServer) error { // Find probed hosts in network topology. Based on the source host information, // the most candidate hosts will be evaluated. logger.Info("receive SyncProbesRequest_ProbeStartedRequest") - probedHostIDs, err := v.networkTopology.FindProbedHostIDs(req.Host.Id) + hosts, err := v.networkTopology.FindProbedHosts(req.Host.Id) if err != nil { logger.Error(err) return status.Error(codes.FailedPrecondition, err.Error()) } var probedHosts []*commonv2.Host - for _, probedHostID := range probedHostIDs { - probedHost, loaded := v.resource.HostManager().Load(probedHostID) - if !loaded { - logger.Warnf("probed host %s not found", probedHostID) - continue - } - + for _, host := range hosts { probedHosts = append(probedHosts, &commonv2.Host{ - Id: probedHost.ID, - Type: uint32(probedHost.Type), - Hostname: probedHost.Hostname, - Ip: probedHost.IP, - Port: probedHost.Port, - DownloadPort: probedHost.DownloadPort, - Os: probedHost.OS, - Platform: probedHost.Platform, - PlatformFamily: probedHost.PlatformFamily, - PlatformVersion: probedHost.PlatformVersion, - KernelVersion: probedHost.KernelVersion, + Id: host.ID, + Type: uint32(host.Type), + Hostname: host.Hostname, + Ip: host.IP, + Port: host.Port, + DownloadPort: host.DownloadPort, + Os: host.OS, + Platform: host.Platform, + PlatformFamily: host.PlatformFamily, + PlatformVersion: host.PlatformVersion, + KernelVersion: host.KernelVersion, Cpu: &commonv2.CPU{ - LogicalCount: probedHost.CPU.LogicalCount, - PhysicalCount: probedHost.CPU.PhysicalCount, - Percent: probedHost.CPU.Percent, - ProcessPercent: probedHost.CPU.ProcessPercent, + LogicalCount: host.CPU.LogicalCount, + PhysicalCount: host.CPU.PhysicalCount, + Percent: host.CPU.Percent, + ProcessPercent: host.CPU.ProcessPercent, Times: &commonv2.CPUTimes{ - User: probedHost.CPU.Times.User, - System: probedHost.CPU.Times.System, - Idle: probedHost.CPU.Times.Idle, - Nice: probedHost.CPU.Times.Nice, - Iowait: probedHost.CPU.Times.Iowait, - Irq: probedHost.CPU.Times.Irq, - Softirq: probedHost.CPU.Times.Softirq, - Steal: probedHost.CPU.Times.Steal, - Guest: probedHost.CPU.Times.Guest, - GuestNice: probedHost.CPU.Times.GuestNice, + User: host.CPU.Times.User, + System: host.CPU.Times.System, + Idle: host.CPU.Times.Idle, + Nice: host.CPU.Times.Nice, + Iowait: host.CPU.Times.Iowait, + Irq: host.CPU.Times.Irq, + Softirq: host.CPU.Times.Softirq, + Steal: host.CPU.Times.Steal, + Guest: host.CPU.Times.Guest, + GuestNice: host.CPU.Times.GuestNice, }, }, Memory: &commonv2.Memory{ - Total: probedHost.Memory.Total, - Available: probedHost.Memory.Available, - Used: probedHost.Memory.Used, - UsedPercent: probedHost.Memory.UsedPercent, - ProcessUsedPercent: probedHost.Memory.ProcessUsedPercent, - Free: probedHost.Memory.Free, + Total: host.Memory.Total, + Available: host.Memory.Available, + Used: host.Memory.Used, + UsedPercent: host.Memory.UsedPercent, + ProcessUsedPercent: host.Memory.ProcessUsedPercent, + Free: host.Memory.Free, }, Network: &commonv2.Network{ - TcpConnectionCount: probedHost.Network.TCPConnectionCount, - UploadTcpConnectionCount: probedHost.Network.UploadTCPConnectionCount, - Location: probedHost.Network.Location, - Idc: probedHost.Network.IDC, + TcpConnectionCount: host.Network.TCPConnectionCount, + UploadTcpConnectionCount: host.Network.UploadTCPConnectionCount, + Location: host.Network.Location, + Idc: host.Network.IDC, }, Disk: &commonv2.Disk{ - Total: probedHost.Disk.Total, - Free: probedHost.Disk.Free, - Used: probedHost.Disk.Used, - UsedPercent: probedHost.Disk.UsedPercent, - InodesTotal: probedHost.Disk.InodesTotal, - InodesUsed: probedHost.Disk.InodesUsed, - InodesFree: probedHost.Disk.InodesFree, - InodesUsedPercent: probedHost.Disk.InodesUsedPercent, + Total: host.Disk.Total, + Free: host.Disk.Free, + Used: host.Disk.Used, + UsedPercent: host.Disk.UsedPercent, + InodesTotal: host.Disk.InodesTotal, + InodesUsed: host.Disk.InodesUsed, + InodesFree: host.Disk.InodesFree, + InodesUsedPercent: host.Disk.InodesUsedPercent, }, Build: &commonv2.Build{ - GitVersion: probedHost.Build.GitVersion, - GitCommit: probedHost.Build.GitCommit, - GoVersion: probedHost.Build.GoVersion, - Platform: probedHost.Build.Platform, + GitVersion: host.Build.GitVersion, + GitCommit: host.Build.GitCommit, + GoVersion: host.Build.GoVersion, + Platform: host.Build.Platform, }, }) } - if len(probedHosts) == 0 { - logger.Error("probed host not found") - return status.Error(codes.NotFound, "probed host not found") - } - logger.Infof("probe started: %#v", probedHosts) if err := stream.Send(&schedulerv2.SyncProbesResponse{ Hosts: probedHosts, diff --git a/scheduler/service/service_v2_test.go b/scheduler/service/service_v2_test.go index 6fe4d3cb0..753343144 100644 --- a/scheduler/service/service_v2_test.go +++ b/scheduler/service/service_v2_test.go @@ -972,9 +972,7 @@ func TestServiceV2_SyncProbes(t *testing.T) { ProbeStartedRequest: &schedulerv2.ProbeStartedRequest{}, }, }, nil).Times(1), - mn.FindProbedHostIDs(gomock.Eq(mockRawSeedHost.ID)).Return([]string{mockRawHost.ID}, nil).Times(1), - mr.HostManager().Return(hostManager).Times(1), - mh.Load(gomock.Eq(mockRawHost.ID)).Return(&mockRawHost, true), + mn.FindProbedHosts(gomock.Eq(mockRawSeedHost.ID)).Return([]*resource.Host{&mockRawHost}, nil).Times(1), ms.Send(gomock.Eq(&schedulerv2.SyncProbesResponse{ Hosts: []*commonv2.Host{ { @@ -1199,7 +1197,7 @@ func TestServiceV2_SyncProbes(t *testing.T) { ProbeStartedRequest: &schedulerv2.ProbeStartedRequest{}, }, }, nil).Times(1), - mn.FindProbedHostIDs(gomock.Eq(mockRawSeedHost.ID)).Return(nil, errors.New("find probed host ids error")).Times(1), + mn.FindProbedHosts(gomock.Eq(mockRawSeedHost.ID)).Return(nil, errors.New("find probed host ids error")).Times(1), ) }, expect: func(t *testing.T, err error) { @@ -1207,45 +1205,6 @@ func TestServiceV2_SyncProbes(t *testing.T) { assert.EqualError(err, "rpc error: code = FailedPrecondition desc = find probed host ids error") }, }, - { - name: "load host error when receive ProbeStartedRequest", - mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder, - mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder, - ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) { - gomock.InOrder( - ms.Recv().Return(&schedulerv2.SyncProbesRequest{ - Host: &commonv2.Host{ - Id: mockSeedHostID, - Type: uint32(pkgtypes.HostTypeSuperSeed), - Hostname: "bar", - Ip: "127.0.0.1", - Port: 8003, - DownloadPort: 8001, - Os: "darwin", - Platform: "darwin", - PlatformFamily: "Standalone Workstation", - PlatformVersion: "11.1", - KernelVersion: "20.2.0", - Cpu: mockV2Probe.Host.Cpu, - Memory: mockV2Probe.Host.Memory, - Network: mockV2Probe.Host.Network, - Disk: mockV2Probe.Host.Disk, - Build: mockV2Probe.Host.Build, - }, - Request: &schedulerv2.SyncProbesRequest_ProbeStartedRequest{ - ProbeStartedRequest: &schedulerv2.ProbeStartedRequest{}, - }, - }, nil).Times(1), - mn.FindProbedHostIDs(gomock.Eq(mockRawSeedHost.ID)).Return([]string{mockRawHost.ID}, nil).Times(1), - mr.HostManager().Return(hostManager).Times(1), - mh.Load(gomock.Eq(mockRawHost.ID)).Return(nil, false), - ) - }, - expect: func(t *testing.T, err error) { - assert := assert.New(t) - assert.EqualError(err, "rpc error: code = NotFound desc = probed host not found") - }, - }, { name: "send synchronize probes response error", mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder, @@ -1275,9 +1234,7 @@ func TestServiceV2_SyncProbes(t *testing.T) { ProbeStartedRequest: &schedulerv2.ProbeStartedRequest{}, }, }, nil).Times(1), - mn.FindProbedHostIDs(gomock.Eq(mockRawSeedHost.ID)).Return([]string{mockRawHost.ID}, nil).Times(1), - mr.HostManager().Return(hostManager).Times(1), - mh.Load(gomock.Eq(mockRawHost.ID)).Return(&mockRawHost, true), + mn.FindProbedHosts(gomock.Eq(mockRawSeedHost.ID)).Return([]*resource.Host{&mockRawHost}, nil).Times(1), ms.Send(gomock.Eq(&schedulerv2.SyncProbesResponse{ Hosts: []*commonv2.Host{ {