feat: add InitProbedCount in AnnounceHost and DeleteHost in LeaveHost (#2722)
Signed-off-by: fcgxz2003 <834756128@qq.com>
This commit is contained in:
parent
d4c9875557
commit
1a652e6594
|
|
@ -44,8 +44,8 @@ func Ping(addr string) (*ping.Statistics, error) {
|
||||||
pinger.Timeout = defaultPingTimeout
|
pinger.Timeout = defaultPingTimeout
|
||||||
|
|
||||||
// SetPrivileged sets the type of ping pinger will send.
|
// SetPrivileged sets the type of ping pinger will send.
|
||||||
// false means pinger will send an "unprivileged" UDP ping.
|
// true means pinger will send a "privileged" raw ICMP ping.
|
||||||
pinger.SetPrivileged(false)
|
pinger.SetPrivileged(true)
|
||||||
if err := pinger.Run(); err != nil {
|
if err := pinger.Run(); err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -23,6 +23,7 @@ import (
|
||||||
"errors"
|
"errors"
|
||||||
"math"
|
"math"
|
||||||
"sort"
|
"sort"
|
||||||
|
"strconv"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/go-redis/redis/v8"
|
"github.com/go-redis/redis/v8"
|
||||||
|
|
@ -157,10 +158,6 @@ func (nt *networkTopology) Store(srcHostID string, destHostID string) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := nt.rdb.Set(ctx, pkgredis.MakeProbedCountKeyInScheduler(destHostID), 0, 0).Err(); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -193,13 +190,28 @@ func (nt *networkTopology) FindProbedHosts(hostID string) ([]*resource.Host, err
|
||||||
|
|
||||||
// Filter invalid probed count. If probed key not exist, the probed count is nil.
|
// Filter invalid probed count. If probed key not exist, the probed count is nil.
|
||||||
var probedCounts []uint64
|
var probedCounts []uint64
|
||||||
for _, rawProbedCount := range rawProbedCounts {
|
for i, rawProbedCount := range rawProbedCounts {
|
||||||
probeCount, ok := rawProbedCount.(uint64)
|
// Initialize the probedCount value of host in redis when the host is first selected as the candidate probe target.
|
||||||
|
if rawProbedCount == nil {
|
||||||
|
if err := nt.rdb.Set(ctx, pkgredis.MakeProbedCountKeyInScheduler(candidateHosts[i].ID), 0, 0).Err(); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
probedCounts = append(probedCounts, 0)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
value, ok := rawProbedCount.(string)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
return nil, errors.New("invalid value type")
|
||||||
|
}
|
||||||
|
|
||||||
|
probedCount, err := strconv.ParseUint(value, 10, 64)
|
||||||
|
if err != nil {
|
||||||
return nil, errors.New("invalid probed count")
|
return nil, errors.New("invalid probed count")
|
||||||
}
|
}
|
||||||
|
|
||||||
probedCounts = append(probedCounts, probeCount)
|
probedCounts = append(probedCounts, probedCount)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sort candidate hosts by probed count.
|
// Sort candidate hosts by probed count.
|
||||||
|
|
|
||||||
|
|
@ -242,7 +242,6 @@ func TestNetworkTopology_Store(t *testing.T) {
|
||||||
mockRDBClient.MatchExpectationsInOrder(true)
|
mockRDBClient.MatchExpectationsInOrder(true)
|
||||||
mockRDBClient.ExpectExists(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(0)
|
mockRDBClient.ExpectExists(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(0)
|
||||||
mockRDBClient.Regexp().ExpectHSet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "createdAt", `.*`).SetVal(1)
|
mockRDBClient.Regexp().ExpectHSet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "createdAt", `.*`).SetVal(1)
|
||||||
mockRDBClient.ExpectSet(pkgredis.MakeProbedCountKeyInScheduler(mockHost.ID), 0, 0).SetVal("ok")
|
|
||||||
},
|
},
|
||||||
expect: func(t *testing.T, networkTopology NetworkTopology, err error) {
|
expect: func(t *testing.T, networkTopology NetworkTopology, err error) {
|
||||||
assert := assert.New(t)
|
assert := assert.New(t)
|
||||||
|
|
@ -263,20 +262,6 @@ func TestNetworkTopology_Store(t *testing.T) {
|
||||||
assert.EqualError(networkTopology.Store(mockSeedHost.ID, mockHost.ID), "set createdAt error")
|
assert.EqualError(networkTopology.Store(mockSeedHost.ID, mockHost.ID), "set createdAt error")
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
{
|
|
||||||
name: "set probed count error when network topology between src host and destination host does not exist",
|
|
||||||
mock: func(mockRDBClient redismock.ClientMock) {
|
|
||||||
mockRDBClient.MatchExpectationsInOrder(true)
|
|
||||||
mockRDBClient.ExpectExists(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(0)
|
|
||||||
mockRDBClient.Regexp().ExpectHSet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "createdAt", `.*`).SetVal(1)
|
|
||||||
mockRDBClient.ExpectSet(pkgredis.MakeProbedCountKeyInScheduler(mockHost.ID), 0, 0).SetErr(errors.New("set probed count error"))
|
|
||||||
},
|
|
||||||
expect: func(t *testing.T, networkTopology NetworkTopology, err error) {
|
|
||||||
assert := assert.New(t)
|
|
||||||
assert.NoError(err)
|
|
||||||
assert.EqualError(networkTopology.Store(mockSeedHost.ID, mockHost.ID), "set probed count error")
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
|
|
@ -324,7 +309,7 @@ func TestNetworkTopology_FindProbedHosts(t *testing.T) {
|
||||||
probedCountKeys = append(probedCountKeys, pkgredis.MakeProbedCountKeyInScheduler(host.ID))
|
probedCountKeys = append(probedCountKeys, pkgredis.MakeProbedCountKeyInScheduler(host.ID))
|
||||||
}
|
}
|
||||||
|
|
||||||
mockRDBClient.ExpectMGet(probedCountKeys...).SetVal([]interface{}{uint64(6), uint64(5), uint64(4), uint64(3), uint64(2), uint64(1)})
|
mockRDBClient.ExpectMGet(probedCountKeys...).SetVal([]interface{}{"6", "5", "4", "3", "2", "1"})
|
||||||
},
|
},
|
||||||
expect: func(t *testing.T, networkTopology NetworkTopology, err error, hosts []*resource.Host) {
|
expect: func(t *testing.T, networkTopology NetworkTopology, err error, hosts []*resource.Host) {
|
||||||
assert := assert.New(t)
|
assert := assert.New(t)
|
||||||
|
|
@ -357,7 +342,7 @@ func TestNetworkTopology_FindProbedHosts(t *testing.T) {
|
||||||
probedCountKeys = append(probedCountKeys, pkgredis.MakeProbedCountKeyInScheduler(host.ID))
|
probedCountKeys = append(probedCountKeys, pkgredis.MakeProbedCountKeyInScheduler(host.ID))
|
||||||
}
|
}
|
||||||
|
|
||||||
mockRDBClient.ExpectMGet(probedCountKeys...).SetVal([]interface{}{uint64(1)})
|
mockRDBClient.ExpectMGet(probedCountKeys...).SetVal([]interface{}{"1"})
|
||||||
},
|
},
|
||||||
expect: func(t *testing.T, networkTopology NetworkTopology, err error, hosts []*resource.Host) {
|
expect: func(t *testing.T, networkTopology NetworkTopology, err error, hosts []*resource.Host) {
|
||||||
assert := assert.New(t)
|
assert := assert.New(t)
|
||||||
|
|
@ -441,7 +426,7 @@ func TestNetworkTopology_FindProbedHosts(t *testing.T) {
|
||||||
probedCountKeys = append(probedCountKeys, pkgredis.MakeProbedCountKeyInScheduler(host.ID))
|
probedCountKeys = append(probedCountKeys, pkgredis.MakeProbedCountKeyInScheduler(host.ID))
|
||||||
}
|
}
|
||||||
|
|
||||||
mockRDBClient.ExpectMGet(probedCountKeys...).SetVal([]interface{}{"foo", uint64(5), uint64(4), uint64(3), uint64(2), uint64(1)})
|
mockRDBClient.ExpectMGet(probedCountKeys...).SetVal([]interface{}{"foo", "5", "4", "3", "2", "1"})
|
||||||
},
|
},
|
||||||
expect: func(t *testing.T, networkTopology NetworkTopology, err error, hosts []*resource.Host) {
|
expect: func(t *testing.T, networkTopology NetworkTopology, err error, hosts []*resource.Host) {
|
||||||
assert := assert.New(t)
|
assert := assert.New(t)
|
||||||
|
|
@ -451,6 +436,104 @@ func TestNetworkTopology_FindProbedHosts(t *testing.T) {
|
||||||
assert.EqualError(err, "invalid probed count")
|
assert.EqualError(err, "invalid probed count")
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
|
{
|
||||||
|
name: "invalid value type",
|
||||||
|
hosts: []*resource.Host{
|
||||||
|
mockHost, {ID: "foo"}, {ID: "bar"}, {ID: "baz"}, {ID: "bav"}, {ID: "bac"},
|
||||||
|
},
|
||||||
|
mock: func(mockRDBClient redismock.ClientMock, mr *resource.MockResourceMockRecorder, hostManager resource.HostManager,
|
||||||
|
mh *resource.MockHostManagerMockRecorder, hosts []*resource.Host) {
|
||||||
|
mockRDBClient.MatchExpectationsInOrder(true)
|
||||||
|
blocklist := set.NewSafeSet[string]()
|
||||||
|
blocklist.Add(mockSeedHost.ID)
|
||||||
|
gomock.InOrder(
|
||||||
|
mr.HostManager().Return(hostManager).Times(1),
|
||||||
|
mh.LoadRandomHosts(gomock.Eq(findProbedCandidateHostsLimit), gomock.Eq(blocklist)).Return(hosts).Times(1),
|
||||||
|
)
|
||||||
|
|
||||||
|
var probedCountKeys []string
|
||||||
|
for _, host := range hosts {
|
||||||
|
probedCountKeys = append(probedCountKeys, pkgredis.MakeProbedCountKeyInScheduler(host.ID))
|
||||||
|
}
|
||||||
|
|
||||||
|
mockRDBClient.ExpectMGet(probedCountKeys...).SetVal([]interface{}{6, "5", "4", "3", "2", "1"})
|
||||||
|
},
|
||||||
|
expect: func(t *testing.T, networkTopology NetworkTopology, err error, hosts []*resource.Host) {
|
||||||
|
assert := assert.New(t)
|
||||||
|
assert.NoError(err)
|
||||||
|
probedHosts, err := networkTopology.FindProbedHosts(mockSeedHost.ID)
|
||||||
|
assert.Equal(len(probedHosts), 0)
|
||||||
|
assert.EqualError(err, "invalid value type")
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Initialize the probedCount value of host in redis",
|
||||||
|
hosts: []*resource.Host{
|
||||||
|
mockHost, {ID: "foo"}, {ID: "bar"}, {ID: "baz"}, {ID: "bav"}, {ID: "bac"},
|
||||||
|
},
|
||||||
|
mock: func(mockRDBClient redismock.ClientMock, mr *resource.MockResourceMockRecorder, hostManager resource.HostManager,
|
||||||
|
mh *resource.MockHostManagerMockRecorder, hosts []*resource.Host) {
|
||||||
|
mockRDBClient.MatchExpectationsInOrder(true)
|
||||||
|
blocklist := set.NewSafeSet[string]()
|
||||||
|
blocklist.Add(mockSeedHost.ID)
|
||||||
|
gomock.InOrder(
|
||||||
|
mr.HostManager().Return(hostManager).Times(1),
|
||||||
|
mh.LoadRandomHosts(gomock.Eq(findProbedCandidateHostsLimit), gomock.Eq(blocklist)).Return(hosts).Times(1),
|
||||||
|
)
|
||||||
|
|
||||||
|
var probedCountKeys []string
|
||||||
|
for _, host := range hosts {
|
||||||
|
probedCountKeys = append(probedCountKeys, pkgredis.MakeProbedCountKeyInScheduler(host.ID))
|
||||||
|
}
|
||||||
|
|
||||||
|
mockRDBClient.ExpectMGet(probedCountKeys...).SetVal([]interface{}{nil, "5", "4", "3", "2", "1"})
|
||||||
|
mockRDBClient.ExpectSet(probedCountKeys[0], 0, 0).SetVal("ok")
|
||||||
|
},
|
||||||
|
expect: func(t *testing.T, networkTopology NetworkTopology, err error, hosts []*resource.Host) {
|
||||||
|
assert := assert.New(t)
|
||||||
|
assert.NoError(err)
|
||||||
|
probedHosts, err := networkTopology.FindProbedHosts(mockSeedHost.ID)
|
||||||
|
assert.NoError(err)
|
||||||
|
assert.Equal(len(probedHosts), 5)
|
||||||
|
assert.EqualValues(probedHosts[0].ID, mockHost.ID)
|
||||||
|
assert.EqualValues(probedHosts[1].ID, "bac")
|
||||||
|
assert.EqualValues(probedHosts[2].ID, "bav")
|
||||||
|
assert.EqualValues(probedHosts[3].ID, "baz")
|
||||||
|
assert.EqualValues(probedHosts[4].ID, "bar")
|
||||||
|
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "Initialize the probedCount value of host in redis error",
|
||||||
|
hosts: []*resource.Host{
|
||||||
|
mockHost, {ID: "foo"}, {ID: "bar"}, {ID: "baz"}, {ID: "bav"}, {ID: "bac"},
|
||||||
|
},
|
||||||
|
mock: func(mockRDBClient redismock.ClientMock, mr *resource.MockResourceMockRecorder, hostManager resource.HostManager,
|
||||||
|
mh *resource.MockHostManagerMockRecorder, hosts []*resource.Host) {
|
||||||
|
mockRDBClient.MatchExpectationsInOrder(true)
|
||||||
|
blocklist := set.NewSafeSet[string]()
|
||||||
|
blocklist.Add(mockSeedHost.ID)
|
||||||
|
gomock.InOrder(
|
||||||
|
mr.HostManager().Return(hostManager).Times(1),
|
||||||
|
mh.LoadRandomHosts(gomock.Eq(findProbedCandidateHostsLimit), gomock.Eq(blocklist)).Return(hosts).Times(1),
|
||||||
|
)
|
||||||
|
|
||||||
|
var probedCountKeys []string
|
||||||
|
for _, host := range hosts {
|
||||||
|
probedCountKeys = append(probedCountKeys, pkgredis.MakeProbedCountKeyInScheduler(host.ID))
|
||||||
|
}
|
||||||
|
|
||||||
|
mockRDBClient.ExpectMGet(probedCountKeys...).SetVal([]interface{}{nil, "5", "4", "3", "2", "1"})
|
||||||
|
mockRDBClient.ExpectSet(probedCountKeys[0], 0, 0).SetErr(errors.New("Initialize the probedCount value of host in redis error"))
|
||||||
|
},
|
||||||
|
expect: func(t *testing.T, networkTopology NetworkTopology, err error, hosts []*resource.Host) {
|
||||||
|
assert := assert.New(t)
|
||||||
|
assert.NoError(err)
|
||||||
|
probedHosts, err := networkTopology.FindProbedHosts(mockSeedHost.ID)
|
||||||
|
assert.Equal(len(probedHosts), 0)
|
||||||
|
assert.EqualError(err, "Initialize the probedCount value of host in redis error")
|
||||||
|
},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range tests {
|
for _, tc := range tests {
|
||||||
|
|
|
||||||
|
|
@ -560,6 +560,7 @@ func (v *V1) AnnounceHost(ctx context.Context, req *schedulerv1.AnnounceHostRequ
|
||||||
req.GetId(), req.GetIp(), req.GetHostname(), req.GetPort(), req.GetDownloadPort(),
|
req.GetId(), req.GetIp(), req.GetHostname(), req.GetPort(), req.GetDownloadPort(),
|
||||||
types.ParseHostType(req.GetType()), options...,
|
types.ParseHostType(req.GetType()), options...,
|
||||||
)
|
)
|
||||||
|
|
||||||
v.resource.HostManager().Store(host)
|
v.resource.HostManager().Store(host)
|
||||||
host.Log.Infof("announce new host: %#v", req)
|
host.Log.Infof("announce new host: %#v", req)
|
||||||
return nil
|
return nil
|
||||||
|
|
@ -658,6 +659,11 @@ func (v *V1) LeaveHost(ctx context.Context, req *schedulerv1.LeaveHostRequest) e
|
||||||
}
|
}
|
||||||
|
|
||||||
host.LeavePeers()
|
host.LeavePeers()
|
||||||
|
if err := v.networkTopology.DeleteHost(host.ID); err != nil {
|
||||||
|
logger.Errorf("delete network topology host error: %s", err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -2455,12 +2455,12 @@ func TestServiceV1_AnnounceHost(t *testing.T) {
|
||||||
func TestServiceV1_LeaveHost(t *testing.T) {
|
func TestServiceV1_LeaveHost(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
mock func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder)
|
mock func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mnt *networktopologymocks.MockNetworkTopologyMockRecorder)
|
||||||
expect func(t *testing.T, peer *resource.Peer, err error)
|
expect func(t *testing.T, peer *resource.Peer, err error)
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "host not found",
|
name: "host not found",
|
||||||
mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder) {
|
mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mnt *networktopologymocks.MockNetworkTopologyMockRecorder) {
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
mr.HostManager().Return(hostManager).Times(1),
|
mr.HostManager().Return(hostManager).Times(1),
|
||||||
mh.Load(gomock.Any()).Return(nil, false).Times(1),
|
mh.Load(gomock.Any()).Return(nil, false).Times(1),
|
||||||
|
|
@ -2473,10 +2473,11 @@ func TestServiceV1_LeaveHost(t *testing.T) {
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "host has not peers",
|
name: "host has not peers",
|
||||||
mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder) {
|
mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mnt *networktopologymocks.MockNetworkTopologyMockRecorder) {
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
mr.HostManager().Return(hostManager).Times(1),
|
mr.HostManager().Return(hostManager).Times(1),
|
||||||
mh.Load(gomock.Any()).Return(host, true).Times(1),
|
mh.Load(gomock.Any()).Return(host, true).Times(1),
|
||||||
|
mnt.DeleteHost(host.ID).Return(nil).Times(1),
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
expect: func(t *testing.T, peer *resource.Peer, err error) {
|
expect: func(t *testing.T, peer *resource.Peer, err error) {
|
||||||
|
|
@ -2486,12 +2487,13 @@ func TestServiceV1_LeaveHost(t *testing.T) {
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "peer state is PeerStateLeave",
|
name: "peer state is PeerStateLeave",
|
||||||
mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder) {
|
mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mnt *networktopologymocks.MockNetworkTopologyMockRecorder) {
|
||||||
host.Peers.Store(mockPeer.ID, mockPeer)
|
host.Peers.Store(mockPeer.ID, mockPeer)
|
||||||
mockPeer.FSM.SetState(resource.PeerStateLeave)
|
mockPeer.FSM.SetState(resource.PeerStateLeave)
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
mr.HostManager().Return(hostManager).Times(1),
|
mr.HostManager().Return(hostManager).Times(1),
|
||||||
mh.Load(gomock.Any()).Return(host, true).Times(1),
|
mh.Load(gomock.Any()).Return(host, true).Times(1),
|
||||||
|
mnt.DeleteHost(host.ID).Return(nil).Times(1),
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
expect: func(t *testing.T, peer *resource.Peer, err error) {
|
expect: func(t *testing.T, peer *resource.Peer, err error) {
|
||||||
|
|
@ -2501,12 +2503,13 @@ func TestServiceV1_LeaveHost(t *testing.T) {
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "peer state is PeerStatePending",
|
name: "peer state is PeerStatePending",
|
||||||
mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder) {
|
mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mnt *networktopologymocks.MockNetworkTopologyMockRecorder) {
|
||||||
host.Peers.Store(mockPeer.ID, mockPeer)
|
host.Peers.Store(mockPeer.ID, mockPeer)
|
||||||
mockPeer.FSM.SetState(resource.PeerStatePending)
|
mockPeer.FSM.SetState(resource.PeerStatePending)
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
mr.HostManager().Return(hostManager).Times(1),
|
mr.HostManager().Return(hostManager).Times(1),
|
||||||
mh.Load(gomock.Any()).Return(host, true).Times(1),
|
mh.Load(gomock.Any()).Return(host, true).Times(1),
|
||||||
|
mnt.DeleteHost(host.ID).Return(nil).Times(1),
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
expect: func(t *testing.T, peer *resource.Peer, err error) {
|
expect: func(t *testing.T, peer *resource.Peer, err error) {
|
||||||
|
|
@ -2517,12 +2520,13 @@ func TestServiceV1_LeaveHost(t *testing.T) {
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "peer state is PeerStateReceivedEmpty",
|
name: "peer state is PeerStateReceivedEmpty",
|
||||||
mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder) {
|
mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mnt *networktopologymocks.MockNetworkTopologyMockRecorder) {
|
||||||
host.Peers.Store(mockPeer.ID, mockPeer)
|
host.Peers.Store(mockPeer.ID, mockPeer)
|
||||||
mockPeer.FSM.SetState(resource.PeerStateReceivedEmpty)
|
mockPeer.FSM.SetState(resource.PeerStateReceivedEmpty)
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
mr.HostManager().Return(hostManager).Times(1),
|
mr.HostManager().Return(hostManager).Times(1),
|
||||||
mh.Load(gomock.Any()).Return(host, true).Times(1),
|
mh.Load(gomock.Any()).Return(host, true).Times(1),
|
||||||
|
mnt.DeleteHost(host.ID).Return(nil).Times(1),
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
expect: func(t *testing.T, peer *resource.Peer, err error) {
|
expect: func(t *testing.T, peer *resource.Peer, err error) {
|
||||||
|
|
@ -2533,12 +2537,13 @@ func TestServiceV1_LeaveHost(t *testing.T) {
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "peer state is PeerStateReceivedTiny",
|
name: "peer state is PeerStateReceivedTiny",
|
||||||
mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder) {
|
mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mnt *networktopologymocks.MockNetworkTopologyMockRecorder) {
|
||||||
host.Peers.Store(mockPeer.ID, mockPeer)
|
host.Peers.Store(mockPeer.ID, mockPeer)
|
||||||
mockPeer.FSM.SetState(resource.PeerStateReceivedTiny)
|
mockPeer.FSM.SetState(resource.PeerStateReceivedTiny)
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
mr.HostManager().Return(hostManager).Times(1),
|
mr.HostManager().Return(hostManager).Times(1),
|
||||||
mh.Load(gomock.Any()).Return(host, true).Times(1),
|
mh.Load(gomock.Any()).Return(host, true).Times(1),
|
||||||
|
mnt.DeleteHost(host.ID).Return(nil).Times(1),
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
expect: func(t *testing.T, peer *resource.Peer, err error) {
|
expect: func(t *testing.T, peer *resource.Peer, err error) {
|
||||||
|
|
@ -2549,12 +2554,13 @@ func TestServiceV1_LeaveHost(t *testing.T) {
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "peer state is PeerStateReceivedSmall",
|
name: "peer state is PeerStateReceivedSmall",
|
||||||
mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder) {
|
mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mnt *networktopologymocks.MockNetworkTopologyMockRecorder) {
|
||||||
host.Peers.Store(mockPeer.ID, mockPeer)
|
host.Peers.Store(mockPeer.ID, mockPeer)
|
||||||
mockPeer.FSM.SetState(resource.PeerStateReceivedSmall)
|
mockPeer.FSM.SetState(resource.PeerStateReceivedSmall)
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
mr.HostManager().Return(hostManager).Times(1),
|
mr.HostManager().Return(hostManager).Times(1),
|
||||||
mh.Load(gomock.Any()).Return(host, true).Times(1),
|
mh.Load(gomock.Any()).Return(host, true).Times(1),
|
||||||
|
mnt.DeleteHost(host.ID).Return(nil).Times(1),
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
expect: func(t *testing.T, peer *resource.Peer, err error) {
|
expect: func(t *testing.T, peer *resource.Peer, err error) {
|
||||||
|
|
@ -2565,12 +2571,13 @@ func TestServiceV1_LeaveHost(t *testing.T) {
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "peer state is PeerStateReceivedNormal",
|
name: "peer state is PeerStateReceivedNormal",
|
||||||
mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder) {
|
mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mnt *networktopologymocks.MockNetworkTopologyMockRecorder) {
|
||||||
host.Peers.Store(mockPeer.ID, mockPeer)
|
host.Peers.Store(mockPeer.ID, mockPeer)
|
||||||
mockPeer.FSM.SetState(resource.PeerStateReceivedNormal)
|
mockPeer.FSM.SetState(resource.PeerStateReceivedNormal)
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
mr.HostManager().Return(hostManager).Times(1),
|
mr.HostManager().Return(hostManager).Times(1),
|
||||||
mh.Load(gomock.Any()).Return(host, true).Times(1),
|
mh.Load(gomock.Any()).Return(host, true).Times(1),
|
||||||
|
mnt.DeleteHost(host.ID).Return(nil).Times(1),
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
expect: func(t *testing.T, peer *resource.Peer, err error) {
|
expect: func(t *testing.T, peer *resource.Peer, err error) {
|
||||||
|
|
@ -2581,12 +2588,13 @@ func TestServiceV1_LeaveHost(t *testing.T) {
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "peer state is PeerStateRunning",
|
name: "peer state is PeerStateRunning",
|
||||||
mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder) {
|
mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mnt *networktopologymocks.MockNetworkTopologyMockRecorder) {
|
||||||
host.Peers.Store(mockPeer.ID, mockPeer)
|
host.Peers.Store(mockPeer.ID, mockPeer)
|
||||||
mockPeer.FSM.SetState(resource.PeerStateRunning)
|
mockPeer.FSM.SetState(resource.PeerStateRunning)
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
mr.HostManager().Return(hostManager).Times(1),
|
mr.HostManager().Return(hostManager).Times(1),
|
||||||
mh.Load(gomock.Any()).Return(host, true).Times(1),
|
mh.Load(gomock.Any()).Return(host, true).Times(1),
|
||||||
|
mnt.DeleteHost(host.ID).Return(nil).Times(1),
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
expect: func(t *testing.T, peer *resource.Peer, err error) {
|
expect: func(t *testing.T, peer *resource.Peer, err error) {
|
||||||
|
|
@ -2597,12 +2605,13 @@ func TestServiceV1_LeaveHost(t *testing.T) {
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "peer state is PeerStateBackToSource",
|
name: "peer state is PeerStateBackToSource",
|
||||||
mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder) {
|
mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mnt *networktopologymocks.MockNetworkTopologyMockRecorder) {
|
||||||
host.Peers.Store(mockPeer.ID, mockPeer)
|
host.Peers.Store(mockPeer.ID, mockPeer)
|
||||||
mockPeer.FSM.SetState(resource.PeerStateBackToSource)
|
mockPeer.FSM.SetState(resource.PeerStateBackToSource)
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
mr.HostManager().Return(hostManager).Times(1),
|
mr.HostManager().Return(hostManager).Times(1),
|
||||||
mh.Load(gomock.Any()).Return(host, true).Times(1),
|
mh.Load(gomock.Any()).Return(host, true).Times(1),
|
||||||
|
mnt.DeleteHost(host.ID).Return(nil).Times(1),
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
expect: func(t *testing.T, peer *resource.Peer, err error) {
|
expect: func(t *testing.T, peer *resource.Peer, err error) {
|
||||||
|
|
@ -2613,12 +2622,13 @@ func TestServiceV1_LeaveHost(t *testing.T) {
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "peer state is PeerStateSucceeded",
|
name: "peer state is PeerStateSucceeded",
|
||||||
mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder) {
|
mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mnt *networktopologymocks.MockNetworkTopologyMockRecorder) {
|
||||||
host.Peers.Store(mockPeer.ID, mockPeer)
|
host.Peers.Store(mockPeer.ID, mockPeer)
|
||||||
mockPeer.FSM.SetState(resource.PeerStateSucceeded)
|
mockPeer.FSM.SetState(resource.PeerStateSucceeded)
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
mr.HostManager().Return(hostManager).Times(1),
|
mr.HostManager().Return(hostManager).Times(1),
|
||||||
mh.Load(gomock.Any()).Return(host, true).Times(1),
|
mh.Load(gomock.Any()).Return(host, true).Times(1),
|
||||||
|
mnt.DeleteHost(host.ID).Return(nil).Times(1),
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
expect: func(t *testing.T, peer *resource.Peer, err error) {
|
expect: func(t *testing.T, peer *resource.Peer, err error) {
|
||||||
|
|
@ -2629,12 +2639,13 @@ func TestServiceV1_LeaveHost(t *testing.T) {
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "peer state is PeerStateFailed",
|
name: "peer state is PeerStateFailed",
|
||||||
mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder) {
|
mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, ms *mocks.MockSchedulingMockRecorder, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mnt *networktopologymocks.MockNetworkTopologyMockRecorder) {
|
||||||
host.Peers.Store(mockPeer.ID, mockPeer)
|
host.Peers.Store(mockPeer.ID, mockPeer)
|
||||||
mockPeer.FSM.SetState(resource.PeerStateFailed)
|
mockPeer.FSM.SetState(resource.PeerStateFailed)
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
mr.HostManager().Return(hostManager).Times(1),
|
mr.HostManager().Return(hostManager).Times(1),
|
||||||
mh.Load(gomock.Any()).Return(host, true).Times(1),
|
mh.Load(gomock.Any()).Return(host, true).Times(1),
|
||||||
|
mnt.DeleteHost(host.ID).Return(nil).Times(1),
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
expect: func(t *testing.T, peer *resource.Peer, err error) {
|
expect: func(t *testing.T, peer *resource.Peer, err error) {
|
||||||
|
|
@ -2662,7 +2673,7 @@ func TestServiceV1_LeaveHost(t *testing.T) {
|
||||||
mockPeer := resource.NewPeer(mockSeedPeerID, mockResourceConfig, mockTask, host)
|
mockPeer := resource.NewPeer(mockSeedPeerID, mockResourceConfig, mockTask, host)
|
||||||
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology)
|
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology)
|
||||||
|
|
||||||
tc.mock(host, mockPeer, hostManager, scheduling.EXPECT(), res.EXPECT(), hostManager.EXPECT())
|
tc.mock(host, mockPeer, hostManager, scheduling.EXPECT(), res.EXPECT(), hostManager.EXPECT(), networkTopology.EXPECT())
|
||||||
tc.expect(t, mockPeer, svc.LeaveHost(context.Background(), &schedulerv1.LeaveHostRequest{
|
tc.expect(t, mockPeer, svc.LeaveHost(context.Background(), &schedulerv1.LeaveHostRequest{
|
||||||
Id: idgen.HostIDV2(host.IP, host.Hostname),
|
Id: idgen.HostIDV2(host.IP, host.Hostname),
|
||||||
}))
|
}))
|
||||||
|
|
|
||||||
|
|
@ -648,6 +648,11 @@ func (v *V2) LeaveHost(ctx context.Context, req *schedulerv2.LeaveHostRequest) e
|
||||||
}
|
}
|
||||||
|
|
||||||
host.LeavePeers()
|
host.LeavePeers()
|
||||||
|
if err := v.networkTopology.DeleteHost(host.ID); err != nil {
|
||||||
|
logger.Errorf("delete network topology host error: %s", err.Error())
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -853,12 +853,12 @@ func TestServiceV2_AnnounceHost(t *testing.T) {
|
||||||
func TestServiceV2_LeaveHost(t *testing.T) {
|
func TestServiceV2_LeaveHost(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
name string
|
name string
|
||||||
mock func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder)
|
mock func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mnt *networktopologymocks.MockNetworkTopologyMockRecorder)
|
||||||
expect func(t *testing.T, peer *resource.Peer, err error)
|
expect func(t *testing.T, peer *resource.Peer, err error)
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "host not found",
|
name: "host not found",
|
||||||
mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder) {
|
mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mnt *networktopologymocks.MockNetworkTopologyMockRecorder) {
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
mr.HostManager().Return(hostManager).Times(1),
|
mr.HostManager().Return(hostManager).Times(1),
|
||||||
mh.Load(gomock.Any()).Return(nil, false).Times(1),
|
mh.Load(gomock.Any()).Return(nil, false).Times(1),
|
||||||
|
|
@ -871,10 +871,11 @@ func TestServiceV2_LeaveHost(t *testing.T) {
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "host has not peers",
|
name: "host has not peers",
|
||||||
mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder) {
|
mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mnt *networktopologymocks.MockNetworkTopologyMockRecorder) {
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
mr.HostManager().Return(hostManager).Times(1),
|
mr.HostManager().Return(hostManager).Times(1),
|
||||||
mh.Load(gomock.Any()).Return(host, true).Times(1),
|
mh.Load(gomock.Any()).Return(host, true).Times(1),
|
||||||
|
mnt.DeleteHost(host.ID).Return(nil).Times(1),
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
expect: func(t *testing.T, peer *resource.Peer, err error) {
|
expect: func(t *testing.T, peer *resource.Peer, err error) {
|
||||||
|
|
@ -884,12 +885,13 @@ func TestServiceV2_LeaveHost(t *testing.T) {
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "peer leaves succeeded",
|
name: "peer leaves succeeded",
|
||||||
mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder) {
|
mock: func(host *resource.Host, mockPeer *resource.Peer, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, mnt *networktopologymocks.MockNetworkTopologyMockRecorder) {
|
||||||
host.Peers.Store(mockPeer.ID, mockPeer)
|
host.Peers.Store(mockPeer.ID, mockPeer)
|
||||||
mockPeer.FSM.SetState(resource.PeerStatePending)
|
mockPeer.FSM.SetState(resource.PeerStatePending)
|
||||||
gomock.InOrder(
|
gomock.InOrder(
|
||||||
mr.HostManager().Return(hostManager).Times(1),
|
mr.HostManager().Return(hostManager).Times(1),
|
||||||
mh.Load(gomock.Any()).Return(host, true).Times(1),
|
mh.Load(gomock.Any()).Return(host, true).Times(1),
|
||||||
|
mnt.DeleteHost(host.ID).Return(nil).Times(1),
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
expect: func(t *testing.T, peer *resource.Peer, err error) {
|
expect: func(t *testing.T, peer *resource.Peer, err error) {
|
||||||
|
|
@ -917,7 +919,7 @@ func TestServiceV2_LeaveHost(t *testing.T) {
|
||||||
mockPeer := resource.NewPeer(mockSeedPeerID, mockResourceConfig, mockTask, host)
|
mockPeer := resource.NewPeer(mockSeedPeerID, mockResourceConfig, mockTask, host)
|
||||||
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology)
|
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage, networkTopology)
|
||||||
|
|
||||||
tc.mock(host, mockPeer, hostManager, res.EXPECT(), hostManager.EXPECT())
|
tc.mock(host, mockPeer, hostManager, res.EXPECT(), hostManager.EXPECT(), networkTopology.EXPECT())
|
||||||
tc.expect(t, mockPeer, svc.LeaveHost(context.Background(), &schedulerv2.LeaveHostRequest{Id: mockHostID}))
|
tc.expect(t, mockPeer, svc.LeaveHost(context.Background(), &schedulerv2.LeaveHostRequest{Id: mockHostID}))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue