feat: implement FindProbedHosts and add LoadRandomHosts to host manager (#2519)

Signed-off-by: Gaius <gaius.qi@gmail.com>
Signed-off-by: XZ <834756128@qq.com>
Co-authored-by: dlut_xz <52518280+fcgxz2003@users.noreply.github.com>
This commit is contained in:
Gaius 2023-07-06 10:56:52 +08:00 committed by GitHub
parent d74ef9894e
commit 1da355018b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 508 additions and 177 deletions

View File

@ -56,6 +56,11 @@ func RemoveDuplicates[T comparable](s []T) []T {
return result return result
} }
// Remove removes an element from a collection.
func Remove[T comparable](s []T, i int) []T {
return append(s[:i], s[i+1:]...)
}
// Reverse reverses elements in a collection. // Reverse reverses elements in a collection.
func Reverse[S ~[]T, T any](s S) { func Reverse[S ~[]T, T any](s S) {
for i, j := 0, len(s)-1; i < j; i, j = i+1, j-1 { for i, j := 0, len(s)-1; i < j; i, j = i+1, j-1 {

View File

@ -114,6 +114,43 @@ func TestRemoveDuplicates(t *testing.T) {
} }
} }
func TestRemove(t *testing.T) {
tests := []struct {
name string
input []int
index int
expected []int
}{
{
name: "remove any element",
input: []int{1, 2, 3},
index: 1,
expected: []int{1, 3},
},
{
name: "remove the first element",
input: []int{1, 2, 3},
index: 0,
expected: []int{2, 3},
},
{
name: "remove the last element",
input: []int{1, 2, 3},
index: 2,
expected: []int{1, 2},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := Remove(tt.input, tt.index)
if !reflect.DeepEqual(result, tt.expected) {
t.Errorf("expected %v, but got %v", tt.expected, result)
}
})
}
}
func TestReverse(t *testing.T) { func TestReverse(t *testing.T) {
tests := []struct { tests := []struct {
name string name string

View File

@ -9,6 +9,7 @@ import (
time "time" time "time"
networktopology "d7y.io/dragonfly/v2/scheduler/networktopology" networktopology "d7y.io/dragonfly/v2/scheduler/networktopology"
resource "d7y.io/dragonfly/v2/scheduler/resource"
gomock "github.com/golang/mock/gomock" gomock "github.com/golang/mock/gomock"
) )
@ -49,19 +50,19 @@ func (mr *MockNetworkTopologyMockRecorder) DeleteHost(arg0 interface{}) *gomock.
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteHost", reflect.TypeOf((*MockNetworkTopology)(nil).DeleteHost), arg0) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteHost", reflect.TypeOf((*MockNetworkTopology)(nil).DeleteHost), arg0)
} }
// FindProbedHostIDs mocks base method. // FindProbedHosts mocks base method.
func (m *MockNetworkTopology) FindProbedHostIDs(arg0 string) ([]string, error) { func (m *MockNetworkTopology) FindProbedHosts(arg0 string) ([]*resource.Host, error) {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "FindProbedHostIDs", arg0) ret := m.ctrl.Call(m, "FindProbedHosts", arg0)
ret0, _ := ret[0].([]string) ret0, _ := ret[0].([]*resource.Host)
ret1, _ := ret[1].(error) ret1, _ := ret[1].(error)
return ret0, ret1 return ret0, ret1
} }
// FindProbedHostIDs indicates an expected call of FindProbedHostIDs. // FindProbedHosts indicates an expected call of FindProbedHosts.
func (mr *MockNetworkTopologyMockRecorder) FindProbedHostIDs(arg0 interface{}) *gomock.Call { func (mr *MockNetworkTopologyMockRecorder) FindProbedHosts(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper() mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindProbedHostIDs", reflect.TypeOf((*MockNetworkTopology)(nil).FindProbedHostIDs), arg0) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "FindProbedHosts", reflect.TypeOf((*MockNetworkTopology)(nil).FindProbedHosts), arg0)
} }
// Has mocks base method. // Has mocks base method.

View File

