From 1a652e65947921577b011b210ca3649d8ff9455d Mon Sep 17 00:00:00 2001 From: dlut_xz <52518280+fcgxz2003@users.noreply.github.com> Date: Thu, 14 Sep 2023 21:20:34 +0800 Subject: [PATCH] feat: add InitProbedCount in AnnounceHost and DeleteHost in LeaveHost (#2722) Signed-off-by: fcgxz2003 <834756128@qq.com> --- pkg/net/ping/ping.go | 4 +- scheduler/networktopology/network_topology.go | 26 ++-- .../networktopology/network_topology_test.go | 119 +++++++++++++++--- scheduler/service/service_v1.go | 6 + scheduler/service/service_v1_test.go | 39 +++--- scheduler/service/service_v2.go | 5 + scheduler/service/service_v2_test.go | 12 +- 7 files changed, 165 insertions(+), 46 deletions(-) diff --git a/pkg/net/ping/ping.go b/pkg/net/ping/ping.go index 1866f42dd..120e178ea 100644 --- a/pkg/net/ping/ping.go +++ b/pkg/net/ping/ping.go @@ -44,8 +44,8 @@ func Ping(addr string) (*ping.Statistics, error) { pinger.Timeout = defaultPingTimeout // SetPrivileged sets the type of ping pinger will send. - // false means pinger will send an "unprivileged" UDP ping. - pinger.SetPrivileged(false) + // true means pinger will send a "privileged" raw ICMP ping. + pinger.SetPrivileged(true) if err := pinger.Run(); err != nil { return nil, err } diff --git a/scheduler/networktopology/network_topology.go b/scheduler/networktopology/network_topology.go index d20581ba3..4d31d2459 100644 --- a/scheduler/networktopology/network_topology.go +++ b/scheduler/networktopology/network_topology.go @@ -23,6 +23,7 @@ import ( "errors" "math" "sort" + "strconv" "time" "github.com/go-redis/redis/v8" @@ -157,10 +158,6 @@ func (nt *networkTopology) Store(srcHostID string, destHostID string) error { return err } - if err := nt.rdb.Set(ctx, pkgredis.MakeProbedCountKeyInScheduler(destHostID), 0, 0).Err(); err != nil { - return err - } - return nil } @@ -193,13 +190,28 @@ func (nt *networkTopology) FindProbedHosts(hostID string) ([]*resource.Host, err // Filter invalid probed count. If probed key not exist, the probed count is nil. var probedCounts []uint64 - for _, rawProbedCount := range rawProbedCounts { - probeCount, ok := rawProbedCount.(uint64) + for i, rawProbedCount := range rawProbedCounts { + // Initialize the probedCount value of host in redis when the host is first selected as the candidate probe target. + if rawProbedCount == nil { + if err := nt.rdb.Set(ctx, pkgredis.MakeProbedCountKeyInScheduler(candidateHosts[i].ID), 0, 0).Err(); err != nil { + return nil, err + } + + probedCounts = append(probedCounts, 0) + continue + } + + value, ok := rawProbedCount.(string) if !ok { + return nil, errors.New("invalid value type") + } + + probedCount, err := strconv.ParseUint(value, 10, 64) + if err != nil { return nil, errors.New("invalid probed count") } - probedCounts = append(probedCounts, probeCount) + probedCounts = append(probedCounts, probedCount) } // Sort candidate hosts by probed count. diff --git a/scheduler/networktopology/network_topology_test.go b/scheduler/networktopology/network_topology_test.go index 2bbda4560..8a9e32352 100644 --- a/scheduler/networktopology/network_topology_test.go +++ b/scheduler/networktopology/network_topology_test.go @@ -242,7 +242,6 @@ func TestNetworkTopology_Store(t *testing.T) { mockRDBClient.MatchExpectationsInOrder(true) mockRDBClient.ExpectExists(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(0) mockRDBClient.Regexp().ExpectHSet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "createdAt", `.*`).SetVal(1) - mockRDBClient.ExpectSet(pkgredis.MakeProbedCountKeyInScheduler(mockHost.ID), 0, 0).SetVal("ok") }, expect: func(t *testing.T, networkTopology NetworkTopology, err error) { assert := assert.New(t) @@ -263,20 +262,6 @@ func TestNetworkTopology_Store(t *testing.T) { assert.EqualError(networkTopology.Store(mockSeedHost.ID, mockHost.ID), "set createdAt error") }, }, - { - name: "set probed count error when network topology between src host and destination host does not exist", - mock: func(mockRDBClient redismock.ClientMock) { - mockRDBClient.MatchExpectationsInOrder(true) - mockRDBClient.ExpectExists(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(0) - mockRDBClient.Regexp().ExpectHSet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "createdAt", `.*`).SetVal(1) - mockRDBClient.ExpectSet(pkgredis.MakeProbedCountKeyInScheduler(mockHost.ID), 0, 0).SetErr(errors.New("set probed count error")) - }, - expect: func(t *testing.T, networkTopology NetworkTopology, err error) { - assert := assert.New(t) - assert.NoError(err) - assert.EqualError(networkTopology.Store(mockSeedHost.ID, mockHost.ID), "set probed count error") - }, - }, } for _, tc := range tests { @@ -324,7 +309,7 @@ func TestNetworkTopology_FindProbedHosts(t *testing.T) { probedCountKeys = append(probedCountKeys, pkgredis.MakeProbedCountKeyInScheduler(host.ID)) } - mockRDBClient.ExpectMGet(probedCountKeys...).SetVal([]interface{}{uint64(6), uint64(5), uint64(4), uint64(3), uint64(2), uint64(1)}) + mockRDBClient.ExpectMGet(probedCountKeys...).SetVal([]interface{}{"6", "5", "4", "3", "2", "1"}) }, expect: func(t *testing.T, networkTopology NetworkTopology, err error, hosts []*resource.Host) { assert := assert.New(t) @@ -357,7 +342,7 @@ func TestNetworkTopology_FindProbedHosts(t *testing.T) { probedCountKeys = append(probedCountKeys, pkgredis.MakeProbedCountKeyInScheduler(host.ID)) } - mockRDBClient.ExpectMGet(probedCountKeys...).SetVal([]interface{}{uint64(1)}) + mockRDBClient.ExpectMGet(probedCountKeys...).SetVal([]interface{}{"1"}) }, expect: func(t *testing.T, networkTopology NetworkTopology, err error, hosts []*resource.Host) { assert := assert.New(t) @@ -441,7 +426,7 @@ func TestNetworkTopology_FindProbedHosts(t *testing.T) { probedCountKeys = append(probedCountKeys, pkgredis.MakeProbedCountKeyInScheduler(host.ID)) } - mockRDBClient.ExpectMGet(probedCountKeys...).SetVal([]interface{}{"foo", uint64(5), uint64(4), uint64(3), uint64(2), uint64(1)}) + mockRDBClient.ExpectMGet(probedCountKeys...).SetVal([]interface{}{"foo", "5", "4", "3", "2", "1"}) }, expect: func(t *testing.T, networkTopology NetworkTopology, err error, hosts []*resource.Host) { assert := assert.New(t) @@ -451,6 +436,104 @@ func TestNetworkTopology_FindProbedHosts(t *testing.T) { assert.EqualError(err, "invalid probed count") }, }, + { + name: "invalid value type", + hosts: []*resource.Host{ + mockHost, {ID: "foo"}, {ID: "bar"}, {ID: "baz"}, {ID: "bav"}, {ID: "bac"}, + }, + mock: func(mockRDBClient redismock.ClientMock, mr *resource.MockResourceMockRecorder, hostManager resource.HostManager, + mh *resource.MockHostManagerMockRecorder, hosts []*resource.Host) { + mockRDBClient.MatchExpectationsInOrder(true) + blocklist := set.NewSafeSet[string]() + blocklist.Add(mockSeedHost.ID) + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.LoadRandomHosts(gomock.Eq(findProbedCandidateHostsLimit), gomock.Eq(blocklist)).Return(hosts).Times(1), + ) + + var probedCountKeys []string + for _, host := range hosts { + probedCountKeys = append(probedCountKeys, pkgredis.MakeProbedCountKeyInScheduler(host.ID)) + } + + mockRDBClient.ExpectMGet(probedCountKeys...).SetVal([]interface{}{6, "5", "4", "3", "2", "1"}) + }, + expect: func(t *testing.T, networkTopology NetworkTopology, err error, hosts []*resource.Host) { + assert := assert.New(t) + assert.NoError(err) + probedHosts, err := networkTopology.FindProbedHosts(mockSeedHost.ID) + assert.Equal(len(probedHosts), 0) + assert.EqualError(err, "invalid value type") + }, + }, + { + name: "Initialize the probedCount value of host in redis", + hosts: []*resource.Host{ + mockHost, {ID: "foo"}, {ID: "bar"}, {ID: "baz"}, {ID: "bav"}, {ID: "bac"}, + }, + mock: func(mockRDBClient redismock.ClientMock, mr *resource.MockResourceMockRecorder, hostManager resource.HostManager, + mh *resource.MockHostManagerMockRecorder, hosts []*resource.Host) { + mockRDBClient.MatchExpectationsInOrder(true) + blocklist := set.NewSafeSet[string]() + blocklist.Add(mockSeedHost.ID) + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.LoadRandomHosts(gomock.Eq(findProbedCandidateHostsLimit), gomock.Eq(blocklist)).Return(hosts).Times(1), + ) + + var probedCountKeys []string + for _, host := range hosts { + probedCountKeys = append(probedCountKeys, pkgredis.MakeProbedCountKeyInScheduler(host.ID)) + } + + mockRDBClient.ExpectMGet(probedCountKeys...).SetVal([]interface{}{nil, "5", "4", "3", "2", "1"}) + mockRDBClient.ExpectSet(probedCountKeys[0], 0, 0).SetVal("ok") + }, + expect: func(t *testing.T, networkTopology NetworkTopology, err error, hosts []*resource.Host) { + assert := assert.New(t) + assert.NoError(err) + probedHosts, err := networkTopology.FindProbedHosts(mockSeedHost.ID) + assert.NoError(err) + assert.Equal(len(probedHosts), 5) + assert.EqualValues(probedHosts[0].ID, mockHost.ID) + assert.EqualValues(probedHosts[1].ID, "bac") + assert.EqualValues(probedHosts[2].ID, "bav") + assert.EqualValues(probedHosts[3].ID, "baz") + assert.EqualValues(probedHosts[4].ID, "bar") + + }, + }, + { + name: "Initialize the probedCount value of host in redis error", + hosts: []*resource.Host{ + mockHost, {ID: "foo"}, {ID: "bar"}, {ID: "baz"}, {ID: "bav"}, {ID: "bac"}, + }, + mock: func(mockRDBClient redismock.ClientMock, mr *resource.MockResourceMockRecorder, hostManager resource.HostManager, + mh *resource.MockHostManagerMockRecorder, hosts []*resource.Host) { + mockRDBClient.MatchExpectationsInOrder(true) + blocklist := set.NewSafeSet[string]() + blocklist.Add(mockSeedHost.ID) + gomock.InOrder( + mr.HostManager().Return(hostManager).Times(1), + mh.LoadRandomHosts(gomock.Eq(findProbedCandidateHostsLimit), gomock.Eq(blocklist)).Return(hosts).Times(1), + ) + + var probedCountKeys []string + for _, host := range hosts { + probedCountKeys = append(probedCountKeys, pkgredis.MakeProbedCountKeyInScheduler(host.ID)) + } + + mockRDBClient.ExpectMGet(probedCountKeys...).SetVal([]interface{}{nil, "5", "4", "3", "2", "1"}) + mockRDBClient.ExpectSet(probedCountKeys[0], 0, 0).SetErr(errors.New("Initialize the probedCount value of host in redis error")) + }, + expect: func(t *testing.T, networkTopology NetworkTopology, err error, hosts []*resource.Host) { + assert := assert.New(t) + assert.NoError(err) + probedHosts, err := networkTopology.FindProbedHosts(mockSeedHost.ID) + assert.Equal(len(probedHosts), 0) + assert.EqualError(err, "Initialize the probedCount value of host in redis error") + }, + }, } for _, tc := range tests { diff --git a/scheduler/service/service_v1.go b/scheduler/service/service_v1.go index 9db833696..cd603a497 100644 --- a/scheduler/service/service_v1.go +++ b/scheduler/service/service_v1.go @@ -560,6 +560,7 @@ func (v *V1) AnnounceHost(ctx context.Context, req *schedulerv1.AnnounceHostRequ req.GetId(), req.GetIp(), req.GetHostname(), req.GetPort(), req.GetDownloadPort(), types.ParseHostType(req.GetType()), options..., ) + v.resource.HostManager().Store(host) host.Log.Infof("announce new host: %#v", req) return nil @@ -658,6 +659,11 @@ func (v *V1) LeaveHost(ctx context.Context, req *schedulerv1.LeaveHostRequest) e } host.LeavePeers() + if err := v.networkTopology.DeleteHost(host.ID); err != nil { + logger.Errorf("delete network topology host error: %s", err.Error()) + return err + } + return nil } diff --git a/scheduler/service/service_v1_test.go b/scheduler/service/service_v1_test.go index 994a0b15e..08430e735 100644 --- a/scheduler/service/service_v1_test.go +++ b/scheduler/service/service_v1_test.go @@ -2455,12 +2455,12 @@ func TestServiceV1_AnnounceHost(t *testing.T) { func TestServiceV1_LeaveHost(t *testing.T) { tests := []struct { name string - mock func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder) + mock func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mnt *networktopologymocks.MockNetworkTopologyMockRecorder) expect func(t *testing.T, peer *resource.Peer, err error) }{ { name: "host not found", - mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder) { + mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mnt *networktopologymocks.MockNetworkTopologyMockRecorder) { gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Any()).Return(nil, false).Times(1), @@ -2473,10 +2473,11 @@ func TestServiceV1_LeaveHost(t *testing.T) { }, { name: "host has not peers", - mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder) { + mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mnt *networktopologymocks.MockNetworkTopologyMockRecorder) { gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Any()).Return(host, true).Times(1), + mnt.DeleteHost(host.ID).Return(nil).Times(1), ) }, expect: func(t *testing.T, peer *resource.Peer, err error) { @@ -2486,12 +2487,13 @@ func TestServiceV1_LeaveHost(t *testing.T) { }, { name: "peer state is PeerStateLeave", - mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder) { + mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mnt *networktopologymocks.MockNetworkTopologyMockRecorder) { host.Peers.Store(mockPeer.ID, mockPeer) mockPeer.FSM.SetState(resource.PeerStateLeave) gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Any()).Return(host, true).Times(1), + mnt.DeleteHost(host.ID).Return(nil).Times(1), ) }, expect: func(t *testing.T, peer *resource.Peer, err error) { @@ -2501,12 +2503,13 @@ func TestServiceV1_LeaveHost(t *testing.T) { }, { name: "peer state is PeerStatePending", - mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder) { + mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mnt *networktopologymocks.MockNetworkTopologyMockRecorder) { host.Peers.Store(mockPeer.ID, mockPeer) mockPeer.FSM.SetState(resource.PeerStatePending) gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Any()).Return(host, true).Times(1), + mnt.DeleteHost(host.ID).Return(nil).Times(1), ) }, expect: func(t *testing.T, peer *resource.Peer, err error) { @@ -2517,12 +2520,13 @@ func TestServiceV1_LeaveHost(t *testing.T) { }, { name: "peer state is PeerStateReceivedEmpty", - mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder) { + mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mnt *networktopologymocks.MockNetworkTopologyMockRecorder) { host.Peers.Store(mockPeer.ID, mockPeer) mockPeer.FSM.SetState(resource.PeerStateReceivedEmpty) gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Any()).Return(host, true).Times(1), + mnt.DeleteHost(host.ID).Return(nil).Times(1), ) }, expect: func(t *testing.T, peer *resource.Peer, err error) { @@ -2533,12 +2537,13 @@ func TestServiceV1_LeaveHost(t *testing.T) { }, { name: "peer state is PeerStateReceivedTiny", - mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder) { + mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mnt *networktopologymocks.MockNetworkTopologyMockRecorder) { host.Peers.Store(mockPeer.ID, mockPeer) mockPeer.FSM.SetState(resource.PeerStateReceivedTiny) gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Any()).Return(host, true).Times(1), + mnt.DeleteHost(host.ID).Return(nil).Times(1), ) }, expect: func(t *testing.T, peer *resource.Peer, err error) { @@ -2549,12 +2554,13 @@ func TestServiceV1_LeaveHost(t *testing.T) { }, { name: "peer state is PeerStateReceivedSmall", - mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder) { + mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mnt *networktopologymocks.MockNetworkTopologyMockRecorder) { host.Peers.Store(mockPeer.ID, mockPeer) mockPeer.FSM.SetState(resource.PeerStateReceivedSmall) gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Any()).Return(host, true).Times(1), + mnt.DeleteHost(host.ID).Return(nil).Times(1), ) }, expect: func(t *testing.T, peer *resource.Peer, err error) { @@ -2565,12 +2571,13 @@ func TestServiceV1_LeaveHost(t *testing.T) { }, { name: "peer state is PeerStateReceivedNormal", - mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder) { + mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mnt *networktopologymocks.MockNetworkTopologyMockRecorder) { host.Peers.Store(mockPeer.ID, mockPeer) mockPeer.FSM.SetState(resource.PeerStateReceivedNormal) gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Any()).Return(host, true).Times(1), + mnt.DeleteHost(host.ID).Return(nil).Times(1), ) }, expect: func(t *testing.T, peer *resource.Peer, err error) { @@ -2581,12 +2588,13 @@ func TestServiceV1_LeaveHost(t *testing.T) { }, { name: "peer state is PeerStateRunning", - mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder) { + mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mnt *networktopologymocks.MockNetworkTopologyMockRecorder) { host.Peers.Store(mockPeer.ID, mockPeer) mockPeer.FSM.SetState(resource.PeerStateRunning) gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Any()).Return(host, true).Times(1), + mnt.DeleteHost(host.ID).Return(nil).Times(1), ) }, expect: func(t *testing.T, peer *resource.Peer, err error) { @@ -2597,12 +2605,13 @@ func TestServiceV1_LeaveHost(t *testing.T) { }, { name: "peer state is PeerStateBackToSource", - mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder) { + mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mnt *networktopologymocks.MockNetworkTopologyMockRecorder) { host.Peers.Store(mockPeer.ID, mockPeer) mockPeer.FSM.SetState(resource.PeerStateBackToSource) gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Any()).Return(host, true).Times(1), + mnt.DeleteHost(host.ID).Return(nil).Times(1), ) }, expect: func(t *testing.T, peer *resource.Peer, err error) { @@ -2613,12 +2622,13 @@ func TestServiceV1_LeaveHost(t *testing.T) { }, { name: "peer state is PeerStateSucceeded", - mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder) { + mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mnt *networktopologymocks.MockNetworkTopologyMockRecorder) { host.Peers.Store(mockPeer.ID, mockPeer) mockPeer.FSM.SetState(resource.PeerStateSucceeded) gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Any()).Return(host, true).Times(1), + mnt.DeleteHost(host.ID).Return(nil).Times(1), ) }, expect: func(t *testing.T, peer *resource.Peer, err error) { @@ -2629,12 +2639,13 @@ func TestServiceV1_LeaveHost(t *testing.T) { }, { name: "peer state is PeerStateFailed", - mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder) { + mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mnt *networktopologymocks.MockNetworkTopologyMockRecorder) { host.Peers.Store(mockPeer.ID, mockPeer) mockPeer.FSM.SetState(resource.PeerStateFailed) gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Any()).Return(host, true).Times(1), + mnt.DeleteHost(host.ID).Return(nil).Times(1), ) }, expect: func(t *testing.T, peer *resource.Peer, err error) { @@ -2662,7 +2673,7 @@ func TestServiceV1_LeaveHost(t *testing.T) { mockPeer := resource.NewPeer(mockSeedPeerID, mockResourceConfig, mockTask, host) svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology) - tc.mock(host, mockPeer, hostManager, scheduling.EXPECT(), res.EXPECT(), hostManager.EXPECT()) + tc.mock(host, mockPeer, hostManager, scheduling.EXPECT(), res.EXPECT(), hostManager.EXPECT(), networkTopology.EXPECT()) tc.expect(t, mockPeer, svc.LeaveHost(context.Background(), &schedulerv1.LeaveHostRequest{ Id: idgen.HostIDV2(host.IP, host.Hostname), })) diff --git a/scheduler/service/service_v2.go b/scheduler/service/service_v2.go index 9ed230e9c..09082e0ea 100644 --- a/scheduler/service/service_v2.go +++ b/scheduler/service/service_v2.go @@ -648,6 +648,11 @@ func (v *V2) LeaveHost(ctx context.Context, req *schedulerv2.LeaveHostRequest) e } host.LeavePeers() + if err := v.networkTopology.DeleteHost(host.ID); err != nil { + logger.Errorf("delete network topology host error: %s", err.Error()) + return err + } + return nil } diff --git a/scheduler/service/service_v2_test.go b/scheduler/service/service_v2_test.go index 62d36ebc2..06459b1bd 100644 --- a/scheduler/service/service_v2_test.go +++ b/scheduler/service/service_v2_test.go @@ -853,12 +853,12 @@ func TestServiceV2_AnnounceHost(t *testing.T) { func TestServiceV2_LeaveHost(t *testing.T) { tests := []struct { name string - mock func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder) + mock func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mnt *networktopologymocks.MockNetworkTopologyMockRecorder) expect func(t *testing.T, peer *resource.Peer, err error) }{ { name: "host not found", - mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder) { + mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mnt *networktopologymocks.MockNetworkTopologyMockRecorder) { gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Any()).Return(nil, false).Times(1), @@ -871,10 +871,11 @@ func TestServiceV2_LeaveHost(t *testing.T) { }, { name: "host has not peers", - mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder) { + mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mnt *networktopologymocks.MockNetworkTopologyMockRecorder) { gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Any()).Return(host, true).Times(1), + mnt.DeleteHost(host.ID).Return(nil).Times(1), ) }, expect: func(t *testing.T, peer *resource.Peer, err error) { @@ -884,12 +885,13 @@ func TestServiceV2_LeaveHost(t *testing.T) { }, { name: "peer leaves succeeded", - mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder) { + mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mnt *networktopologymocks.MockNetworkTopologyMockRecorder) { host.Peers.Store(mockPeer.ID, mockPeer) mockPeer.FSM.SetState(resource.PeerStatePending) gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), mh.Load(gomock.Any()).Return(host, true).Times(1), + mnt.DeleteHost(host.ID).Return(nil).Times(1), ) }, expect: func(t *testing.T, peer *resource.Peer, err error) { @@ -917,7 +919,7 @@ func TestServiceV2_LeaveHost(t *testing.T) { mockPeer := resource.NewPeer(mockSeedPeerID, mockResourceConfig, mockTask, host) svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology) - tc.mock(host, mockPeer, hostManager, res.EXPECT(), hostManager.EXPECT()) + tc.mock(host, mockPeer, hostManager, res.EXPECT(), hostManager.EXPECT(), networkTopology.EXPECT()) tc.expect(t, mockPeer, svc.LeaveHost(context.Background(), &schedulerv2.LeaveHostRequest{Id: mockHostID})) }) }