From 9999ce51b6b3adec11d527fb8443ce3fa2b73f09 Mon Sep 17 00:00:00 2001 From: Gaius Date: Wed, 31 May 2023 18:43:22 +0800 Subject: [PATCH] feat: add ProbedAt to network topology (#2413) Signed-off-by: Gaius --- pkg/redis/redis.go | 8 + .../mocks/network_topology_mock.go | 16 + .../networktopology/mocks/probes_mock.go | 12 +- scheduler/networktopology/network_topology.go | 40 +- .../networktopology/network_topology_test.go | 275 -------- scheduler/networktopology/probes.go | 1 + scheduler/networktopology/probes_test.go | 661 ------------------ 7 files changed, 56 insertions(+), 957 deletions(-) delete mode 100644 scheduler/networktopology/network_topology_test.go delete mode 100644 scheduler/networktopology/probes_test.go diff --git a/pkg/redis/redis.go b/pkg/redis/redis.go index b770673ed..4de10814c 100644 --- a/pkg/redis/redis.go +++ b/pkg/redis/redis.go @@ -49,6 +49,9 @@ const ( // ProbedCountNamespace prefix of probed count namespace cache key. ProbedCountNamespace = "probed-count" + + // ProbedAtNamespace prefix of last probed time namespace cache key. + ProbedAtNamespace = "probed-at" ) // NewRedis returns a new redis client. @@ -138,3 +141,8 @@ func MakeProbesKeyInScheduler(srcHostID, destHostID string) string { func MakeProbedCountKeyInScheduler(hostID string) string { return MakeKeyInScheduler(ProbedCountNamespace, hostID) } + +// MakeProbedAtKeyInScheduler make last probed time in scheduler. +func MakeProbedAtKeyInScheduler(hostID string) string { + return MakeKeyInScheduler(ProbedAtNamespace, hostID) +} diff --git a/scheduler/networktopology/mocks/network_topology_mock.go b/scheduler/networktopology/mocks/network_topology_mock.go index a2d41a10e..359cdf195 100644 --- a/scheduler/networktopology/mocks/network_topology_mock.go +++ b/scheduler/networktopology/mocks/network_topology_mock.go @@ -6,6 +6,7 @@ package mocks import ( reflect "reflect" + time "time" networktopology "d7y.io/dragonfly/v2/scheduler/networktopology" gomock "github.com/golang/mock/gomock" @@ -62,6 +63,21 @@ func (mr *MockNetworkTopologyMockRecorder) Has(arg0, arg1 interface{}) *gomock.C return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Has", reflect.TypeOf((*MockNetworkTopology)(nil).Has), arg0, arg1) } +// ProbedAt mocks base method. +func (m *MockNetworkTopology) ProbedAt(arg0 string) (time.Time, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "ProbedAt", arg0) + ret0, _ := ret[0].(time.Time) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// ProbedAt indicates an expected call of ProbedAt. +func (mr *MockNetworkTopologyMockRecorder) ProbedAt(arg0 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ProbedAt", reflect.TypeOf((*MockNetworkTopology)(nil).ProbedAt), arg0) +} + // ProbedCount mocks base method. func (m *MockNetworkTopology) ProbedCount(arg0 string) (uint64, error) { m.ctrl.T.Helper() diff --git a/scheduler/networktopology/mocks/probes_mock.go b/scheduler/networktopology/mocks/probes_mock.go index f5f8d89e2..46a9385e4 100644 --- a/scheduler/networktopology/mocks/probes_mock.go +++ b/scheduler/networktopology/mocks/probes_mock.go @@ -94,19 +94,19 @@ func (mr *MockProbesMockRecorder) Enqueue(arg0 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Enqueue", reflect.TypeOf((*MockProbes)(nil).Enqueue), arg0) } -// Length mocks base method. -func (m *MockProbes) Length() (int64, error) { +// Len mocks base method. +func (m *MockProbes) Len() (int64, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Length") + ret := m.ctrl.Call(m, "Len") ret0, _ := ret[0].(int64) ret1, _ := ret[1].(error) return ret0, ret1 } -// Length indicates an expected call of Length. -func (mr *MockProbesMockRecorder) Length() *gomock.Call { +// Len indicates an expected call of Len. +func (mr *MockProbesMockRecorder) Len() *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Length", reflect.TypeOf((*MockProbes)(nil).Length)) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Len", reflect.TypeOf((*MockProbes)(nil).Len)) } // Peek mocks base method. diff --git a/scheduler/networktopology/network_topology.go b/scheduler/networktopology/network_topology.go index 1a53d92ee..76c35a62b 100644 --- a/scheduler/networktopology/network_topology.go +++ b/scheduler/networktopology/network_topology.go @@ -44,15 +44,17 @@ type NetworkTopology interface { // Store stores src host and destination host. Store(string, string) error - // Delete deletes src host and destination host. If destination host is *, - // delete all destination hosts. - Delete(string, string) error + // DeleteHost deletes src host and destination hosts of the src host. + DeleteHost(string) error // Probes loads probes interface by source host id and destination host id. Probes(string, string) Probes // ProbedCount is the number of times the host has been probed. ProbedCount(string) (uint64, error) + + // ProbedAt is the time of the last probe. + ProbedAt(string) (time.Time, error) } // networkTopology is an implementation of network topology. @@ -91,17 +93,16 @@ func (nt *networkTopology) Has(srcHostID string, destHostID string) bool { return false } - probesCount, err := nt.rdb.Exists(ctx, pkgredis.MakeProbesKeyInScheduler(srcHostID, destHostID)).Result() - if err != nil { - logger.Errorf("failed to check whether probes exists: %s", err.Error()) - return false - } - - return networkTopologyCount == 1 && probesCount == 1 + return networkTopologyCount == 1 } // Store stores src host and destination host. func (nt *networkTopology) Store(srcHostID string, destHostID string) error { + // If the network topology already exists, skip it. + if nt.Has(srcHostID, destHostID) { + return nil + } + ctx, cancel := context.WithTimeout(context.Background(), contextTimeout) defer cancel() @@ -116,15 +117,16 @@ func (nt *networkTopology) Store(srcHostID string, destHostID string) error { return nil } -// Delete deletes src host and destination host. If destination host is *, -// delete all destination hosts. -func (nt *networkTopology) Delete(srcHostID string, destHostID string) error { +// DeleteHost deletes src host and destination hosts of the src host. +func (nt *networkTopology) DeleteHost(hostID string) error { ctx, cancel := context.WithTimeout(context.Background(), contextTimeout) defer cancel() if _, err := nt.rdb.Pipelined(ctx, func(pipe redis.Pipeliner) error { - pipe.Del(ctx, pkgredis.MakeNetworkTopologyKeyInScheduler(srcHostID, destHostID)) - pipe.Del(ctx, pkgredis.MakeProbesKeyInScheduler(srcHostID, destHostID)) + pipe.Del(ctx, pkgredis.MakeNetworkTopologyKeyInScheduler(hostID, "*")) + pipe.Del(ctx, pkgredis.MakeProbesKeyInScheduler(hostID, "*")) + pipe.Del(ctx, pkgredis.MakeProbedAtKeyInScheduler(hostID)) + pipe.Del(ctx, pkgredis.MakeProbedCountKeyInScheduler(hostID)) return nil }); err != nil { return err @@ -141,6 +143,14 @@ func (nt *networkTopology) ProbedCount(hostID string) (uint64, error) { return nt.rdb.Get(ctx, pkgredis.MakeProbedCountKeyInScheduler(hostID)).Uint64() } +// ProbedAt is the time of the last probe. +func (nt *networkTopology) ProbedAt(hostID string) (time.Time, error) { + ctx, cancel := context.WithTimeout(context.Background(), contextTimeout) + defer cancel() + + return nt.rdb.Get(ctx, pkgredis.MakeProbedAtKeyInScheduler(hostID)).Time() +} + // Probes loads probes interface by source host id and destination host id. func (nt *networkTopology) Probes(srcHostID, destHostID string) Probes { return NewProbes(nt.config.Probe, nt.rdb, srcHostID, destHostID) diff --git a/scheduler/networktopology/network_topology_test.go b/scheduler/networktopology/network_topology_test.go deleted file mode 100644 index 49f388204..000000000 --- a/scheduler/networktopology/network_topology_test.go +++ /dev/null @@ -1,275 +0,0 @@ -/* - * Copyright 2023 The Dragonfly Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package networktopology - -import ( - "errors" - "reflect" - "testing" - "time" - - "github.com/go-redis/redismock/v8" - "github.com/golang/mock/gomock" - "github.com/stretchr/testify/assert" - "go.uber.org/atomic" - - "d7y.io/dragonfly/v2/pkg/idgen" - pkgredis "d7y.io/dragonfly/v2/pkg/redis" - "d7y.io/dragonfly/v2/pkg/types" - "d7y.io/dragonfly/v2/scheduler/config" - "d7y.io/dragonfly/v2/scheduler/resource" - storagemocks "d7y.io/dragonfly/v2/scheduler/storage/mocks" -) - -var ( - mockHost = &resource.Host{ - ID: idgen.HostIDV2("127.0.0.1", "HostName"), - Type: types.HostTypeNormal, - Hostname: "hostname", - IP: "127.0.0.1", - Port: 8003, - DownloadPort: 8001, - OS: "darwin", - Platform: "darwin", - PlatformFamily: "Standalone Workstation", - PlatformVersion: "11.1", - KernelVersion: "20.2.0", - ConcurrentUploadLimit: atomic.NewInt32(int32(300)), - ConcurrentUploadCount: atomic.NewInt32(0), - UploadCount: atomic.NewInt64(0), - UploadFailedCount: atomic.NewInt64(0), - CPU: mockCPU, - Memory: mockMemory, - Network: mockNetwork, - Disk: mockDisk, - Build: mockBuild, - CreatedAt: atomic.NewTime(time.Now()), - UpdatedAt: atomic.NewTime(time.Now()), - } - - mockSeedHost = &resource.Host{ - ID: idgen.HostIDV2("127.0.0.1", "HostName_seed"), - Type: types.HostTypeSuperSeed, - Hostname: "hostname_seed", - IP: "127.0.0.1", - Port: 8003, - DownloadPort: 8001, - OS: "darwin", - Platform: "darwin", - PlatformFamily: "Standalone Workstation", - PlatformVersion: "11.1", - KernelVersion: "20.2.0", - ConcurrentUploadLimit: atomic.NewInt32(int32(300)), - ConcurrentUploadCount: atomic.NewInt32(0), - UploadCount: atomic.NewInt64(0), - UploadFailedCount: atomic.NewInt64(0), - CPU: mockCPU, - Memory: mockMemory, - Network: mockNetwork, - Disk: mockDisk, - Build: mockBuild, - CreatedAt: atomic.NewTime(time.Now()), - UpdatedAt: atomic.NewTime(time.Now()), - } - - mockCPU = resource.CPU{ - LogicalCount: 4, - PhysicalCount: 2, - Percent: 1, - ProcessPercent: 0.5, - Times: resource.CPUTimes{ - User: 240662.2, - System: 317950.1, - Idle: 3393691.3, - Nice: 0, - Iowait: 0, - Irq: 0, - Softirq: 0, - Steal: 0, - Guest: 0, - GuestNice: 0, - }, - } - - mockMemory = resource.Memory{ - Total: 17179869184, - Available: 5962813440, - Used: 11217055744, - UsedPercent: 65.291858, - ProcessUsedPercent: 41.525125, - Free: 2749598908, - } - - mockNetwork = resource.Network{ - TCPConnectionCount: 10, - UploadTCPConnectionCount: 1, - Location: mockHostLocation, - IDC: mockHostIDC, - } - - mockDisk = resource.Disk{ - Total: 499963174912, - Free: 37226479616, - Used: 423809622016, - UsedPercent: 91.92547406065952, - InodesTotal: 4882452880, - InodesUsed: 7835772, - InodesFree: 4874617108, - InodesUsedPercent: 0.1604884305611568, - } - - mockBuild = resource.Build{ - GitVersion: "v1.0.0", - GitCommit: "221176b117c6d59366d68f2b34d38be50c935883", - GoVersion: "1.18", - Platform: "darwin", - } - - mockHostLocation = "location" - mockHostIDC = "idc" - - mockProbe = &Probe{ - Host: mockHost, - RTT: 30 * time.Millisecond, - CreatedAt: time.Now(), - } - - mockNetworkTopologyConfig = config.NetworkTopologyConfig{ - Enable: true, - CollectInterval: 2 * time.Hour, - Probe: config.ProbeConfig{ - QueueLength: 5, - Interval: 15 * time.Minute, - Count: 10, - }, - } -) - -func Test_NewNetworkTopology(t *testing.T) { - tests := []struct { - name string - expect func(t *testing.T, networkTopology NetworkTopology, err error) - }{ - { - name: "new network topology", - expect: func(t *testing.T, networkTopology NetworkTopology, err error) { - assert := assert.New(t) - assert.NoError(err) - assert.Equal(reflect.TypeOf(networkTopology).Elem().Name(), "networkTopology") - }, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - ctl := gomock.NewController(t) - defer ctl.Finish() - - rdb, _ := redismock.NewClientMock() - res := resource.NewMockResource(ctl) - storage := storagemocks.NewMockStorage(ctl) - - networkTopology, err := NewNetworkTopology(mockNetworkTopologyConfig, rdb, res, storage) - tc.expect(t, networkTopology, err) - }) - } -} - -func TestNetworkTopology_ProbedCount(t *testing.T) { - tests := []struct { - name string - mock func(mockRDBClient redismock.ClientMock) - expect func(t *testing.T, networkTopology NetworkTopology, err error) - }{ - { - name: "get probed count of host", - mock: func(mockRDBClient redismock.ClientMock) { - mockRDBClient.ExpectGet(pkgredis.MakeProbedCountKeyInScheduler(mockHost.ID)).SetVal("1") - }, - expect: func(t *testing.T, networkTopology NetworkTopology, err error) { - assert := assert.New(t) - assert.NoError(err) - - probedCount, err := networkTopology.ProbedCount(mockHost.ID) - assert.NoError(err) - assert.Equal(probedCount, uint64(1)) - }, - }, - { - name: "get probed count of host error", - mock: func(mockRDBClient redismock.ClientMock) { - mockRDBClient.ExpectGet(pkgredis.MakeProbedCountKeyInScheduler(mockHost.ID)).SetErr(errors.New("host do not exist")) - }, - expect: func(t *testing.T, networkTopology NetworkTopology, err error) { - assert := assert.New(t) - assert.NoError(err) - - _, err = networkTopology.ProbedCount(mockHost.ID) - assert.Error(err) - }, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - ctl := gomock.NewController(t) - defer ctl.Finish() - - rdb, mockRDBClient := redismock.NewClientMock() - res := resource.NewMockResource(ctl) - storage := storagemocks.NewMockStorage(ctl) - tc.mock(mockRDBClient) - - networkTopology, err := NewNetworkTopology(mockNetworkTopologyConfig, rdb, res, storage) - tc.expect(t, networkTopology, err) - - mockRDBClient.ClearExpect() - }) - } -} - -func TestNetworkTopology_Probes(t *testing.T) { - tests := []struct { - name string - expect func(t *testing.T, ps Probes) - }{ - { - name: "load probes", - expect: func(t *testing.T, ps Probes) { - assert := assert.New(t) - assert.Equal(reflect.TypeOf(ps).Elem().Name(), "probes") - }, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - ctl := gomock.NewController(t) - defer ctl.Finish() - - rdb, _ := redismock.NewClientMock() - res := resource.NewMockResource(ctl) - storage := storagemocks.NewMockStorage(ctl) - networkTopology, err := NewNetworkTopology(mockNetworkTopologyConfig, rdb, res, storage) - if err != nil { - t.Fatal(err) - } - - tc.expect(t, networkTopology.Probes(mockSeedHost.ID, mockHost.ID)) - }) - } -} diff --git a/scheduler/networktopology/probes.go b/scheduler/networktopology/probes.go index 5577b0a20..851196b87 100644 --- a/scheduler/networktopology/probes.go +++ b/scheduler/networktopology/probes.go @@ -176,6 +176,7 @@ func (p *probes) Enqueue(probe *Probe) error { if _, err := p.rdb.Pipelined(ctx, func(pipe redis.Pipeliner) error { pipe.HSet(ctx, pkgredis.MakeNetworkTopologyKeyInScheduler(p.srcHostID, p.destHostID), "averageRTT", averageRTT.Nanoseconds()) pipe.HSet(ctx, pkgredis.MakeNetworkTopologyKeyInScheduler(p.srcHostID, p.destHostID), "updatedAt", probe.CreatedAt.Format(time.RFC3339Nano)) + pipe.Set(ctx, pkgredis.MakeProbedAtKeyInScheduler(p.destHostID), probe.CreatedAt.Format(time.RFC3339Nano), 0) pipe.Incr(ctx, pkgredis.MakeProbedCountKeyInScheduler(p.destHostID)) return nil }); err != nil { diff --git a/scheduler/networktopology/probes_test.go b/scheduler/networktopology/probes_test.go deleted file mode 100644 index 5fe71544f..000000000 --- a/scheduler/networktopology/probes_test.go +++ /dev/null @@ -1,661 +0,0 @@ -/* - * Copyright 2023 The Dragonfly Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package networktopology - -import ( - "encoding/json" - "errors" - "strconv" - "testing" - "time" - - "github.com/go-redis/redismock/v8" - "github.com/golang/mock/gomock" - "github.com/stretchr/testify/assert" - - pkgredis "d7y.io/dragonfly/v2/pkg/redis" -) - -func Test_NewProbes(t *testing.T) { - tests := []struct { - name string - expect func(t *testing.T, rawProbes Probes) - }{ - { - name: "new probes", - expect: func(t *testing.T, ps Probes) { - assert := assert.New(t) - probes := ps.(*probes) - assert.Equal(probes.config.QueueLength, 5) - assert.NotNil(probes.rdb) - assert.Equal(probes.srcHostID, mockSeedHost.ID) - assert.Equal(probes.destHostID, mockHost.ID) - }, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - rdb, _ := redismock.NewClientMock() - tc.expect(t, NewProbes(mockNetworkTopologyConfig.Probe, rdb, mockSeedHost.ID, mockHost.ID)) - }) - } -} - -func TestProbes_Peek(t *testing.T) { - tests := []struct { - name string - probes []*Probe - mock func(mockRDBClient redismock.ClientMock, ps []*Probe) - expect func(t *testing.T, p Probes) - }{ - { - name: "queue has one probe", - probes: []*Probe{}, - mock: func(mockRDBClient redismock.ClientMock, ps []*Probe) { - data, err := json.Marshal(mockProbe) - if err != nil { - t.Fatal(err) - } - - mockRDBClient.ExpectLIndex(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), 0).SetVal(string(data)) - }, - expect: func(t *testing.T, ps Probes) { - assert := assert.New(t) - probe, err := ps.Peek() - assert.NoError(err) - assert.Equal(probe.Host.ID, mockProbe.Host.ID) - assert.Equal(probe.Host.Type, mockProbe.Host.Type) - assert.Equal(probe.Host.Hostname, mockProbe.Host.Hostname) - assert.Equal(probe.Host.IP, mockProbe.Host.IP) - assert.Equal(probe.Host.Port, mockProbe.Host.Port) - assert.Equal(probe.Host.DownloadPort, mockProbe.Host.DownloadPort) - assert.Equal(probe.Host.OS, mockProbe.Host.OS) - assert.Equal(probe.Host.Platform, mockProbe.Host.Platform) - assert.Equal(probe.Host.PlatformFamily, mockProbe.Host.PlatformFamily) - assert.Equal(probe.Host.PlatformVersion, mockProbe.Host.PlatformVersion) - assert.Equal(probe.Host.KernelVersion, mockProbe.Host.KernelVersion) - assert.Equal(probe.Host.ConcurrentUploadLimit, mockProbe.Host.ConcurrentUploadLimit) - assert.Equal(probe.Host.ConcurrentUploadCount, mockProbe.Host.ConcurrentUploadCount) - assert.Equal(probe.Host.UploadCount, mockProbe.Host.UploadCount) - assert.Equal(probe.Host.UploadFailedCount, mockProbe.Host.UploadFailedCount) - assert.EqualValues(probe.Host.CPU, mockProbe.Host.CPU) - assert.EqualValues(probe.Host.Memory, mockProbe.Host.Memory) - assert.EqualValues(probe.Host.Network, mockProbe.Host.Network) - assert.EqualValues(probe.Host.Disk, mockProbe.Host.Disk) - assert.EqualValues(probe.Host.Build, mockProbe.Host.Build) - assert.Equal(probe.RTT, mockProbe.RTT) - assert.True(probe.CreatedAt.Equal(mockProbe.CreatedAt)) - }, - }, - { - name: "queue has six probe", - probes: []*Probe{ - { - Host: mockHost, - RTT: 31 * time.Millisecond, - CreatedAt: time.Now(), - }, - { - Host: mockHost, - RTT: 32 * time.Millisecond, - CreatedAt: time.Now(), - }, - { - Host: mockHost, - RTT: 33 * time.Millisecond, - CreatedAt: time.Now(), - }, - { - Host: mockHost, - RTT: 34 * time.Millisecond, - CreatedAt: time.Now(), - }, - mockProbe, - }, - mock: func(mockRDBClient redismock.ClientMock, ps []*Probe) { - var rawProbes []string - for _, p := range ps { - data, err := json.Marshal(p) - if err != nil { - t.Fatal(err) - } - - rawProbes = append(rawProbes, string(data)) - } - - mockRDBClient.MatchExpectationsInOrder(true) - mockRDBClient.ExpectLIndex(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), 0).SetVal(rawProbes[4]) - mockRDBClient.ExpectLLen(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(5) - mockRDBClient.ExpectLPop(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(rawProbes[4]) - mockRDBClient.ExpectRPush(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), []byte(rawProbes[4])).SetVal(1) - mockRDBClient.ExpectLRange(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), 0, -1).SetVal(rawProbes) - mockRDBClient.ExpectHSet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "averageRTT", int64(30388900)).SetVal(1) - mockRDBClient.ExpectHSet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "updatedAt", mockProbe.CreatedAt.Format(time.RFC3339Nano)).SetVal(1) - mockRDBClient.ExpectIncr(pkgredis.MakeProbedCountKeyInScheduler(mockHost.ID)).SetVal(0) - mockRDBClient.ExpectLIndex(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), 0).SetVal(rawProbes[0]) - }, - expect: func(t *testing.T, ps Probes) { - assert := assert.New(t) - probe, err := ps.Peek() - assert.NoError(err) - assert.Equal(probe.RTT, mockProbe.RTT) - assert.NoError(ps.Enqueue(mockProbe)) - - probe, err = ps.Peek() - assert.NoError(err) - assert.Equal(probe.RTT, 31*time.Millisecond) - - }, - }, - { - name: "queue has no probe", - probes: []*Probe{}, - mock: func(mockRDBClient redismock.ClientMock, ps []*Probe) { - mockRDBClient.ExpectLIndex(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), 0).SetErr(errors.New("no probe")) - }, - expect: func(t *testing.T, ps Probes) { - assert := assert.New(t) - _, err := ps.Peek() - assert.Error(err) - }, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - ctl := gomock.NewController(t) - defer ctl.Finish() - - rdb, mockRDBClient := redismock.NewClientMock() - tc.mock(mockRDBClient, tc.probes) - - tc.expect(t, NewProbes(mockNetworkTopologyConfig.Probe, rdb, mockSeedHost.ID, mockHost.ID)) - mockRDBClient.ClearExpect() - }) - } -} - -func TestProbes_Enqueue(t *testing.T) { - tests := []struct { - name string - probes []*Probe - mock func(mockRDBClient redismock.ClientMock, ps []*Probe) - expect func(t *testing.T, ps Probes) - }{ - { - name: "enqueue probe when probes queue is empty", - probes: []*Probe{ - mockProbe, - }, - mock: func(mockRDBClient redismock.ClientMock, ps []*Probe) { - data, err := json.Marshal(ps[0]) - if err != nil { - t.Fatal(err) - } - - mockRDBClient.MatchExpectationsInOrder(true) - mockRDBClient.ExpectLLen(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(0) - mockRDBClient.ExpectRPush(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), data).SetVal(1) - mockRDBClient.ExpectHSet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "averageRTT", mockProbe.RTT.Nanoseconds()).SetVal(1) - mockRDBClient.ExpectHSet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "updatedAt", mockProbe.CreatedAt.Format(time.RFC3339Nano)).SetVal(1) - mockRDBClient.ExpectIncr(pkgredis.MakeProbedCountKeyInScheduler(mockHost.ID)).SetVal(0) - }, - expect: func(t *testing.T, ps Probes) { - assert := assert.New(t) - assert.NoError(ps.Enqueue(mockProbe)) - }, - }, - { - name: "enqueue probe when probes queue has one probe", - probes: []*Probe{ - mockProbe, - { - Host: mockHost, - RTT: 31 * time.Millisecond, - CreatedAt: time.Now(), - }, - }, - mock: func(mockRDBClient redismock.ClientMock, ps []*Probe) { - var rawProbes []string - for _, p := range ps { - data, err := json.Marshal(p) - if err != nil { - t.Fatal(err) - } - - rawProbes = append(rawProbes, string(data)) - } - - mockRDBClient.MatchExpectationsInOrder(true) - mockRDBClient.ExpectLLen(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(1) - mockRDBClient.ExpectRPush(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), []byte(rawProbes[0])).SetVal(1) - mockRDBClient.ExpectLRange(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), 0, -1).SetVal([]string{rawProbes[1], rawProbes[0]}) - mockRDBClient.ExpectHSet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "averageRTT", int64(30100000)).SetVal(1) - mockRDBClient.ExpectHSet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "updatedAt", mockProbe.CreatedAt.Format(time.RFC3339Nano)).SetVal(1) - mockRDBClient.ExpectIncr(pkgredis.MakeProbedCountKeyInScheduler(mockHost.ID)).SetVal(0) - }, - expect: func(t *testing.T, ps Probes) { - assert := assert.New(t) - assert.NoError(ps.Enqueue(mockProbe)) - }, - }, - { - name: "enqueue probe when probes queue has five probes", - probes: []*Probe{ - { - Host: mockHost, - RTT: 31 * time.Millisecond, - CreatedAt: time.Now(), - }, - { - Host: mockHost, - RTT: 32 * time.Millisecond, - CreatedAt: time.Now(), - }, - { - Host: mockHost, - RTT: 33 * time.Millisecond, - CreatedAt: time.Now(), - }, - { - Host: mockHost, - RTT: 34 * time.Millisecond, - CreatedAt: time.Now(), - }, - mockProbe, - }, - mock: func(mockRDBClient redismock.ClientMock, ps []*Probe) { - var rawProbes []string - for _, p := range ps { - data, err := json.Marshal(p) - if err != nil { - t.Fatal(err) - } - - rawProbes = append(rawProbes, string(data)) - } - - mockRDBClient.MatchExpectationsInOrder(true) - mockRDBClient.ExpectLLen(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(5) - mockRDBClient.ExpectLPop(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(rawProbes[0]) - mockRDBClient.ExpectRPush(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), []byte(rawProbes[4])).SetVal(1) - mockRDBClient.ExpectLRange(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), 0, -1).SetVal(rawProbes) - mockRDBClient.ExpectHSet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "averageRTT", int64(30388900)).SetVal(1) - mockRDBClient.ExpectHSet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "updatedAt", mockProbe.CreatedAt.Format(time.RFC3339Nano)).SetVal(1) - mockRDBClient.ExpectIncr(pkgredis.MakeProbedCountKeyInScheduler(mockHost.ID)).SetVal(0) - }, - expect: func(t *testing.T, ps Probes) { - assert := assert.New(t) - assert.Nil(ps.Enqueue(mockProbe)) - }, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - ctl := gomock.NewController(t) - defer ctl.Finish() - - rdb, mockRDBClient := redismock.NewClientMock() - tc.mock(mockRDBClient, tc.probes) - - tc.expect(t, NewProbes(mockNetworkTopologyConfig.Probe, rdb, mockSeedHost.ID, mockHost.ID)) - mockRDBClient.ClearExpect() - }) - } -} - -func TestProbes_Dequeue(t *testing.T) { - tests := []struct { - name string - probes []*Probe - mock func(mockRDBClient redismock.ClientMock, ps []*Probe) - expect func(t *testing.T, ps Probes) - }{ - { - name: "queue has one probe", - probes: []*Probe{ - mockProbe, - }, - mock: func(mockRDBClient redismock.ClientMock, ps []*Probe) { - data, err := json.Marshal(ps[0]) - if err != nil { - t.Fatal(err) - } - - mockRDBClient.ExpectLPop(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(string(data)) - }, - expect: func(t *testing.T, ps Probes) { - assert := assert.New(t) - probe, err := ps.Dequeue() - assert.NoError(err) - assert.Equal(probe.Host.ID, mockProbe.Host.ID) - assert.Equal(probe.Host.Type, mockProbe.Host.Type) - assert.Equal(probe.Host.Hostname, mockProbe.Host.Hostname) - assert.Equal(probe.Host.IP, mockProbe.Host.IP) - assert.Equal(probe.Host.Port, mockProbe.Host.Port) - assert.Equal(probe.Host.DownloadPort, mockProbe.Host.DownloadPort) - assert.Equal(probe.Host.OS, mockProbe.Host.OS) - assert.Equal(probe.Host.Platform, mockProbe.Host.Platform) - assert.Equal(probe.Host.PlatformFamily, mockProbe.Host.PlatformFamily) - assert.Equal(probe.Host.PlatformVersion, mockProbe.Host.PlatformVersion) - assert.Equal(probe.Host.KernelVersion, mockProbe.Host.KernelVersion) - assert.Equal(probe.Host.ConcurrentUploadLimit, mockProbe.Host.ConcurrentUploadLimit) - assert.Equal(probe.Host.ConcurrentUploadCount, mockProbe.Host.ConcurrentUploadCount) - assert.Equal(probe.Host.UploadCount, mockProbe.Host.UploadCount) - assert.Equal(probe.Host.UploadFailedCount, mockProbe.Host.UploadFailedCount) - assert.EqualValues(probe.Host.CPU, mockProbe.Host.CPU) - assert.EqualValues(probe.Host.Memory, mockProbe.Host.Memory) - assert.EqualValues(probe.Host.Network, mockProbe.Host.Network) - assert.EqualValues(probe.Host.Disk, mockProbe.Host.Disk) - assert.EqualValues(probe.Host.Build, mockProbe.Host.Build) - assert.Equal(probe.RTT, mockProbe.RTT) - assert.True(probe.CreatedAt.Equal(mockProbe.CreatedAt)) - }, - }, - { - name: "queue has six probe", - probes: []*Probe{ - { - Host: mockHost, - RTT: 31 * time.Millisecond, - CreatedAt: time.Now(), - }, - { - Host: mockHost, - RTT: 32 * time.Millisecond, - CreatedAt: time.Now(), - }, - { - Host: mockHost, - RTT: 33 * time.Millisecond, - CreatedAt: time.Now(), - }, - { - Host: mockHost, - RTT: 34 * time.Millisecond, - CreatedAt: time.Now(), - }, - mockProbe, - }, - mock: func(mockRDBClient redismock.ClientMock, ps []*Probe) { - var rawProbes []string - for _, p := range ps { - data, err := json.Marshal(p) - if err != nil { - t.Fatal(err) - } - - rawProbes = append(rawProbes, string(data)) - } - - mockRDBClient.MatchExpectationsInOrder(true) - mockRDBClient.ExpectLLen(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(5) - mockRDBClient.ExpectLPop(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(string(rawProbes[4])) - mockRDBClient.ExpectRPush(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), []byte(rawProbes[4])).SetVal(1) - mockRDBClient.ExpectLRange(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), 0, -1).SetVal(rawProbes) - mockRDBClient.ExpectHSet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "averageRTT", int64(30388900)).SetVal(1) - mockRDBClient.ExpectHSet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "updatedAt", mockProbe.CreatedAt.Format(time.RFC3339Nano)).SetVal(1) - mockRDBClient.ExpectIncr(pkgredis.MakeProbedCountKeyInScheduler(mockHost.ID)).SetVal(0) - mockRDBClient.ExpectLPop(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(string(rawProbes[0])) - }, - expect: func(t *testing.T, ps Probes) { - assert := assert.New(t) - assert.NoError(ps.Enqueue(mockProbe)) - - probe, err := ps.Dequeue() - assert.NoError(err) - assert.Equal(probe.RTT, 31*time.Millisecond) - }, - }, - { - name: "dequeue probe from empty probes", - probes: []*Probe{}, - mock: func(mockRDBClient redismock.ClientMock, ps []*Probe) { - mockRDBClient.ExpectLPop(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).RedisNil() - }, - expect: func(t *testing.T, ps Probes) { - assert := assert.New(t) - _, err := ps.Dequeue() - assert.Error(err) - }, - }, - } - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - ctl := gomock.NewController(t) - defer ctl.Finish() - - rdb, mockRDBClient := redismock.NewClientMock() - tc.mock(mockRDBClient, tc.probes) - - tc.expect(t, NewProbes(mockNetworkTopologyConfig.Probe, rdb, mockSeedHost.ID, mockHost.ID)) - mockRDBClient.ClearExpect() - }) - } -} - -func TestProbes_Length(t *testing.T) { - tests := []struct { - name string - probes []*Probe - mock func(mockRDBClient redismock.ClientMock, ps []*Probe) - expect func(t *testing.T, ps Probes) - }{ - { - name: "queue has one probe", - probes: []*Probe{}, - mock: func(mockRDBClient redismock.ClientMock, ps []*Probe) { - mockRDBClient.ExpectLLen(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(1) - }, - expect: func(t *testing.T, ps Probes) { - assert := assert.New(t) - length, err := ps.Len() - assert.NoError(err) - assert.Equal(length, int64(1)) - }, - }, - { - name: "queue has six probe", - probes: []*Probe{ - { - Host: mockHost, - RTT: 31 * time.Millisecond, - CreatedAt: time.Now(), - }, - { - Host: mockHost, - RTT: 32 * time.Millisecond, - CreatedAt: time.Now(), - }, - { - Host: mockHost, - RTT: 33 * time.Millisecond, - CreatedAt: time.Now(), - }, - { - Host: mockHost, - RTT: 34 * time.Millisecond, - CreatedAt: time.Now(), - }, - mockProbe, - }, - mock: func(mockRDBClient redismock.ClientMock, ps []*Probe) { - var rawProbes []string - for _, p := range ps { - data, err := json.Marshal(p) - if err != nil { - t.Fatal(err) - } - - rawProbes = append(rawProbes, string(data)) - } - - mockRDBClient.MatchExpectationsInOrder(true) - mockRDBClient.ExpectLLen(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(5) - mockRDBClient.ExpectLLen(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(5) - mockRDBClient.ExpectLPop(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(string(rawProbes[4])) - mockRDBClient.ExpectRPush(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), []byte(rawProbes[4])).SetVal(1) - mockRDBClient.ExpectLRange(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID), 0, -1).SetVal(rawProbes) - mockRDBClient.ExpectHSet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "averageRTT", int64(30388900)).SetVal(1) - mockRDBClient.ExpectHSet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "updatedAt", mockProbe.CreatedAt.Format(time.RFC3339Nano)).SetVal(1) - mockRDBClient.ExpectIncr(pkgredis.MakeProbedCountKeyInScheduler(mockHost.ID)).SetVal(0) - mockRDBClient.ExpectLLen(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(5) - }, - expect: func(t *testing.T, ps Probes) { - assert := assert.New(t) - length, err := ps.Len() - assert.NoError(err) - assert.Equal(length, int64(5)) - assert.NoError(ps.Enqueue(mockProbe)) - - length, err = ps.Len() - assert.NoError(err) - assert.Equal(length, int64(5)) - }, - }, - { - name: "queue has no probe", - probes: []*Probe{}, - mock: func(mockRDBClient redismock.ClientMock, ps []*Probe) { - mockRDBClient.ExpectLLen(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetVal(0) - }, - expect: func(t *testing.T, ps Probes) { - assert := assert.New(t) - length, err := ps.Len() - assert.NoError(err) - assert.Equal(length, int64(0)) - }, - }, - { - name: "get queue length error", - probes: []*Probe{}, - mock: func(mockRDBClient redismock.ClientMock, ps []*Probe) { - mockRDBClient.ExpectLLen(pkgredis.MakeProbesKeyInScheduler(mockSeedHost.ID, mockHost.ID)).SetErr(errors.New("get queue length error")) - }, - expect: func(t *testing.T, ps Probes) { - assert := assert.New(t) - _, err := ps.Len() - assert.Error(err) - }, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - ctl := gomock.NewController(t) - defer ctl.Finish() - - rdb, mockRDBClient := redismock.NewClientMock() - tc.mock(mockRDBClient, tc.probes) - - tc.expect(t, NewProbes(mockNetworkTopologyConfig.Probe, rdb, mockSeedHost.ID, mockHost.ID)) - mockRDBClient.ClearExpect() - }) - } -} - -func TestProbes_UpdatedAt(t *testing.T) { - tests := []struct { - name string - mock func(mockRDBClient redismock.ClientMock) - expect func(t *testing.T, ps Probes) - }{ - { - name: "get update time of probes", - mock: func(mockRDBClient redismock.ClientMock) { - mockRDBClient.ExpectHGet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "updatedAt").SetVal(mockProbe.CreatedAt.Format(time.RFC3339Nano)) - }, - expect: func(t *testing.T, ps Probes) { - assert := assert.New(t) - updatedAt, err := ps.UpdatedAt() - assert.NoError(err) - assert.True(updatedAt.Equal(mockProbe.CreatedAt)) - }, - }, - { - name: "get update time of probes error", - mock: func(mockRDBClient redismock.ClientMock) { - mockRDBClient.ExpectHGet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "updatedAt").SetErr(errors.New("get update time of probes error")) - }, - expect: func(t *testing.T, ps Probes) { - assert := assert.New(t) - _, err := ps.UpdatedAt() - assert.Error(err) - }, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - ctl := gomock.NewController(t) - defer ctl.Finish() - - rdb, mockRDBClient := redismock.NewClientMock() - tc.mock(mockRDBClient) - - tc.expect(t, NewProbes(mockNetworkTopologyConfig.Probe, rdb, mockSeedHost.ID, mockHost.ID)) - mockRDBClient.ClearExpect() - }) - } -} - -func TestProbes_AverageRTT(t *testing.T) { - tests := []struct { - name string - mock func(mockRDBClient redismock.ClientMock) - expect func(t *testing.T, ps Probes) - }{ - { - name: "get averageRTT of probes", - mock: func(mockRDBClient redismock.ClientMock) { - mockRDBClient.ExpectHGet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "averageRTT").SetVal(strconv.FormatInt(mockProbe.RTT.Nanoseconds(), 10)) - }, - expect: func(t *testing.T, ps Probes) { - assert := assert.New(t) - averageRTT, err := ps.AverageRTT() - assert.NoError(err) - assert.Equal(averageRTT, mockProbe.RTT) - }, - }, - { - name: "get averageRTT of probes error", - mock: func(mockRDBClient redismock.ClientMock) { - mockRDBClient.ExpectHGet(pkgredis.MakeNetworkTopologyKeyInScheduler(mockSeedHost.ID, mockHost.ID), "averageRTT").SetErr(errors.New("get averageRTT of probes error")) - }, - expect: func(t *testing.T, ps Probes) { - assert := assert.New(t) - _, err := ps.AverageRTT() - assert.Error(err) - }, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - ctl := gomock.NewController(t) - defer ctl.Finish() - - rdb, mockRDBClient := redismock.NewClientMock() - tc.mock(mockRDBClient) - - tc.expect(t, NewProbes(mockNetworkTopologyConfig.Probe, rdb, mockSeedHost.ID, mockHost.ID)) - mockRDBClient.ClearExpect() - }) - } -}