From a0d14c58b8736d30aa514303eec6b7baadafc4b3 Mon Sep 17 00:00:00 2001 From: Gaius Date: Tue, 27 Jun 2023 18:57:19 +0800 Subject: [PATCH] feat: add network topology to daemon (#2489) Signed-off-by: Gaius --- client/daemon/daemon.go | 31 +- .../mocks/network_topology_mock.go | 58 ++ .../networktopology/network_topology.go | 203 +++++++ client/daemon/probe/mocks/probe_mock.go | 62 --- client/daemon/probe/probe.go | 199 ------- client/daemon/probe/probe_test.go | 505 ------------------ 6 files changed, 278 insertions(+), 780 deletions(-) create mode 100644 client/daemon/networktopology/mocks/network_topology_mock.go create mode 100644 client/daemon/networktopology/network_topology.go delete mode 100644 client/daemon/probe/mocks/probe_mock.go delete mode 100644 client/daemon/probe/probe.go delete mode 100644 client/daemon/probe/probe_test.go diff --git a/client/daemon/daemon.go b/client/daemon/daemon.go index e6f16336e..fd39c8e2b 100644 --- a/client/daemon/daemon.go +++ b/client/daemon/daemon.go @@ -47,9 +47,9 @@ import ( "d7y.io/dragonfly/v2/client/daemon/announcer" "d7y.io/dragonfly/v2/client/daemon/gc" "d7y.io/dragonfly/v2/client/daemon/metrics" + "d7y.io/dragonfly/v2/client/daemon/networktopology" "d7y.io/dragonfly/v2/client/daemon/objectstorage" "d7y.io/dragonfly/v2/client/daemon/peer" - "d7y.io/dragonfly/v2/client/daemon/probe" "d7y.io/dragonfly/v2/client/daemon/proxy" "d7y.io/dragonfly/v2/client/daemon/rpcserver" "d7y.io/dragonfly/v2/client/daemon/storage" @@ -108,7 +108,7 @@ type clientDaemon struct { schedulerClient schedulerclient.V1 certifyClient *certify.Certify announcer announcer.Announcer - probe probe.Probe + networkTopology networktopology.NetworkTopology } func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) { @@ -686,6 +686,19 @@ func (cd *clientDaemon) Serve() error { } }() + // serve network topology + if cd.Option.NetworkTopology.Enable { + cd.networkTopology, err = networktopology.NewNetworkTopology(&cd.Option, cd.schedPeerHost.Id, cd.schedPeerHost.RpcPort, cd.schedPeerHost.DownPort, cd.schedulerClient) + if err != nil { + logger.Errorf("failed to create network topology: %v", err) + return err + } + + // serve network topology service + logger.Infof("serve network topology") + go cd.networkTopology.Serve() + } + if cd.Option.AliveTime.Duration > 0 { g.Go(func() error { for { @@ -783,17 +796,6 @@ func (cd *clientDaemon) Serve() error { }() } - if cd.Option.NetworkTopology.Enable { - cd.probe, err = probe.NewProbe(&cd.Option, cd.schedPeerHost.Id, cd.schedPeerHost.RpcPort, cd.schedPeerHost.DownPort, cd.schedulerClient) - if err != nil { - return err - } - - // serve dynconfig service - logger.Infof("probe serve start") - go cd.probe.Serve() - } - werr := g.Wait() cd.Stop() return werr @@ -819,7 +821,6 @@ func (cd *clientDaemon) Stop() { cd.GCManager.Stop() cd.RPCManager.Stop() - cd.probe.Stop() if err := cd.UploadManager.Stop(); err != nil { logger.Errorf("upload manager stop failed %s", err) } @@ -849,6 +850,8 @@ func (cd *clientDaemon) Stop() { logger.Errorf("announcer stop failed %s", err) } + cd.networkTopology.Stop() + if err := cd.dynconfig.Stop(); err != nil { logger.Errorf("dynconfig client closed failed %s", err) } else { diff --git a/client/daemon/networktopology/mocks/network_topology_mock.go b/client/daemon/networktopology/mocks/network_topology_mock.go new file mode 100644 index 000000000..1e4f12f18 --- /dev/null +++ b/client/daemon/networktopology/mocks/network_topology_mock.go @@ -0,0 +1,58 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: network_topology.go + +// Package mocks is a generated GoMock package. +package mocks + +import ( + reflect "reflect" + + gomock "github.com/golang/mock/gomock" +) + +// MockNetworkTopology is a mock of NetworkTopology interface. +type MockNetworkTopology struct { + ctrl *gomock.Controller + recorder *MockNetworkTopologyMockRecorder +} + +// MockNetworkTopologyMockRecorder is the mock recorder for MockNetworkTopology. +type MockNetworkTopologyMockRecorder struct { + mock *MockNetworkTopology +} + +// NewMockNetworkTopology creates a new mock instance. +func NewMockNetworkTopology(ctrl *gomock.Controller) *MockNetworkTopology { + mock := &MockNetworkTopology{ctrl: ctrl} + mock.recorder = &MockNetworkTopologyMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockNetworkTopology) EXPECT() *MockNetworkTopologyMockRecorder { + return m.recorder +} + +// Serve mocks base method. +func (m *MockNetworkTopology) Serve() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Serve") +} + +// Serve indicates an expected call of Serve. +func (mr *MockNetworkTopologyMockRecorder) Serve() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Serve", reflect.TypeOf((*MockNetworkTopology)(nil).Serve)) +} + +// Stop mocks base method. +func (m *MockNetworkTopology) Stop() { + m.ctrl.T.Helper() + m.ctrl.Call(m, "Stop") +} + +// Stop indicates an expected call of Stop. +func (mr *MockNetworkTopologyMockRecorder) Stop() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockNetworkTopology)(nil).Stop)) +} diff --git a/client/daemon/networktopology/network_topology.go b/client/daemon/networktopology/network_topology.go new file mode 100644 index 000000000..af007d533 --- /dev/null +++ b/client/daemon/networktopology/network_topology.go @@ -0,0 +1,203 @@ +/* + * Copyright 2023 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +//go:generate mockgen -destination mocks/network_topology_mock.go -source network_topology.go -package mocks + +package networktopology + +import ( + "context" + "io" + "sync" + "time" + + "google.golang.org/protobuf/types/known/durationpb" + "google.golang.org/protobuf/types/known/timestamppb" + + v1 "d7y.io/api/pkg/apis/common/v1" + schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1" + + "d7y.io/dragonfly/v2/client/config" + logger "d7y.io/dragonfly/v2/internal/dflog" + "d7y.io/dragonfly/v2/pkg/net/ping" + schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client" +) + +type NetworkTopology interface { + // Serve starts network topology server. + Serve() + + // Stop stops network topology server. + Stop() +} + +// networkTopology implements NetworkTopology. +type networkTopology struct { + config *config.DaemonOption + hostID string + daemonPort int32 + daemonDownloadPort int32 + schedulerClient schedulerclient.V1 + done chan struct{} +} + +// NewNetworkTopology returns a new NetworkTopology interface. +func NewNetworkTopology(cfg *config.DaemonOption, hostID string, daemonPort int32, daemonDownloadPort int32, + schedulerClient schedulerclient.V1) (NetworkTopology, error) { + return &networkTopology{ + config: cfg, + hostID: hostID, + daemonPort: daemonPort, + daemonDownloadPort: daemonDownloadPort, + schedulerClient: schedulerClient, + done: make(chan struct{}), + }, nil +} + +// Serve starts network topology server. +func (nt *networkTopology) Serve() { + tick := time.NewTicker(nt.config.NetworkTopology.Probe.Interval) + for { + select { + case <-tick.C: + if err := nt.syncProbes(); err != nil { + logger.Error(err) + } + case <-nt.done: + return + } + } +} + +// Stop stops network topology server. +func (nt *networkTopology) Stop() { + close(nt.done) +} + +// syncProbes syncs probes to scheduler. +func (nt *networkTopology) syncProbes() error { + host := &v1.Host{ + Id: nt.hostID, + Ip: nt.config.Host.AdvertiseIP.String(), + Hostname: nt.config.Host.Hostname, + Port: nt.daemonPort, + DownloadPort: nt.daemonDownloadPort, + Location: nt.config.Host.Location, + Idc: nt.config.Host.IDC, + } + + stream, err := nt.schedulerClient.SyncProbes(context.Background(), &schedulerv1.SyncProbesRequest{ + Host: host, + Request: &schedulerv1.SyncProbesRequest_ProbeStartedRequest{ + ProbeStartedRequest: &schedulerv1.ProbeStartedRequest{}, + }, + }) + if err != nil { + return err + } + + resp, err := stream.Recv() + if err != nil { + if err == io.EOF { + return nil + } + + return err + } + + // Ping the destination host with the ICMP protocol. + probes, failedProbes := nt.pingHosts(resp.Hosts) + if len(probes) > 0 { + if err := stream.Send(&schedulerv1.SyncProbesRequest{ + Host: host, + Request: &schedulerv1.SyncProbesRequest_ProbeFinishedRequest{ + ProbeFinishedRequest: &schedulerv1.ProbeFinishedRequest{ + Probes: probes, + }, + }, + }); err != nil { + return err + } + } + + if len(failedProbes) > 0 { + if err := stream.Send(&schedulerv1.SyncProbesRequest{ + Host: host, + Request: &schedulerv1.SyncProbesRequest_ProbeFailedRequest{ + ProbeFailedRequest: &schedulerv1.ProbeFailedRequest{ + Probes: failedProbes, + }, + }, + }); err != nil { + return err + } + } + + return nil +} + +// Ping the destination host with the ICMP protocol. If the host is unreachable, +// we will send the failed probe result to the scheduler. If the host is reachable, +// we will send the probe result to the scheduler. +func (nt *networkTopology) pingHosts(destHosts []*v1.Host) ([]*schedulerv1.Probe, []*schedulerv1.FailedProbe) { + var ( + probes []*schedulerv1.Probe + failedProbes []*schedulerv1.FailedProbe + ) + + wg := &sync.WaitGroup{} + wg.Add(len(destHosts)) + for _, destHost := range destHosts { + go func(destHost *v1.Host) { + defer wg.Done() + + stats, err := ping.Ping(destHost.Ip) + if err != nil { + failedProbes = append(failedProbes, &schedulerv1.FailedProbe{ + Host: &v1.Host{ + Id: destHost.Id, + Ip: destHost.Ip, + Hostname: destHost.Hostname, + Port: destHost.Port, + DownloadPort: destHost.DownloadPort, + Location: destHost.Location, + Idc: destHost.Idc, + }, + Description: err.Error(), + }) + + return + } + + probes = append(probes, &schedulerv1.Probe{ + Host: &v1.Host{ + Id: destHost.Id, + Ip: destHost.Ip, + Hostname: destHost.Hostname, + Port: destHost.Port, + DownloadPort: destHost.DownloadPort, + Location: destHost.Location, + Idc: destHost.Idc, + }, + Rtt: durationpb.New(stats.AvgRtt), + CreatedAt: timestamppb.New(time.Now()), + }) + }(destHost) + } + + wg.Wait() + return probes, failedProbes +} diff --git a/client/daemon/probe/mocks/probe_mock.go b/client/daemon/probe/mocks/probe_mock.go deleted file mode 100644 index 86c339e81..000000000 --- a/client/daemon/probe/mocks/probe_mock.go +++ /dev/null @@ -1,62 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: probe.go - -// Package mocks is a generated GoMock package. -package mocks - -import ( - reflect "reflect" - - gomock "github.com/golang/mock/gomock" -) - -// MockProbe is a mock of Probe interface. -type MockProbe struct { - ctrl *gomock.Controller - recorder *MockProbeMockRecorder -} - -// MockProbeMockRecorder is the mock recorder for MockProbe. -type MockProbeMockRecorder struct { - mock *MockProbe -} - -// NewMockProbe creates a new mock instance. -func NewMockProbe(ctrl *gomock.Controller) *MockProbe { - mock := &MockProbe{ctrl: ctrl} - mock.recorder = &MockProbeMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockProbe) EXPECT() *MockProbeMockRecorder { - return m.recorder -} - -// Serve mocks base method. -func (m *MockProbe) Serve() error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Serve") - ret0, _ := ret[0].(error) - return ret0 -} - -// Serve indicates an expected call of Serve. -func (mr *MockProbeMockRecorder) Serve() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Serve", reflect.TypeOf((*MockProbe)(nil).Serve)) -} - -// Stop mocks base method. -func (m *MockProbe) Stop() error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Stop") - ret0, _ := ret[0].(error) - return ret0 -} - -// Stop indicates an expected call of Stop. -func (mr *MockProbeMockRecorder) Stop() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockProbe)(nil).Stop)) -} diff --git a/client/daemon/probe/probe.go b/client/daemon/probe/probe.go deleted file mode 100644 index 3d9253bcd..000000000 --- a/client/daemon/probe/probe.go +++ /dev/null @@ -1,199 +0,0 @@ -/* - * Copyright 2023 The Dragonfly Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -//go:generate mockgen -destination mocks/probe_mock.go -source probe.go -package mocks - -package probe - -import ( - "context" - "io" - "sync" - "time" - - "google.golang.org/protobuf/types/known/durationpb" - "google.golang.org/protobuf/types/known/timestamppb" - - v1 "d7y.io/api/pkg/apis/common/v1" - schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1" - - "d7y.io/dragonfly/v2/client/config" - logger "d7y.io/dragonfly/v2/internal/dflog" - "d7y.io/dragonfly/v2/pkg/net/ping" - schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client" -) - -type Probe interface { - // Serve starts probe server. - Serve() - - // Stop stops probe server. - Stop() -} - -type probe struct { - config *config.DaemonOption - hostID string - daemonPort int32 - daemonDownloadPort int32 - schedulerClient schedulerclient.V1 - done chan struct{} -} - -// NewProbe returns a new Probe interface. -func NewProbe(cfg *config.DaemonOption, hostID string, daemonPort int32, daemonDownloadPort int32, - schedulerClient schedulerclient.V1) (Probe, error) { - return &probe{ - config: cfg, - hostID: hostID, - daemonPort: daemonPort, - daemonDownloadPort: daemonDownloadPort, - schedulerClient: schedulerClient, - done: make(chan struct{}), - }, nil -} - -// Serve starts probe server. -func (p *probe) Serve() { - logger.Info("collect probes and upload probes to scheduler") - p.uploadProbesToScheduler() -} - -// Stop stops probe server. -func (p *probe) Stop() { - close(p.done) -} - -// uploadProbesToScheduler collects probes and uploads probes to scheduler. -func (p *probe) uploadProbesToScheduler() { - host := &v1.Host{ - Id: p.hostID, - Ip: p.config.Host.AdvertiseIP.String(), - Hostname: p.config.Host.Hostname, - Port: p.daemonPort, - DownloadPort: p.daemonDownloadPort, - Location: p.config.Host.Location, - Idc: p.config.Host.IDC, - } - - tick := time.NewTicker(p.config.NetworkTopology.Probe.Interval) - for { - select { - case <-tick.C: - stream, err := p.schedulerClient.SyncProbes(context.Background(), &schedulerv1.SyncProbesRequest{ - Host: host, - Request: &schedulerv1.SyncProbesRequest_ProbeStartedRequest{ - ProbeStartedRequest: &schedulerv1.ProbeStartedRequest{}, - }, - }) - if err != nil { - continue - } - - syncProbesResponse, err := stream.Recv() - if err != nil { - if err == io.EOF { - logger.Info("remote SyncProbe done, exit receiving") - return - } - - logger.Errorf("receive error: %s", err.Error()) - continue - } - - logger.Infof("colloect probes from: %#v", syncProbesResponse.Hosts) - probes, failedProbes := p.collectProbes(syncProbesResponse.Hosts) - if len(probes) > 0 { - if err := stream.Send(&schedulerv1.SyncProbesRequest{ - Host: host, - Request: &schedulerv1.SyncProbesRequest_ProbeFinishedRequest{ - ProbeFinishedRequest: &schedulerv1.ProbeFinishedRequest{ - Probes: probes, - }, - }, - }); err != nil { - logger.Errorf("synchronize finished probe: %w", err) - } - } - - if len(failedProbes) > 0 { - if err := stream.Send(&schedulerv1.SyncProbesRequest{ - Host: host, - Request: &schedulerv1.SyncProbesRequest_ProbeFailedRequest{ - ProbeFailedRequest: &schedulerv1.ProbeFailedRequest{ - Probes: failedProbes, - }, - }, - }); err != nil { - logger.Errorf("synchronize failed probe: %w", err) - } - } - case <-p.done: - return - } - } -} - -// collectProbes probes hosts, collects probes and failed probes. -func (p *probe) collectProbes(desthosts []*v1.Host) ([]*schedulerv1.Probe, []*schedulerv1.FailedProbe) { - var ( - probes []*schedulerv1.Probe - failedProbes []*schedulerv1.FailedProbe - ) - - wg := &sync.WaitGroup{} - wg.Add(len(desthosts)) - for _, desthost := range desthosts { - go func(desthost *v1.Host) { - defer wg.Done() - - statistics, err := ping.Ping(desthost.Ip) - if err != nil { - failedProbes = append(failedProbes, &schedulerv1.FailedProbe{ - Host: &v1.Host{ - Id: desthost.Id, - Ip: desthost.Ip, - Hostname: desthost.Hostname, - Port: desthost.Port, - DownloadPort: desthost.DownloadPort, - Location: desthost.Location, - Idc: desthost.Idc, - }, - Description: err.Error(), - }) - - return - } - - probes = append(probes, &schedulerv1.Probe{ - Host: &v1.Host{ - Id: desthost.Id, - Ip: desthost.Ip, - Hostname: desthost.Hostname, - Port: desthost.Port, - DownloadPort: desthost.DownloadPort, - Location: desthost.Location, - Idc: desthost.Idc, - }, - Rtt: durationpb.New(statistics.AvgRtt), - CreatedAt: timestamppb.New(time.Now()), - }) - }(desthost) - } - - wg.Wait() - return probes, failedProbes -} diff --git a/client/daemon/probe/probe_test.go b/client/daemon/probe/probe_test.go deleted file mode 100644 index e4c56b2ed..000000000 --- a/client/daemon/probe/probe_test.go +++ /dev/null @@ -1,505 +0,0 @@ -/* - * Copyright 2023 The Dragonfly Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package probe - -import ( - "context" - "errors" - "io" - "net" - "reflect" - "testing" - "time" - - "github.com/golang/mock/gomock" - "github.com/stretchr/testify/assert" - - v1 "d7y.io/api/pkg/apis/common/v1" - schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1" - schedulerv1mocks "d7y.io/api/pkg/apis/scheduler/v1/mocks" - - "d7y.io/dragonfly/v2/client/config" - "d7y.io/dragonfly/v2/pkg/idgen" - "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client/mocks" - schedulerclientmocks "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client/mocks" -) - -var ( - mockDaemonConfig = &config.DaemonOption{ - Host: config.HostOption{ - Location: mockHostLocation, - IDC: mockHostIDC, - Hostname: idgen.HostIDV2("127.0.0.1", "bar"), - AdvertiseIP: net.IPv4(127, 0, 0, 1), - }, - NetworkTopology: config.NetworkTopologyOption{ - Enable: true, - Probe: config.ProbeOption{ - Interval: 2 * time.Second, - }, - }, - } - - mockPort = 8000 - mockDownloadPort = 8001 - mockHostLocation = "location" - mockHostIDC = "idc" - - mockHost = &v1.Host{ - Id: "foo", - Ip: "127.0.0.1", - Hostname: idgen.HostIDV2("127.0.0.1", "foo"), - Port: int32(mockPort), - DownloadPort: int32(mockDownloadPort), - Location: mockHostLocation, - Idc: mockHostIDC, - } - - mockSeedHost = &v1.Host{ - Id: "bar", - Ip: "127.0.0.1", - Hostname: idgen.HostIDV2("127.0.0.1", "bar"), - Port: int32(mockPort), - DownloadPort: int32(mockDownloadPort), - Location: mockHostLocation, - Idc: mockHostIDC, - } -) - -func Test_NewProbe(t *testing.T) { - tests := []struct { - name string - expect func(t *testing.T, probe Probe, err error) - }{ - { - name: "new probe", - expect: func(t *testing.T, probe Probe, err error) { - assert := assert.New(t) - assert.NoError(err) - assert.Equal(reflect.TypeOf(probe).Elem().Name(), "probe") - }, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - ctl := gomock.NewController(t) - defer ctl.Finish() - - schedulerClient := schedulerclientmocks.NewMockV1(ctl) - probe, err := NewProbe(mockDaemonConfig, mockSeedHost.Id, int32(mockPort), int32(mockDownloadPort), schedulerClient) - tc.expect(t, probe, err) - }) - } -} - -func TestProbe_Serve(t *testing.T) { - tests := []struct { - name string - interval time.Duration - sleep func() - mock func(mv *mocks.MockV1MockRecorder, stream *schedulerv1mocks.MockScheduler_SyncProbesClient, - ms *schedulerv1mocks.MockScheduler_SyncProbesClientMockRecorder) - expect func(t *testing.T, p Probe, err error) - }{ - { - name: "collect probes and upload probes to scheduler", - interval: 200 * time.Millisecond, - sleep: func() { - time.Sleep(300 * time.Millisecond) - }, - mock: func(mv *mocks.MockV1MockRecorder, stream *schedulerv1mocks.MockScheduler_SyncProbesClient, - ms *schedulerv1mocks.MockScheduler_SyncProbesClientMockRecorder) { - gomock.InOrder( - mv.SyncProbes(gomock.Eq(context.Background()), gomock.Eq(&schedulerv1.SyncProbesRequest{ - Host: mockSeedHost, - Request: &schedulerv1.SyncProbesRequest_ProbeStartedRequest{ - ProbeStartedRequest: &schedulerv1.ProbeStartedRequest{}, - }})).Return(stream, nil).Times(1), - ms.Recv().Return(&schedulerv1.SyncProbesResponse{ - Hosts: []*v1.Host{mockHost}, - }, nil).Times(1), - ms.Send(gomock.Any()).Return(nil).Times(1), - ) - }, - expect: func(t *testing.T, p Probe, err error) { - assert := assert.New(t) - assert.NoError(err) - go p.Serve() - }, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - ctl := gomock.NewController(t) - defer ctl.Finish() - - schedulerClient := schedulerclientmocks.NewMockV1(ctl) - stream := schedulerv1mocks.NewMockScheduler_SyncProbesClient(ctl) - tc.mock(schedulerClient.EXPECT(), stream, stream.EXPECT()) - mockDaemonConfig.NetworkTopology.Probe.Interval = tc.interval - probe, err := NewProbe(mockDaemonConfig, mockSeedHost.Id, int32(mockPort), int32(mockDownloadPort), schedulerClient) - tc.expect(t, probe, err) - tc.sleep() - probe.Stop() - }) - } -} - -func TestProbe_uploadProbesToScheduler(t *testing.T) { - tests := []struct { - name string - interval time.Duration - sleep func() - mock func(mv *mocks.MockV1MockRecorder, stream *schedulerv1mocks.MockScheduler_SyncProbesClient, - ms *schedulerv1mocks.MockScheduler_SyncProbesClientMockRecorder) - expect func(t *testing.T, p Probe, err error) - }{ - { - name: "collect probes and send ProbeFinishedRequest", - interval: 200 * time.Millisecond, - sleep: func() { - time.Sleep(300 * time.Millisecond) - }, - mock: func(mv *mocks.MockV1MockRecorder, stream *schedulerv1mocks.MockScheduler_SyncProbesClient, - ms *schedulerv1mocks.MockScheduler_SyncProbesClientMockRecorder) { - gomock.InOrder( - mv.SyncProbes(gomock.Eq(context.Background()), gomock.Eq(&schedulerv1.SyncProbesRequest{ - Host: mockSeedHost, - Request: &schedulerv1.SyncProbesRequest_ProbeStartedRequest{ - ProbeStartedRequest: &schedulerv1.ProbeStartedRequest{}, - }})).Return(stream, nil).Times(1), - ms.Recv().Return(&schedulerv1.SyncProbesResponse{ - Hosts: []*v1.Host{mockHost}, - }, nil).Times(1), - ms.Send(gomock.Any()).Return(nil).Times(1), - ) - }, - expect: func(t *testing.T, p Probe, err error) { - assert := assert.New(t) - assert.NoError(err) - go p.(*probe).uploadProbesToScheduler() - }, - }, - { - name: "collect fail probes and send ProbeFailedRequest", - interval: 3 * time.Second, - sleep: func() { - time.Sleep(5 * time.Second) - }, - mock: func(mv *mocks.MockV1MockRecorder, stream *schedulerv1mocks.MockScheduler_SyncProbesClient, - ms *schedulerv1mocks.MockScheduler_SyncProbesClientMockRecorder) { - gomock.InOrder( - mv.SyncProbes(gomock.Eq(context.Background()), gomock.Eq(&schedulerv1.SyncProbesRequest{ - Host: mockSeedHost, - Request: &schedulerv1.SyncProbesRequest_ProbeStartedRequest{ - ProbeStartedRequest: &schedulerv1.ProbeStartedRequest{}, - }})).Return(stream, nil).Times(1), - ms.Recv().Return(&schedulerv1.SyncProbesResponse{ - Hosts: []*v1.Host{ - { - Id: idgen.HostIDV2("172.0.0.1", "foo"), - Ip: "172.0.0.1", - Hostname: "foo", - Port: 8003, - DownloadPort: 8001, - Location: "location", - Idc: "idc", - }, - }, - }, nil).Times(1), - ms.Send(&schedulerv1.SyncProbesRequest{ - Host: mockSeedHost, - Request: &schedulerv1.SyncProbesRequest_ProbeFailedRequest{ - ProbeFailedRequest: &schedulerv1.ProbeFailedRequest{ - Probes: []*schedulerv1.FailedProbe{ - { - Host: &v1.Host{ - Id: idgen.HostIDV2("172.0.0.1", "foo"), - Ip: "172.0.0.1", - Hostname: "foo", - Port: 8003, - DownloadPort: 8001, - Location: "location", - Idc: "idc", - }, - Description: "receive packet failed", - }, - }, - }, - }, - }).Return(nil).Times(1), - ) - }, - expect: func(t *testing.T, p Probe, err error) { - assert := assert.New(t) - assert.NoError(err) - go p.(*probe).uploadProbesToScheduler() - }, - }, - { - name: "syncProbe error", - interval: 200 * time.Millisecond, - sleep: func() { - time.Sleep(300 * time.Millisecond) - }, - mock: func(mv *mocks.MockV1MockRecorder, stream *schedulerv1mocks.MockScheduler_SyncProbesClient, - ms *schedulerv1mocks.MockScheduler_SyncProbesClientMockRecorder) { - mv.SyncProbes(gomock.Eq(context.Background()), gomock.Eq(&schedulerv1.SyncProbesRequest{ - Host: mockSeedHost, - Request: &schedulerv1.SyncProbesRequest_ProbeStartedRequest{ - ProbeStartedRequest: &schedulerv1.ProbeStartedRequest{}, - }})).Return(nil, errors.New("syncProbe error")).Times(1) - }, - expect: func(t *testing.T, p Probe, err error) { - assert := assert.New(t) - assert.NoError(err) - go p.(*probe).uploadProbesToScheduler() - }, - }, - { - name: "receive error", - interval: 200 * time.Millisecond, - sleep: func() { - time.Sleep(300 * time.Millisecond) - }, - mock: func(mv *mocks.MockV1MockRecorder, stream *schedulerv1mocks.MockScheduler_SyncProbesClient, - ms *schedulerv1mocks.MockScheduler_SyncProbesClientMockRecorder) { - gomock.InOrder( - mv.SyncProbes(gomock.Eq(context.Background()), gomock.Eq(&schedulerv1.SyncProbesRequest{ - Host: mockSeedHost, - Request: &schedulerv1.SyncProbesRequest_ProbeStartedRequest{ - ProbeStartedRequest: &schedulerv1.ProbeStartedRequest{}, - }})).Return(stream, nil).Times(1), - ms.Recv().Return(nil, errors.New("receive error")).Times(1), - ) - }, - expect: func(t *testing.T, p Probe, err error) { - assert := assert.New(t) - assert.NoError(err) - go p.(*probe).uploadProbesToScheduler() - }, - }, - { - name: "receive EOF", - interval: 200 * time.Millisecond, - sleep: func() { - time.Sleep(300 * time.Millisecond) - }, - mock: func(mv *mocks.MockV1MockRecorder, stream *schedulerv1mocks.MockScheduler_SyncProbesClient, - ms *schedulerv1mocks.MockScheduler_SyncProbesClientMockRecorder) { - gomock.InOrder( - mv.SyncProbes(gomock.Eq(context.Background()), gomock.Eq(&schedulerv1.SyncProbesRequest{ - Host: mockSeedHost, - Request: &schedulerv1.SyncProbesRequest_ProbeStartedRequest{ - ProbeStartedRequest: &schedulerv1.ProbeStartedRequest{}, - }})).Return(stream, nil).Times(1), - ms.Recv().Return(nil, io.EOF).Times(1), - ) - }, - expect: func(t *testing.T, p Probe, err error) { - assert := assert.New(t) - assert.NoError(err) - go p.(*probe).uploadProbesToScheduler() - }, - }, - { - name: "send ProbeFinishedRequest error", - interval: 200 * time.Millisecond, - sleep: func() { - time.Sleep(300 * time.Millisecond) - }, - mock: func(mv *mocks.MockV1MockRecorder, stream *schedulerv1mocks.MockScheduler_SyncProbesClient, - ms *schedulerv1mocks.MockScheduler_SyncProbesClientMockRecorder) { - gomock.InOrder( - mv.SyncProbes(gomock.Eq(context.Background()), gomock.Eq(&schedulerv1.SyncProbesRequest{ - Host: mockSeedHost, - Request: &schedulerv1.SyncProbesRequest_ProbeStartedRequest{ - ProbeStartedRequest: &schedulerv1.ProbeStartedRequest{}, - }})).Return(stream, nil).Times(1), - ms.Recv().Return(&schedulerv1.SyncProbesResponse{ - Hosts: []*v1.Host{mockHost}, - }, nil).Times(1), - ms.Send(gomock.Any()).Return(errors.New("send ProbeFinishedRequest error")).Times(1), - ) - }, - expect: func(t *testing.T, p Probe, err error) { - assert := assert.New(t) - assert.NoError(err) - go p.(*probe).uploadProbesToScheduler() - }, - }, - { - name: "send ProbeFailedRequest error", - interval: 3 * time.Second, - sleep: func() { - time.Sleep(5 * time.Second) - }, - mock: func(mv *mocks.MockV1MockRecorder, stream *schedulerv1mocks.MockScheduler_SyncProbesClient, - ms *schedulerv1mocks.MockScheduler_SyncProbesClientMockRecorder) { - gomock.InOrder( - mv.SyncProbes(gomock.Eq(context.Background()), gomock.Eq(&schedulerv1.SyncProbesRequest{ - Host: mockSeedHost, - Request: &schedulerv1.SyncProbesRequest_ProbeStartedRequest{ - ProbeStartedRequest: &schedulerv1.ProbeStartedRequest{}, - }})).Return(stream, nil).Times(1), - ms.Recv().Return(&schedulerv1.SyncProbesResponse{ - Hosts: []*v1.Host{ - { - Id: idgen.HostIDV2("172.0.0.1", "foo"), - Ip: "172.0.0.1", - Hostname: "foo", - Port: 8003, - DownloadPort: 8001, - Location: "location", - Idc: "idc", - }, - }, - }, nil).Times(1), - ms.Send(&schedulerv1.SyncProbesRequest{ - Host: mockSeedHost, - Request: &schedulerv1.SyncProbesRequest_ProbeFailedRequest{ - ProbeFailedRequest: &schedulerv1.ProbeFailedRequest{ - Probes: []*schedulerv1.FailedProbe{ - { - Host: &v1.Host{ - Id: idgen.HostIDV2("172.0.0.1", "foo"), - Ip: "172.0.0.1", - Hostname: "foo", - Port: 8003, - DownloadPort: 8001, - Location: "location", - Idc: "idc", - }, - Description: "receive packet failed", - }, - }, - }, - }, - }).Return(errors.New("send ProbeFailedRequest error")).Times(1), - ) - }, - expect: func(t *testing.T, p Probe, err error) { - assert := assert.New(t) - assert.NoError(err) - go p.(*probe).uploadProbesToScheduler() - }, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - ctl := gomock.NewController(t) - defer ctl.Finish() - - schedulerClient := schedulerclientmocks.NewMockV1(ctl) - stream := schedulerv1mocks.NewMockScheduler_SyncProbesClient(ctl) - tc.mock(schedulerClient.EXPECT(), stream, stream.EXPECT()) - mockDaemonConfig.NetworkTopology.Probe.Interval = tc.interval - probe, err := NewProbe(mockDaemonConfig, mockSeedHost.Id, int32(mockPort), int32(mockDownloadPort), schedulerClient) - tc.expect(t, probe, err) - tc.sleep() - probe.Stop() - }) - } -} - -func TestProbe_collectProbes(t *testing.T) { - tests := []struct { - name string - destHosts []*v1.Host - expect func(t *testing.T, p Probe, err error, destHosts []*v1.Host) - }{ - { - name: "collect probes", - destHosts: []*v1.Host{mockHost}, - expect: func(t *testing.T, p Probe, err error, destHosts []*v1.Host) { - assert := assert.New(t) - assert.NoError(err) - probes, failProbes := p.(*probe).collectProbes(destHosts) - assert.Equal(len(probes), 1) - assert.Equal(len(failProbes), 0) - }, - }, - { - name: "collect fail probes", - destHosts: []*v1.Host{ - { - Id: idgen.HostIDV2("172.0.0.1", "foo"), - Ip: "172.0.0.1", - Hostname: "foo", - Port: 8003, - DownloadPort: 8001, - Location: "location", - Idc: "idc", - }, - }, - expect: func(t *testing.T, p Probe, err error, destHosts []*v1.Host) { - assert := assert.New(t) - assert.NoError(err) - probes, failProbes := p.(*probe).collectProbes(destHosts) - assert.Equal(len(probes), 0) - assert.Equal(len(failProbes), 1) - }, - }, - { - name: "collect probes and fail probes", - destHosts: []*v1.Host{ - mockHost, - { - Id: idgen.HostIDV2("172.0.0.1", "foo"), - Ip: "172.0.0.1", - Hostname: "foo", - Port: 8003, - DownloadPort: 8001, - Location: "location", - Idc: "idc", - }, - }, - expect: func(t *testing.T, p Probe, err error, destHosts []*v1.Host) { - assert := assert.New(t) - assert.NoError(err) - probes, failProbes := p.(*probe).collectProbes(destHosts) - assert.Equal(len(probes), 1) - assert.Equal(len(failProbes), 1) - }, - }, - { - name: "dest hosts is empty", - destHosts: []*v1.Host{}, - expect: func(t *testing.T, p Probe, err error, destHosts []*v1.Host) { - assert := assert.New(t) - assert.NoError(err) - probes, failProbes := p.(*probe).collectProbes(destHosts) - assert.Equal(len(probes), 0) - assert.Equal(len(failProbes), 0) - }, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - ctl := gomock.NewController(t) - defer ctl.Finish() - - schedulerClient := schedulerclientmocks.NewMockV1(ctl) - probe, err := NewProbe(mockDaemonConfig, mockSeedHost.Id, int32(mockPort), int32(mockDownloadPort), schedulerClient) - tc.expect(t, probe, err, tc.destHosts) - }) - } -}