@ -20,13 +20,16 @@ package networktopology
import ( import (
"context" "context"
"errors"
"math" "math"
"sort"
"time" "time"
"github.com/go-redis/redis/v8" "github.com/go-redis/redis/v8"
"github.com/google/uuid" "github.com/google/uuid"
logger "d7y.io/dragonfly/v2/internal/dflog" logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/container/set"
pkgredis "d7y.io/dragonfly/v2/pkg/redis" pkgredis "d7y.io/dragonfly/v2/pkg/redis"
"d7y.io/dragonfly/v2/scheduler/config" "d7y.io/dragonfly/v2/scheduler/config"
"d7y.io/dragonfly/v2/scheduler/resource" "d7y.io/dragonfly/v2/scheduler/resource"
@ -39,6 +42,9 @@ const (
// snapshotContextTimeout is the timeout of snapshot network topology. // snapshotContextTimeout is the timeout of snapshot network topology.
snapshotContextTimeout = 20 * time.Minute snapshotContextTimeout = 20 * time.Minute
// findProbedCandidateHostsLimit is the limit of find probed candidate hosts.
findProbedCandidateHostsLimit = 50
) )
// NetworkTopology is an interface for network topology. // NetworkTopology is an interface for network topology.
@ -55,9 +61,9 @@ type NetworkTopology interface {
// Store stores source host and destination host. // Store stores source host and destination host.
Store(string, string) error Store(string, string) error
// TODO Implement function. // FindProbedHosts finds the most candidate destination host to be probed, randomly find a range of hosts,
// FindProbedHostIDs finds the most candidate destination host to be probed. // and then return the host with a smaller probed count.
FindProbedHostIDs(string) ([]string, error) FindProbedHosts(string) ([]*resource.Host, error)
// DeleteHost deletes source host and all destination host connected to source host. // DeleteHost deletes source host and all destination host connected to source host.
DeleteHost(string) error DeleteHost(string) error
@ -161,10 +167,50 @@ func (nt *networkTopology) Store(srcHostID string, destHostID string) error {
return nil return nil
} }
// TODO Implement function. // FindProbedHosts finds the most candidate destination host to be probed, randomly find a range of hosts,
// FindProbedHostIDs finds the most candidate destination host to be probed. // and then return the host with a smaller probed count.
func (nt *networkTopology) FindProbedHostIDs(hostID string) ([]string, error) { func (nt *networkTopology) FindProbedHosts(hostID string) ([]*resource.Host, error) {
return nil, nil ctx, cancel := context.WithTimeout(context.Background(), contextTimeout)
defer cancel()
blocklist := set.NewSafeSet[string]()
blocklist.Add(hostID)
candidateHosts := nt.resource.HostManager().LoadRandomHosts(findProbedCandidateHostsLimit, blocklist)
if len(candidateHosts) == 0 {
return nil, errors.New("probed hosts not found")
}
if len(candidateHosts) <= nt.config.Probe.Count {
return candidateHosts, nil
}
var probedCountKeys []string
for _, candidateHost := range candidateHosts {
probedCountKeys = append(probedCountKeys, pkgredis.MakeProbedCountKeyInScheduler(candidateHost.ID))
}
rawProbedCounts, err := nt.rdb.MGet(ctx, probedCountKeys...).Result()
if err != nil {
return nil, err
}
// Filter invalid probed count. If probed key not exist, the probed count is nil.
var probedCounts []uint64
for _, rawProbedCount := range rawProbedCounts {
probeCount, ok := rawProbedCount.(uint64)
if !ok {
return nil, errors.New("invalid probed count")
}
probedCounts = append(probedCounts, probeCount)
}
// Sort candidate hosts by probed count.
sort.Slice(candidateHosts, func(i, j int) bool {
return probedCounts[i] < probedCounts[j]
})
return candidateHosts[:nt.config.Probe.Count], nil
} }
// DeleteHost deletes source host and all destination host connected to source host. // DeleteHost deletes source host and all destination host connected to source host.

View File

@ -28,6 +28,7 @@ import (
"github.com/golang/mock/gomock" "github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"d7y.io/dragonfly/v2/pkg/container/set"
pkgredis "d7y.io/dragonfly/v2/pkg/redis" pkgredis "d7y.io/dragonfly/v2/pkg/redis"
"d7y.io/dragonfly/v2/scheduler/resource" "d7y.io/dragonfly/v2/scheduler/resource"
storagemocks "d7y.io/dragonfly/v2/scheduler/storage/mocks" storagemocks "d7y.io/dragonfly/v2/scheduler/storage/mocks"
@ -295,6 +296,182 @@ func TestNetworkTopology_Store(t *testing.T) {
} }
} }
func TestNetworkTopology_FindProbedHosts(t *testing.T) {
tests := []struct {
name string
hosts []*resource.Host
mock func(mockRDBClient redismock.ClientMock, mr *resource.MockResourceMockRecorder, hostManager resource.HostManager,
mh *resource.MockHostManagerMockRecorder, hosts []*resource.Host)
expect func(t *testing.T, networkTopology NetworkTopology, err error, hosts []*resource.Host)
}{
{
name: "find probed hosts",
hosts: []*resource.Host{
mockHost, {ID: "foo"}, {ID: "bar"}, {ID: "baz"}, {ID: "bav"}, {ID: "bac"},
},
mock: func(mockRDBClient redismock.ClientMock, mr *resource.MockResourceMockRecorder, hostManager resource.HostManager,
mh *resource.MockHostManagerMockRecorder, hosts []*resource.Host) {
mockRDBClient.MatchExpectationsInOrder(true)
blocklist := set.NewSafeSet[string]()
blocklist.Add(mockSeedHost.ID)
gomock.InOrder(
mr.HostManager().Return(hostManager).Times(1),
mh.LoadRandomHosts(gomock.Eq(findProbedCandidateHostsLimit), gomock.Eq(blocklist)).Return(hosts).Times(1),
)
var probedCountKeys []string
for _, host := range hosts {
probedCountKeys = append(probedCountKeys, pkgredis.MakeProbedCountKeyInScheduler(host.ID))
}
mockRDBClient.ExpectMGet(probedCountKeys...).SetVal([]interface{}{uint64(6), uint64(5), uint64(4), uint64(3), uint64(2), uint64(1)})
},
expect: func(t *testing.T, networkTopology NetworkTopology, err error, hosts []*resource.Host) {
assert := assert.New(t)
assert.NoError(err)
probedHosts, err := networkTopology.FindProbedHosts(mockSeedHost.ID)
assert.NoError(err)
assert.Equal(len(probedHosts), 5)
assert.EqualValues(probedHosts[0].ID, "bac")
assert.EqualValues(probedHosts[1].ID, "bav")
assert.EqualValues(probedHosts[2].ID, "baz")
assert.EqualValues(probedHosts[3].ID, "bar")
assert.EqualValues(probedHosts[4].ID, "foo")
},
},
{
name: "find probed hosts when map is insufficient",
hosts: []*resource.Host{mockHost},
mock: func(mockRDBClient redismock.ClientMock, mr *resource.MockResourceMockRecorder, hostManager resource.HostManager,
mh *resource.MockHostManagerMockRecorder, hosts []*resource.Host) {
mockRDBClient.MatchExpectationsInOrder(true)
blocklist := set.NewSafeSet[string]()
blocklist.Add(mockSeedHost.ID)
gomock.InOrder(
mr.HostManager().Return(hostManager).Times(1),
mh.LoadRandomHosts(gomock.Eq(findProbedCandidateHostsLimit), gomock.Eq(blocklist)).Return(hosts).Times(1),
)
var probedCountKeys []string
for _, host := range hosts {
probedCountKeys = append(probedCountKeys, pkgredis.MakeProbedCountKeyInScheduler(host.ID))
}
mockRDBClient.ExpectMGet(probedCountKeys...).SetVal([]interface{}{uint64(1)})
},
expect: func(t *testing.T, networkTopology NetworkTopology, err error, hosts []*resource.Host) {
assert := assert.New(t)
assert.NoError(err)
probedHosts, err := networkTopology.FindProbedHosts(mockSeedHost.ID)
assert.NoError(err)
assert.Equal(len(probedHosts), 1)
assert.EqualValues(probedHosts[0].ID, mockHost.ID)
},
},
{
name: "get probed count error",
hosts: []*resource.Host{
mockHost, {ID: "foo"}, {ID: "bar"}, {ID: "baz"}, {ID: "bav"}, {ID: "bac"},
},
mock: func(mockRDBClient redismock.ClientMock, mr *resource.MockResourceMockRecorder, hostManager resource.HostManager,
mh *resource.MockHostManagerMockRecorder, hosts []*resource.Host) {
mockRDBClient.MatchExpectationsInOrder(true)
blocklist := set.NewSafeSet[string]()
blocklist.Add(mockSeedHost.ID)
gomock.InOrder(
mr.HostManager().Return(hostManager).Times(1),
mh.LoadRandomHosts(gomock.Eq(findProbedCandidateHostsLimit), gomock.Eq(blocklist)).Return(hosts).Times(1),
)
var probedCountKeys []string
for _, host := range hosts {
probedCountKeys = append(probedCountKeys, pkgredis.MakeProbedCountKeyInScheduler(host.ID))
}
mockRDBClient.ExpectMGet(probedCountKeys...).SetErr(errors.New("get probed count error"))
},
expect: func(t *testing.T, networkTopology NetworkTopology, err error, hosts []*resource.Host) {
assert := assert.New(t)
assert.NoError(err)
probedHosts, err := networkTopology.FindProbedHosts(mockSeedHost.ID)
assert.Equal(len(probedHosts), 0)
assert.EqualError(err, "get probed count error")
},
},
{
name: "probed hosts not found",
hosts: []*resource.Host{},
mock: func(mockRDBClient redismock.ClientMock, mr *resource.MockResourceMockRecorder, hostManager resource.HostManager,
mh *resource.MockHostManagerMockRecorder, hosts []*resource.Host) {
mockRDBClient.MatchExpectationsInOrder(true)
blocklist := set.NewSafeSet[string]()
blocklist.Add(mockSeedHost.ID)
gomock.InOrder(
mr.HostManager().Return(hostManager).Times(1),
mh.LoadRandomHosts(gomock.Eq(findProbedCandidateHostsLimit), gomock.Eq(blocklist)).Return(hosts).Times(1),
)
},
expect: func(t *testing.T, networkTopology NetworkTopology, err error, hosts []*resource.Host) {
assert := assert.New(t)
assert.NoError(err)
probedHosts, err := networkTopology.FindProbedHosts(mockSeedHost.ID)
assert.Equal(len(probedHosts), 0)
assert.EqualError(err, "probed hosts not found")
},
},
{
name: "invalid probed count",
hosts: []*resource.Host{
mockHost, {ID: "foo"}, {ID: "bar"}, {ID: "baz"}, {ID: "bav"}, {ID: "bac"},
},
mock: func(mockRDBClient redismock.ClientMock, mr *resource.MockResourceMockRecorder, hostManager resource.HostManager,
mh *resource.MockHostManagerMockRecorder, hosts []*resource.Host) {
mockRDBClient.MatchExpectationsInOrder(true)
blocklist := set.NewSafeSet[string]()
blocklist.Add(mockSeedHost.ID)
gomock.InOrder(
mr.HostManager().Return(hostManager).Times(1),
mh.LoadRandomHosts(gomock.Eq(findProbedCandidateHostsLimit), gomock.Eq(blocklist)).Return(hosts).Times(1),
)
var probedCountKeys []string
for _, host := range hosts {
probedCountKeys = append(probedCountKeys, pkgredis.MakeProbedCountKeyInScheduler(host.ID))
}
mockRDBClient.ExpectMGet(probedCountKeys...).SetVal([]interface{}{"foo", uint64(5), uint64(4), uint64(3), uint64(2), uint64(1)})
},
expect: func(t *testing.T, networkTopology NetworkTopology, err error, hosts []*resource.Host) {
assert := assert.New(t)
assert.NoError(err)
probedHosts, err := networkTopology.FindProbedHosts(mockSeedHost.ID)
assert.Equal(len(probedHosts), 0)
assert.EqualError(err, "invalid probed count")
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
rdb, mockRDBClient := redismock.NewClientMock()
res := resource.NewMockResource(ctl)
storage := storagemocks.NewMockStorage(ctl)
hostManager := resource.NewMockHostManager(ctl)
tc.mock(mockRDBClient, res.EXPECT(), hostManager, hostManager.EXPECT(), tc.hosts)
mockNetworkTopologyConfig.Probe.Count = 5
networkTopology, err := NewNetworkTopology(mockNetworkTopologyConfig, rdb, res, storage)
tc.expect(t, networkTopology, err, tc.hosts)
mockRDBClient.ClearExpect()
})
}
}
func TestNetworkTopology_DeleteHost(t *testing.T) { func TestNetworkTopology_DeleteHost(t *testing.T) {
tests := []struct { tests := []struct {
name string name string

View File

@ -21,6 +21,7 @@ package resource
import ( import (
"sync" "sync"
"d7y.io/dragonfly/v2/pkg/container/set"
pkggc "d7y.io/dragonfly/v2/pkg/gc" pkggc "d7y.io/dragonfly/v2/pkg/gc"
"d7y.io/dragonfly/v2/pkg/types" "d7y.io/dragonfly/v2/pkg/types"
"d7y.io/dragonfly/v2/scheduler/config" "d7y.io/dragonfly/v2/scheduler/config"
@ -47,6 +48,9 @@ type HostManager interface {
// Delete deletes host for a key. // Delete deletes host for a key.
Delete(string) Delete(string)
// LoadRandomHosts loads host randomly through the Range of sync.Map.
LoadRandomHosts(int, set.SafeSet[string]) []*Host
// Try to reclaim host. // Try to reclaim host.
RunGC() error RunGC() error
} }
@ -103,6 +107,31 @@ func (h *hostManager) Delete(key string) {
h.Map.Delete(key) h.Map.Delete(key)
} }
// LoadRandomHosts loads host randomly through the Range of sync.Map.
func (h *hostManager) LoadRandomHosts(n int, blocklist set.SafeSet[string]) []*Host {
hosts := make([]*Host, 0, n)
h.Map.Range(func(key, value any) bool {
if len(hosts) >= n {
return false
}
host, ok := value.(*Host)
if !ok {
host.Log.Error("invalid host")
return true
}
if blocklist.Contains(host.ID) {
return true
}
hosts = append(hosts, host)
return true
})
return hosts
}
// Try to reclaim host. // Try to reclaim host.
func (h *hostManager) RunGC() error { func (h *hostManager) RunGC() error {
h.Map.Range(func(_, value any) bool { h.Map.Range(func(_, value any) bool {

View File

@ -7,6 +7,7 @@ package resource
import ( import (
reflect "reflect" reflect "reflect"
set "d7y.io/dragonfly/v2/pkg/container/set"
gomock "github.com/golang/mock/gomock" gomock "github.com/golang/mock/gomock"
) )
@ -75,6 +76,20 @@ func (mr *MockHostManagerMockRecorder) LoadOrStore(arg0 interface{}) *gomock.Cal
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadOrStore", reflect.TypeOf((*MockHostManager)(nil).LoadOrStore), arg0) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadOrStore", reflect.TypeOf((*MockHostManager)(nil).LoadOrStore), arg0)
} }
// LoadRandomHosts mocks base method.
func (m *MockHostManager) LoadRandomHosts(arg0 int, arg1 set.SafeSet[string]) []*Host {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "LoadRandomHosts", arg0, arg1)
ret0, _ := ret[0].([]*Host)
return ret0
}
// LoadRandomHosts indicates an expected call of LoadRandomHosts.
func (mr *MockHostManagerMockRecorder) LoadRandomHosts(arg0, arg1 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LoadRandomHosts", reflect.TypeOf((*MockHostManager)(nil).LoadRandomHosts), arg0, arg1)
}
// RunGC mocks base method. // RunGC mocks base method.
func (m *MockHostManager) RunGC() error { func (m *MockHostManager) RunGC() error {
m.ctrl.T.Helper() m.ctrl.T.Helper()

View File

@ -27,6 +27,7 @@ import (
commonv2 "d7y.io/api/pkg/apis/common/v2" commonv2 "d7y.io/api/pkg/apis/common/v2"
"d7y.io/dragonfly/v2/pkg/container/set"
"d7y.io/dragonfly/v2/pkg/gc" "d7y.io/dragonfly/v2/pkg/gc"
"d7y.io/dragonfly/v2/scheduler/config" "d7y.io/dragonfly/v2/scheduler/config"
) )
@ -307,6 +308,125 @@ func TestHostManager_Delete(t *testing.T) {
} }
} }
func TestHostManager_LoadRandomHosts(t *testing.T) {
tests := []struct {
name string
hosts []*Host
mock func(m *gc.MockGCMockRecorder)
expect func(t *testing.T, hm HostManager, hosts []*Host)
}{
{
name: "load random hosts",
hosts: []*Host{
NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type),
NewHost(
mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname,
mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type),
},
mock: func(m *gc.MockGCMockRecorder) {
m.Add(gomock.Any()).Return(nil).Times(1)
},
expect: func(t *testing.T, hm HostManager, hosts []*Host) {
assert := assert.New(t)
for _, host := range hosts {
hm.Store(host)
}
blocklist := set.NewSafeSet[string]()
blocklist.Add(mockRawSeedHost.ID)
h := hm.LoadRandomHosts(2, blocklist)
assert.Equal(len(h), 1)
},
},
{
name: "load random hosts when the load number is 0",
hosts: []*Host{
NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type),
NewHost(
mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname,
mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type),
},
mock: func(m *gc.MockGCMockRecorder) {
m.Add(gomock.Any()).Return(nil).Times(1)
},
expect: func(t *testing.T, hm HostManager, hosts []*Host) {
assert := assert.New(t)
for _, host := range hosts {
hm.Store(host)
}
blocklist := set.NewSafeSet[string]()
blocklist.Add(mockRawSeedHost.ID)
h := hm.LoadRandomHosts(0, blocklist)
assert.Equal(len(h), 0)
},
},
{
name: "map is empty",
hosts: []*Host{},
mock: func(m *gc.MockGCMockRecorder) {
m.Add(gomock.Any()).Return(nil).Times(1)
},
expect: func(t *testing.T, hm HostManager, hosts []*Host) {
assert := assert.New(t)
for _, host := range hosts {
hm.Store(host)
}
blocklist := set.NewSafeSet[string]()
blocklist.Add(mockRawSeedHost.ID)
h := hm.LoadRandomHosts(1, blocklist)
assert.Equal(len(h), 0)
},
},
{
name: "the number of hosts in the map is insufficient",
hosts: []*Host{
NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type),
NewHost(
mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname,
mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type),
},
mock: func(m *gc.MockGCMockRecorder) {
m.Add(gomock.Any()).Return(nil).Times(1)
},
expect: func(t *testing.T, hm HostManager, hosts []*Host) {
assert := assert.New(t)
for _, host := range hosts {
hm.Store(host)
}
blocklist := set.NewSafeSet[string]()
blocklist.Add(mockRawSeedHost.ID)
h := hm.LoadRandomHosts(3, blocklist)
assert.Equal(len(h), 1)
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
gc := gc.NewMockGC(ctl)
tc.mock(gc.EXPECT())
hm, err := newHostManager(mockHostGCConfig, gc)
if err != nil {
t.Fatal(err)
}
tc.expect(t, hm, tc.hosts)
})
}
}
func TestHostManager_RunGC(t *testing.T) { func TestHostManager_RunGC(t *testing.T) {
tests := []struct { tests := []struct {
name string name string

View File

@ -677,36 +677,25 @@ func (v *V1) SyncProbes(stream schedulerv1.Scheduler_SyncProbesServer) error {
// Find probed hosts in network topology. Based on the source host information, // Find probed hosts in network topology. Based on the source host information,
// the most candidate hosts will be evaluated. // the most candidate hosts will be evaluated.
logger.Info("receive SyncProbesRequest_ProbeStartedRequest") logger.Info("receive SyncProbesRequest_ProbeStartedRequest")
probedHostIDs, err := v.networkTopology.FindProbedHostIDs(req.Host.Id) hosts, err := v.networkTopology.FindProbedHosts(req.Host.Id)
if err != nil { if err != nil {
logger.Error(err) logger.Error(err)
return status.Error(codes.FailedPrecondition, err.Error()) return status.Error(codes.FailedPrecondition, err.Error())
} }
var probedHosts []*commonv1.Host var probedHosts []*commonv1.Host
for _, probedHostID := range probedHostIDs { for _, host := range hosts {
probedHost, loaded := v.resource.HostManager().Load(probedHostID)
if !loaded {
logger.Warnf("probed host %s not found", probedHostID)
continue
}
probedHosts = append(probedHosts, &commonv1.Host{ probedHosts = append(probedHosts, &commonv1.Host{
Id: probedHost.ID, Id: host.ID,
Ip: probedHost.IP, Ip: host.IP,
Hostname: probedHost.Hostname, Hostname: host.Hostname,
Port: probedHost.Port, Port: host.Port,
DownloadPort: probedHost.DownloadPort, DownloadPort: host.DownloadPort,
Location: probedHost.Network.Location, Location: host.Network.Location,
Idc: probedHost.Network.IDC, Idc: host.Network.IDC,
}) })
} }
if len(probedHosts) == 0 {
logger.Error("probed host not found")
return status.Error(codes.NotFound, "probed host not found")
}
logger.Infof("probe started: %#v", probedHosts) logger.Infof("probe started: %#v", probedHosts)
if err := stream.Send(&schedulerv1.SyncProbesResponse{ if err := stream.Send(&schedulerv1.SyncProbesResponse{
Hosts: probedHosts, Hosts: probedHosts,

View File

@ -2699,9 +2699,7 @@ func TestServiceV1_SyncProbes(t *testing.T) {
ProbeStartedRequest: &schedulerv1.ProbeStartedRequest{}, ProbeStartedRequest: &schedulerv1.ProbeStartedRequest{},
}, },
}, nil).Times(1), }, nil).Times(1),
mn.FindProbedHostIDs(gomock.Eq(mockRawSeedHost.ID)).Return([]string{mockRawHost.ID}, nil).Times(1), mn.FindProbedHosts(gomock.Eq(mockRawSeedHost.ID)).Return([]*resource.Host{&mockRawHost}, nil).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(mockRawHost.ID)).Return(&mockRawHost, true),
ms.Send(gomock.Eq(&schedulerv1.SyncProbesResponse{ ms.Send(gomock.Eq(&schedulerv1.SyncProbesResponse{
Hosts: []*commonv1.Host{ Hosts: []*commonv1.Host{
{ {
@ -2872,7 +2870,7 @@ func TestServiceV1_SyncProbes(t *testing.T) {
ProbeStartedRequest: &schedulerv1.ProbeStartedRequest{}, ProbeStartedRequest: &schedulerv1.ProbeStartedRequest{},
}, },
}, nil).Times(1), }, nil).Times(1),
mn.FindProbedHostIDs(gomock.Eq(mockRawSeedHost.ID)).Return(nil, errors.New("find probed host ids error")).Times(1), mn.FindProbedHosts(gomock.Eq(mockRawSeedHost.ID)).Return(nil, errors.New("find probed host ids error")).Times(1),
) )
}, },
expect: func(t *testing.T, err error) { expect: func(t *testing.T, err error) {
@ -2880,36 +2878,6 @@ func TestServiceV1_SyncProbes(t *testing.T) {
assert.EqualError(err, "rpc error: code = FailedPrecondition desc = find probed host ids error") assert.EqualError(err, "rpc error: code = FailedPrecondition desc = find probed host ids error")
}, },
}, },
{
name: "load host error when receive ProbeStartedRequest",
mock: func(svc *V1, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
ms *schedulerv1mocks.MockScheduler_SyncProbesServerMockRecorder) {
gomock.InOrder(
ms.Recv().Return(&schedulerv1.SyncProbesRequest{
Host: &commonv1.Host{
Id: mockRawSeedHost.ID,
Ip: mockRawSeedHost.IP,
Hostname: mockRawSeedHost.Hostname,
Port: mockRawSeedHost.Port,
DownloadPort: mockRawSeedHost.DownloadPort,
Location: mockRawSeedHost.Network.Location,
Idc: mockRawSeedHost.Network.IDC,
},
Request: &schedulerv1.SyncProbesRequest_ProbeStartedRequest{
ProbeStartedRequest: &schedulerv1.ProbeStartedRequest{},
},
}, nil).Times(1),
mn.FindProbedHostIDs(gomock.Eq(mockRawSeedHost.ID)).Return([]string{mockRawHost.ID}, nil).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(mockRawHost.ID)).Return(nil, false),
)
},
expect: func(t *testing.T, err error) {
assert := assert.New(t)
assert.EqualError(err, "rpc error: code = NotFound desc = probed host not found")
},
},
{ {
name: "send synchronize probes response error", name: "send synchronize probes response error",
mock: func(svc *V1, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder, mock: func(svc *V1, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
@ -2930,9 +2898,7 @@ func TestServiceV1_SyncProbes(t *testing.T) {
ProbeStartedRequest: &schedulerv1.ProbeStartedRequest{}, ProbeStartedRequest: &schedulerv1.ProbeStartedRequest{},
}, },
}, nil).Times(1), }, nil).Times(1),
mn.FindProbedHostIDs(gomock.Eq(mockRawSeedHost.ID)).Return([]string{mockRawHost.ID}, nil).Times(1), mn.FindProbedHosts(gomock.Eq(mockRawSeedHost.ID)).Return([]*resource.Host{&mockRawHost}, nil).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(mockRawHost.ID)).Return(&mockRawHost, true),
ms.Send(gomock.Eq(&schedulerv1.SyncProbesResponse{ ms.Send(gomock.Eq(&schedulerv1.SyncProbesResponse{
Hosts: []*commonv1.Host{ Hosts: []*commonv1.Host{
{ {

View File

@ -666,88 +666,77 @@ func (v *V2) SyncProbes(stream schedulerv2.Scheduler_SyncProbesServer) error {
// Find probed hosts in network topology. Based on the source host information, // Find probed hosts in network topology. Based on the source host information,
// the most candidate hosts will be evaluated. // the most candidate hosts will be evaluated.
logger.Info("receive SyncProbesRequest_ProbeStartedRequest") logger.Info("receive SyncProbesRequest_ProbeStartedRequest")
probedHostIDs, err := v.networkTopology.FindProbedHostIDs(req.Host.Id) hosts, err := v.networkTopology.FindProbedHosts(req.Host.Id)
if err != nil { if err != nil {
logger.Error(err) logger.Error(err)
return status.Error(codes.FailedPrecondition, err.Error()) return status.Error(codes.FailedPrecondition, err.Error())
} }
var probedHosts []*commonv2.Host var probedHosts []*commonv2.Host
for _, probedHostID := range probedHostIDs { for _, host := range hosts {
probedHost, loaded := v.resource.HostManager().Load(probedHostID)
if !loaded {
logger.Warnf("probed host %s not found", probedHostID)
continue
}
probedHosts = append(probedHosts, &commonv2.Host{ probedHosts = append(probedHosts, &commonv2.Host{
Id: probedHost.ID, Id: host.ID,
Type: uint32(probedHost.Type), Type: uint32(host.Type),
Hostname: probedHost.Hostname, Hostname: host.Hostname,
Ip: probedHost.IP, Ip: host.IP,
Port: probedHost.Port, Port: host.Port,
DownloadPort: probedHost.DownloadPort, DownloadPort: host.DownloadPort,
Os: probedHost.OS, Os: host.OS,
Platform: probedHost.Platform, Platform: host.Platform,
PlatformFamily: probedHost.PlatformFamily, PlatformFamily: host.PlatformFamily,
PlatformVersion: probedHost.PlatformVersion, PlatformVersion: host.PlatformVersion,
KernelVersion: probedHost.KernelVersion, KernelVersion: host.KernelVersion,
Cpu: &commonv2.CPU{ Cpu: &commonv2.CPU{
LogicalCount: probedHost.CPU.LogicalCount, LogicalCount: host.CPU.LogicalCount,
PhysicalCount: probedHost.CPU.PhysicalCount, PhysicalCount: host.CPU.PhysicalCount,
Percent: probedHost.CPU.Percent, Percent: host.CPU.Percent,
ProcessPercent: probedHost.CPU.ProcessPercent, ProcessPercent: host.CPU.ProcessPercent,
Times: &commonv2.CPUTimes{ Times: &commonv2.CPUTimes{
User: probedHost.CPU.Times.User, User: host.CPU.Times.User,
System: probedHost.CPU.Times.System, System: host.CPU.Times.System,
Idle: probedHost.CPU.Times.Idle, Idle: host.CPU.Times.Idle,
Nice: probedHost.CPU.Times.Nice, Nice: host.CPU.Times.Nice,
Iowait: probedHost.CPU.Times.Iowait, Iowait: host.CPU.Times.Iowait,
Irq: probedHost.CPU.Times.Irq, Irq: host.CPU.Times.Irq,
Softirq: probedHost.CPU.Times.Softirq, Softirq: host.CPU.Times.Softirq,
Steal: probedHost.CPU.Times.Steal, Steal: host.CPU.Times.Steal,
Guest: probedHost.CPU.Times.Guest, Guest: host.CPU.Times.Guest,
GuestNice: probedHost.CPU.Times.GuestNice, GuestNice: host.CPU.Times.GuestNice,
}, },
}, },
Memory: &commonv2.Memory{ Memory: &commonv2.Memory{
Total: probedHost.Memory.Total, Total: host.Memory.Total,
Available: probedHost.Memory.Available, Available: host.Memory.Available,
Used: probedHost.Memory.Used, Used: host.Memory.Used,
UsedPercent: probedHost.Memory.UsedPercent, UsedPercent: host.Memory.UsedPercent,
ProcessUsedPercent: probedHost.Memory.ProcessUsedPercent, ProcessUsedPercent: host.Memory.ProcessUsedPercent,
Free: probedHost.Memory.Free, Free: host.Memory.Free,
}, },
Network: &commonv2.Network{ Network: &commonv2.Network{
TcpConnectionCount: probedHost.Network.TCPConnectionCount, TcpConnectionCount: host.Network.TCPConnectionCount,
UploadTcpConnectionCount: probedHost.Network.UploadTCPConnectionCount, UploadTcpConnectionCount: host.Network.UploadTCPConnectionCount,
Location: probedHost.Network.Location, Location: host.Network.Location,
Idc: probedHost.Network.IDC, Idc: host.Network.IDC,
}, },
Disk: &commonv2.Disk{ Disk: &commonv2.Disk{
Total: probedHost.Disk.Total, Total: host.Disk.Total,
Free: probedHost.Disk.Free, Free: host.Disk.Free,
Used: probedHost.Disk.Used, Used: host.Disk.Used,
UsedPercent: probedHost.Disk.UsedPercent, UsedPercent: host.Disk.UsedPercent,
InodesTotal: probedHost.Disk.InodesTotal, InodesTotal: host.Disk.InodesTotal,
InodesUsed: probedHost.Disk.InodesUsed, InodesUsed: host.Disk.InodesUsed,
InodesFree: probedHost.Disk.InodesFree, InodesFree: host.Disk.InodesFree,
InodesUsedPercent: probedHost.Disk.InodesUsedPercent, InodesUsedPercent: host.Disk.InodesUsedPercent,
}, },
Build: &commonv2.Build{ Build: &commonv2.Build{
GitVersion: probedHost.Build.GitVersion, GitVersion: host.Build.GitVersion,
GitCommit: probedHost.Build.GitCommit, GitCommit: host.Build.GitCommit,
GoVersion: probedHost.Build.GoVersion, GoVersion: host.Build.GoVersion,
Platform: probedHost.Build.Platform, Platform: host.Build.Platform,
}, },
}) })
} }
if len(probedHosts) == 0 {
logger.Error("probed host not found")
return status.Error(codes.NotFound, "probed host not found")
}
logger.Infof("probe started: %#v", probedHosts) logger.Infof("probe started: %#v", probedHosts)
if err := stream.Send(&schedulerv2.SyncProbesResponse{ if err := stream.Send(&schedulerv2.SyncProbesResponse{
Hosts: probedHosts, Hosts: probedHosts,

View File

@ -972,9 +972,7 @@ func TestServiceV2_SyncProbes(t *testing.T) {
ProbeStartedRequest: &schedulerv2.ProbeStartedRequest{}, ProbeStartedRequest: &schedulerv2.ProbeStartedRequest{},
}, },
}, nil).Times(1), }, nil).Times(1),
mn.FindProbedHostIDs(gomock.Eq(mockRawSeedHost.ID)).Return([]string{mockRawHost.ID}, nil).Times(1), mn.FindProbedHosts(gomock.Eq(mockRawSeedHost.ID)).Return([]*resource.Host{&mockRawHost}, nil).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(mockRawHost.ID)).Return(&mockRawHost, true),
ms.Send(gomock.Eq(&schedulerv2.SyncProbesResponse{ ms.Send(gomock.Eq(&schedulerv2.SyncProbesResponse{
Hosts: []*commonv2.Host{ Hosts: []*commonv2.Host{
{ {
@ -1199,7 +1197,7 @@ func TestServiceV2_SyncProbes(t *testing.T) {
ProbeStartedRequest: &schedulerv2.ProbeStartedRequest{}, ProbeStartedRequest: &schedulerv2.ProbeStartedRequest{},
}, },
}, nil).Times(1), }, nil).Times(1),
mn.FindProbedHostIDs(gomock.Eq(mockRawSeedHost.ID)).Return(nil, errors.New("find probed host ids error")).Times(1), mn.FindProbedHosts(gomock.Eq(mockRawSeedHost.ID)).Return(nil, errors.New("find probed host ids error")).Times(1),
) )
}, },
expect: func(t *testing.T, err error) { expect: func(t *testing.T, err error) {
@ -1207,45 +1205,6 @@ func TestServiceV2_SyncProbes(t *testing.T) {
assert.EqualError(err, "rpc error: code = FailedPrecondition desc = find probed host ids error") assert.EqualError(err, "rpc error: code = FailedPrecondition desc = find probed host ids error")
}, },
}, },
{
name: "load host error when receive ProbeStartedRequest",
mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
mn *networktopologymocks.MockNetworkTopologyMockRecorder, hostManager resource.HostManager, mh *resource.MockHostManagerMockRecorder,
ms *schedulerv2mocks.MockScheduler_SyncProbesServerMockRecorder) {
gomock.InOrder(
ms.Recv().Return(&schedulerv2.SyncProbesRequest{
Host: &commonv2.Host{
Id: mockSeedHostID,
Type: uint32(pkgtypes.HostTypeSuperSeed),
Hostname: "bar",
Ip: "127.0.0.1",
Port: 8003,
DownloadPort: 8001,
Os: "darwin",
Platform: "darwin",
PlatformFamily: "Standalone Workstation",
PlatformVersion: "11.1",
KernelVersion: "20.2.0",
Cpu: mockV2Probe.Host.Cpu,
Memory: mockV2Probe.Host.Memory,
Network: mockV2Probe.Host.Network,
Disk: mockV2Probe.Host.Disk,
Build: mockV2Probe.Host.Build,
},
Request: &schedulerv2.SyncProbesRequest_ProbeStartedRequest{
ProbeStartedRequest: &schedulerv2.ProbeStartedRequest{},
},
}, nil).Times(1),
mn.FindProbedHostIDs(gomock.Eq(mockRawSeedHost.ID)).Return([]string{mockRawHost.ID}, nil).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(mockRawHost.ID)).Return(nil, false),
)
},
expect: func(t *testing.T, err error) {
assert := assert.New(t)
assert.EqualError(err, "rpc error: code = NotFound desc = probed host not found")
},
},
{ {
name: "send synchronize probes response error", name: "send synchronize probes response error",
mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder, mock: func(svc *V2, mr *resource.MockResourceMockRecorder, probes *networktopologymocks.MockProbes, mp *networktopologymocks.MockProbesMockRecorder,
@ -1275,9 +1234,7 @@ func TestServiceV2_SyncProbes(t *testing.T) {
ProbeStartedRequest: &schedulerv2.ProbeStartedRequest{}, ProbeStartedRequest: &schedulerv2.ProbeStartedRequest{},
}, },
}, nil).Times(1), }, nil).Times(1),
mn.FindProbedHostIDs(gomock.Eq(mockRawSeedHost.ID)).Return([]string{mockRawHost.ID}, nil).Times(1), mn.FindProbedHosts(gomock.Eq(mockRawSeedHost.ID)).Return([]*resource.Host{&mockRawHost}, nil).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(mockRawHost.ID)).Return(&mockRawHost, true),
ms.Send(gomock.Eq(&schedulerv2.SyncProbesResponse{ ms.Send(gomock.Eq(&schedulerv2.SyncProbesResponse{
Hosts: []*commonv2.Host{ Hosts: []*commonv2.Host{
{ {