feat: add network topology to daemon (#2489)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2023-06-27 18:57:19 +08:00
parent 883496cb14
commit a0d14c58b8
No known key found for this signature in database
GPG Key ID: 8B4E5D1290FA2FFB
6 changed files with 278 additions and 780 deletions

View File

@ -47,9 +47,9 @@ import (
"d7y.io/dragonfly/v2/client/daemon/announcer" "d7y.io/dragonfly/v2/client/daemon/announcer"
"d7y.io/dragonfly/v2/client/daemon/gc" "d7y.io/dragonfly/v2/client/daemon/gc"
"d7y.io/dragonfly/v2/client/daemon/metrics" "d7y.io/dragonfly/v2/client/daemon/metrics"
"d7y.io/dragonfly/v2/client/daemon/networktopology"
"d7y.io/dragonfly/v2/client/daemon/objectstorage" "d7y.io/dragonfly/v2/client/daemon/objectstorage"
"d7y.io/dragonfly/v2/client/daemon/peer" "d7y.io/dragonfly/v2/client/daemon/peer"
"d7y.io/dragonfly/v2/client/daemon/probe"
"d7y.io/dragonfly/v2/client/daemon/proxy" "d7y.io/dragonfly/v2/client/daemon/proxy"
"d7y.io/dragonfly/v2/client/daemon/rpcserver" "d7y.io/dragonfly/v2/client/daemon/rpcserver"
"d7y.io/dragonfly/v2/client/daemon/storage" "d7y.io/dragonfly/v2/client/daemon/storage"
@ -108,7 +108,7 @@ type clientDaemon struct {
schedulerClient schedulerclient.V1 schedulerClient schedulerclient.V1
certifyClient *certify.Certify certifyClient *certify.Certify
announcer announcer.Announcer announcer announcer.Announcer
probe probe.Probe networkTopology networktopology.NetworkTopology
} }
func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) { func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) {
@ -686,6 +686,19 @@ func (cd *clientDaemon) Serve() error {
} }
}() }()
// serve network topology
if cd.Option.NetworkTopology.Enable {
cd.networkTopology, err = networktopology.NewNetworkTopology(&cd.Option, cd.schedPeerHost.Id, cd.schedPeerHost.RpcPort, cd.schedPeerHost.DownPort, cd.schedulerClient)
if err != nil {
logger.Errorf("failed to create network topology: %v", err)
return err
}
// serve network topology service
logger.Infof("serve network topology")
go cd.networkTopology.Serve()
}
if cd.Option.AliveTime.Duration > 0 { if cd.Option.AliveTime.Duration > 0 {
g.Go(func() error { g.Go(func() error {
for { for {
@ -783,17 +796,6 @@ func (cd *clientDaemon) Serve() error {
}() }()
} }
if cd.Option.NetworkTopology.Enable {
cd.probe, err = probe.NewProbe(&cd.Option, cd.schedPeerHost.Id, cd.schedPeerHost.RpcPort, cd.schedPeerHost.DownPort, cd.schedulerClient)
if err != nil {
return err
}
// serve dynconfig service
logger.Infof("probe serve start")
go cd.probe.Serve()
}
werr := g.Wait() werr := g.Wait()
cd.Stop() cd.Stop()
return werr return werr
@ -819,7 +821,6 @@ func (cd *clientDaemon) Stop() {
cd.GCManager.Stop() cd.GCManager.Stop()
cd.RPCManager.Stop() cd.RPCManager.Stop()
cd.probe.Stop()
if err := cd.UploadManager.Stop(); err != nil { if err := cd.UploadManager.Stop(); err != nil {
logger.Errorf("upload manager stop failed %s", err) logger.Errorf("upload manager stop failed %s", err)
} }
@ -849,6 +850,8 @@ func (cd *clientDaemon) Stop() {
logger.Errorf("announcer stop failed %s", err) logger.Errorf("announcer stop failed %s", err)
} }
cd.networkTopology.Stop()
if err := cd.dynconfig.Stop(); err != nil { if err := cd.dynconfig.Stop(); err != nil {
logger.Errorf("dynconfig client closed failed %s", err) logger.Errorf("dynconfig client closed failed %s", err)
} else { } else {

View File

@ -0,0 +1,58 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: network_topology.go
// Package mocks is a generated GoMock package.
package mocks
import (
reflect "reflect"
gomock "github.com/golang/mock/gomock"
)
// MockNetworkTopology is a mock of NetworkTopology interface.
type MockNetworkTopology struct {
ctrl *gomock.Controller
recorder *MockNetworkTopologyMockRecorder
}
// MockNetworkTopologyMockRecorder is the mock recorder for MockNetworkTopology.
type MockNetworkTopologyMockRecorder struct {
mock *MockNetworkTopology
}
// NewMockNetworkTopology creates a new mock instance.
func NewMockNetworkTopology(ctrl *gomock.Controller) *MockNetworkTopology {
mock := &MockNetworkTopology{ctrl: ctrl}
mock.recorder = &MockNetworkTopologyMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockNetworkTopology) EXPECT() *MockNetworkTopologyMockRecorder {
return m.recorder
}
// Serve mocks base method.
func (m *MockNetworkTopology) Serve() {
m.ctrl.T.Helper()
m.ctrl.Call(m, "Serve")
}
// Serve indicates an expected call of Serve.
func (mr *MockNetworkTopologyMockRecorder) Serve() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Serve", reflect.TypeOf((*MockNetworkTopology)(nil).Serve))
}
// Stop mocks base method.
func (m *MockNetworkTopology) Stop() {
m.ctrl.T.Helper()
m.ctrl.Call(m, "Stop")
}
// Stop indicates an expected call of Stop.
func (mr *MockNetworkTopologyMockRecorder) Stop() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockNetworkTopology)(nil).Stop))
}

View File

@ -0,0 +1,203 @@
/*
* Copyright 2023 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
//go:generate mockgen -destination mocks/network_topology_mock.go -source network_topology.go -package mocks
package networktopology
import (
"context"
"io"
"sync"
"time"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
v1 "d7y.io/api/pkg/apis/common/v1"
schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1"
"d7y.io/dragonfly/v2/client/config"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/net/ping"
schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"
)
type NetworkTopology interface {
// Serve starts network topology server.
Serve()
// Stop stops network topology server.
Stop()
}
// networkTopology implements NetworkTopology.
type networkTopology struct {
config *config.DaemonOption
hostID string
daemonPort int32
daemonDownloadPort int32
schedulerClient schedulerclient.V1
done chan struct{}
}
// NewNetworkTopology returns a new NetworkTopology interface.
func NewNetworkTopology(cfg *config.DaemonOption, hostID string, daemonPort int32, daemonDownloadPort int32,
schedulerClient schedulerclient.V1) (NetworkTopology, error) {
return &networkTopology{
config: cfg,
hostID: hostID,
daemonPort: daemonPort,
daemonDownloadPort: daemonDownloadPort,
schedulerClient: schedulerClient,
done: make(chan struct{}),
}, nil
}
// Serve starts network topology server.
func (nt *networkTopology) Serve() {
tick := time.NewTicker(nt.config.NetworkTopology.Probe.Interval)
for {
select {
case <-tick.C:
if err := nt.syncProbes(); err != nil {
logger.Error(err)
}
case <-nt.done:
return
}
}
}
// Stop stops network topology server.
func (nt *networkTopology) Stop() {
close(nt.done)
}
// syncProbes syncs probes to scheduler.
func (nt *networkTopology) syncProbes() error {
host := &v1.Host{
Id: nt.hostID,
Ip: nt.config.Host.AdvertiseIP.String(),
Hostname: nt.config.Host.Hostname,
Port: nt.daemonPort,
DownloadPort: nt.daemonDownloadPort,
Location: nt.config.Host.Location,
Idc: nt.config.Host.IDC,
}
stream, err := nt.schedulerClient.SyncProbes(context.Background(), &schedulerv1.SyncProbesRequest{
Host: host,
Request: &schedulerv1.SyncProbesRequest_ProbeStartedRequest{
ProbeStartedRequest: &schedulerv1.ProbeStartedRequest{},
},
})
if err != nil {
return err
}
resp, err := stream.Recv()
if err != nil {
if err == io.EOF {
return nil
}
return err
}
// Ping the destination host with the ICMP protocol.
probes, failedProbes := nt.pingHosts(resp.Hosts)
if len(probes) > 0 {
if err := stream.Send(&schedulerv1.SyncProbesRequest{
Host: host,
Request: &schedulerv1.SyncProbesRequest_ProbeFinishedRequest{
ProbeFinishedRequest: &schedulerv1.ProbeFinishedRequest{
Probes: probes,
},
},
}); err != nil {
return err
}
}
if len(failedProbes) > 0 {
if err := stream.Send(&schedulerv1.SyncProbesRequest{
Host: host,
Request: &schedulerv1.SyncProbesRequest_ProbeFailedRequest{
ProbeFailedRequest: &schedulerv1.ProbeFailedRequest{
Probes: failedProbes,
},
},
}); err != nil {
return err
}
}
return nil
}
// Ping the destination host with the ICMP protocol. If the host is unreachable,
// we will send the failed probe result to the scheduler. If the host is reachable,
// we will send the probe result to the scheduler.
func (nt *networkTopology) pingHosts(destHosts []*v1.Host) ([]*schedulerv1.Probe, []*schedulerv1.FailedProbe) {
var (
probes []*schedulerv1.Probe
failedProbes []*schedulerv1.FailedProbe
)
wg := &sync.WaitGroup{}
wg.Add(len(destHosts))
for _, destHost := range destHosts {
go func(destHost *v1.Host) {
defer wg.Done()
stats, err := ping.Ping(destHost.Ip)
if err != nil {
failedProbes = append(failedProbes, &schedulerv1.FailedProbe{
Host: &v1.Host{
Id: destHost.Id,
Ip: destHost.Ip,
Hostname: destHost.Hostname,
Port: destHost.Port,
DownloadPort: destHost.DownloadPort,
Location: destHost.Location,
Idc: destHost.Idc,
},
Description: err.Error(),
})
return
}
probes = append(probes, &schedulerv1.Probe{
Host: &v1.Host{
Id: destHost.Id,
Ip: destHost.Ip,
Hostname: destHost.Hostname,
Port: destHost.Port,
DownloadPort: destHost.DownloadPort,
Location: destHost.Location,
Idc: destHost.Idc,
},
Rtt: durationpb.New(stats.AvgRtt),
CreatedAt: timestamppb.New(time.Now()),
})
}(destHost)
}
wg.Wait()
return probes, failedProbes
}

View File

@ -1,62 +0,0 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: probe.go
// Package mocks is a generated GoMock package.
package mocks
import (
reflect "reflect"
gomock "github.com/golang/mock/gomock"
)
// MockProbe is a mock of Probe interface.
type MockProbe struct {
ctrl *gomock.Controller
recorder *MockProbeMockRecorder
}
// MockProbeMockRecorder is the mock recorder for MockProbe.
type MockProbeMockRecorder struct {
mock *MockProbe
}
// NewMockProbe creates a new mock instance.
func NewMockProbe(ctrl *gomock.Controller) *MockProbe {
mock := &MockProbe{ctrl: ctrl}
mock.recorder = &MockProbeMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockProbe) EXPECT() *MockProbeMockRecorder {
return m.recorder
}
// Serve mocks base method.
func (m *MockProbe) Serve() error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Serve")
ret0, _ := ret[0].(error)
return ret0
}
// Serve indicates an expected call of Serve.
func (mr *MockProbeMockRecorder) Serve() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Serve", reflect.TypeOf((*MockProbe)(nil).Serve))
}
// Stop mocks base method.
func (m *MockProbe) Stop() error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Stop")
ret0, _ := ret[0].(error)
return ret0
}
// Stop indicates an expected call of Stop.
func (mr *MockProbeMockRecorder) Stop() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockProbe)(nil).Stop))
}

View File

@ -1,199 +0,0 @@
/*
* Copyright 2023 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
//go:generate mockgen -destination mocks/probe_mock.go -source probe.go -package mocks
package probe
import (
"context"
"io"
"sync"
"time"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
v1 "d7y.io/api/pkg/apis/common/v1"
schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1"
"d7y.io/dragonfly/v2/client/config"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/net/ping"
schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client"
)
type Probe interface {
// Serve starts probe server.
Serve()
// Stop stops probe server.
Stop()
}
type probe struct {
config *config.DaemonOption
hostID string
daemonPort int32
daemonDownloadPort int32
schedulerClient schedulerclient.V1
done chan struct{}
}
// NewProbe returns a new Probe interface.
func NewProbe(cfg *config.DaemonOption, hostID string, daemonPort int32, daemonDownloadPort int32,
schedulerClient schedulerclient.V1) (Probe, error) {
return &probe{
config: cfg,
hostID: hostID,
daemonPort: daemonPort,
daemonDownloadPort: daemonDownloadPort,
schedulerClient: schedulerClient,
done: make(chan struct{}),
}, nil
}
// Serve starts probe server.
func (p *probe) Serve() {
logger.Info("collect probes and upload probes to scheduler")
p.uploadProbesToScheduler()
}
// Stop stops probe server.
func (p *probe) Stop() {
close(p.done)
}
// uploadProbesToScheduler collects probes and uploads probes to scheduler.
func (p *probe) uploadProbesToScheduler() {
host := &v1.Host{
Id: p.hostID,
Ip: p.config.Host.AdvertiseIP.String(),
Hostname: p.config.Host.Hostname,
Port: p.daemonPort,
DownloadPort: p.daemonDownloadPort,
Location: p.config.Host.Location,
Idc: p.config.Host.IDC,
}
tick := time.NewTicker(p.config.NetworkTopology.Probe.Interval)
for {
select {
case <-tick.C:
stream, err := p.schedulerClient.SyncProbes(context.Background(), &schedulerv1.SyncProbesRequest{
Host: host,
Request: &schedulerv1.SyncProbesRequest_ProbeStartedRequest{
ProbeStartedRequest: &schedulerv1.ProbeStartedRequest{},
},
})
if err != nil {
continue
}
syncProbesResponse, err := stream.Recv()
if err != nil {
if err == io.EOF {
logger.Info("remote SyncProbe done, exit receiving")
return
}
logger.Errorf("receive error: %s", err.Error())
continue
}
logger.Infof("colloect probes from: %#v", syncProbesResponse.Hosts)
probes, failedProbes := p.collectProbes(syncProbesResponse.Hosts)
if len(probes) > 0 {
if err := stream.Send(&schedulerv1.SyncProbesRequest{
Host: host,
Request: &schedulerv1.SyncProbesRequest_ProbeFinishedRequest{
ProbeFinishedRequest: &schedulerv1.ProbeFinishedRequest{
Probes: probes,
},
},
}); err != nil {
logger.Errorf("synchronize finished probe: %w", err)
}
}
if len(failedProbes) > 0 {
if err := stream.Send(&schedulerv1.SyncProbesRequest{
Host: host,
Request: &schedulerv1.SyncProbesRequest_ProbeFailedRequest{
ProbeFailedRequest: &schedulerv1.ProbeFailedRequest{
Probes: failedProbes,
},
},
}); err != nil {
logger.Errorf("synchronize failed probe: %w", err)
}
}
case <-p.done:
return
}
}
}
// collectProbes probes hosts, collects probes and failed probes.
func (p *probe) collectProbes(desthosts []*v1.Host) ([]*schedulerv1.Probe, []*schedulerv1.FailedProbe) {
var (
probes []*schedulerv1.Probe
failedProbes []*schedulerv1.FailedProbe
)
wg := &sync.WaitGroup{}
wg.Add(len(desthosts))
for _, desthost := range desthosts {
go func(desthost *v1.Host) {
defer wg.Done()
statistics, err := ping.Ping(desthost.Ip)
if err != nil {
failedProbes = append(failedProbes, &schedulerv1.FailedProbe{
Host: &v1.Host{
Id: desthost.Id,
Ip: desthost.Ip,
Hostname: desthost.Hostname,
Port: desthost.Port,
DownloadPort: desthost.DownloadPort,
Location: desthost.Location,
Idc: desthost.Idc,
},
Description: err.Error(),
})
return
}
probes = append(probes, &schedulerv1.Probe{
Host: &v1.Host{
Id: desthost.Id,
Ip: desthost.Ip,
Hostname: desthost.Hostname,
Port: desthost.Port,
DownloadPort: desthost.DownloadPort,
Location: desthost.Location,
Idc: desthost.Idc,
},
Rtt: durationpb.New(statistics.AvgRtt),
CreatedAt: timestamppb.New(time.Now()),
})
}(desthost)
}
wg.Wait()
return probes, failedProbes
}

View File

@ -1,505 +0,0 @@
/*
* Copyright 2023 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package probe
import (
"context"
"errors"
"io"
"net"
"reflect"
"testing"
"time"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
v1 "d7y.io/api/pkg/apis/common/v1"
schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1"
schedulerv1mocks "d7y.io/api/pkg/apis/scheduler/v1/mocks"
"d7y.io/dragonfly/v2/client/config"
"d7y.io/dragonfly/v2/pkg/idgen"
"d7y.io/dragonfly/v2/pkg/rpc/scheduler/client/mocks"
schedulerclientmocks "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client/mocks"
)
var (
mockDaemonConfig = &config.DaemonOption{
Host: config.HostOption{
Location: mockHostLocation,
IDC: mockHostIDC,
Hostname: idgen.HostIDV2("127.0.0.1", "bar"),
AdvertiseIP: net.IPv4(127, 0, 0, 1),
},
NetworkTopology: config.NetworkTopologyOption{
Enable: true,
Probe: config.ProbeOption{
Interval: 2 * time.Second,
},
},
}
mockPort = 8000
mockDownloadPort = 8001
mockHostLocation = "location"
mockHostIDC = "idc"
mockHost = &v1.Host{
Id: "foo",
Ip: "127.0.0.1",
Hostname: idgen.HostIDV2("127.0.0.1", "foo"),
Port: int32(mockPort),
DownloadPort: int32(mockDownloadPort),
Location: mockHostLocation,
Idc: mockHostIDC,
}
mockSeedHost = &v1.Host{
Id: "bar",
Ip: "127.0.0.1",
Hostname: idgen.HostIDV2("127.0.0.1", "bar"),
Port: int32(mockPort),
DownloadPort: int32(mockDownloadPort),
Location: mockHostLocation,
Idc: mockHostIDC,
}
)
func Test_NewProbe(t *testing.T) {
tests := []struct {
name string
expect func(t *testing.T, probe Probe, err error)
}{
{
name: "new probe",
expect: func(t *testing.T, probe Probe, err error) {
assert := assert.New(t)
assert.NoError(err)
assert.Equal(reflect.TypeOf(probe).Elem().Name(), "probe")
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
schedulerClient := schedulerclientmocks.NewMockV1(ctl)
probe, err := NewProbe(mockDaemonConfig, mockSeedHost.Id, int32(mockPort), int32(mockDownloadPort), schedulerClient)
tc.expect(t, probe, err)
})
}
}
func TestProbe_Serve(t *testing.T) {
tests := []struct {
name string
interval time.Duration
sleep func()
mock func(mv *mocks.MockV1MockRecorder, stream *schedulerv1mocks.MockScheduler_SyncProbesClient,
ms *schedulerv1mocks.MockScheduler_SyncProbesClientMockRecorder)
expect func(t *testing.T, p Probe, err error)
}{
{
name: "collect probes and upload probes to scheduler",
interval: 200 * time.Millisecond,
sleep: func() {
time.Sleep(300 * time.Millisecond)
},
mock: func(mv *mocks.MockV1MockRecorder, stream *schedulerv1mocks.MockScheduler_SyncProbesClient,
ms *schedulerv1mocks.MockScheduler_SyncProbesClientMockRecorder) {
gomock.InOrder(
mv.SyncProbes(gomock.Eq(context.Background()), gomock.Eq(&schedulerv1.SyncProbesRequest{
Host: mockSeedHost,
Request: &schedulerv1.SyncProbesRequest_ProbeStartedRequest{
ProbeStartedRequest: &schedulerv1.ProbeStartedRequest{},
}})).Return(stream, nil).Times(1),
ms.Recv().Return(&schedulerv1.SyncProbesResponse{
Hosts: []*v1.Host{mockHost},
}, nil).Times(1),
ms.Send(gomock.Any()).Return(nil).Times(1),
)
},
expect: func(t *testing.T, p Probe, err error) {
assert := assert.New(t)
assert.NoError(err)
go p.Serve()
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
schedulerClient := schedulerclientmocks.NewMockV1(ctl)
stream := schedulerv1mocks.NewMockScheduler_SyncProbesClient(ctl)
tc.mock(schedulerClient.EXPECT(), stream, stream.EXPECT())
mockDaemonConfig.NetworkTopology.Probe.Interval = tc.interval
probe, err := NewProbe(mockDaemonConfig, mockSeedHost.Id, int32(mockPort), int32(mockDownloadPort), schedulerClient)
tc.expect(t, probe, err)
tc.sleep()
probe.Stop()
})
}
}
func TestProbe_uploadProbesToScheduler(t *testing.T) {
tests := []struct {
name string
interval time.Duration
sleep func()
mock func(mv *mocks.MockV1MockRecorder, stream *schedulerv1mocks.MockScheduler_SyncProbesClient,
ms *schedulerv1mocks.MockScheduler_SyncProbesClientMockRecorder)
expect func(t *testing.T, p Probe, err error)
}{
{
name: "collect probes and send ProbeFinishedRequest",
interval: 200 * time.Millisecond,
sleep: func() {
time.Sleep(300 * time.Millisecond)
},
mock: func(mv *mocks.MockV1MockRecorder, stream *schedulerv1mocks.MockScheduler_SyncProbesClient,
ms *schedulerv1mocks.MockScheduler_SyncProbesClientMockRecorder) {
gomock.InOrder(
mv.SyncProbes(gomock.Eq(context.Background()), gomock.Eq(&schedulerv1.SyncProbesRequest{
Host: mockSeedHost,
Request: &schedulerv1.SyncProbesRequest_ProbeStartedRequest{
ProbeStartedRequest: &schedulerv1.ProbeStartedRequest{},
}})).Return(stream, nil).Times(1),
ms.Recv().Return(&schedulerv1.SyncProbesResponse{
Hosts: []*v1.Host{mockHost},
}, nil).Times(1),
ms.Send(gomock.Any()).Return(nil).Times(1),
)
},
expect: func(t *testing.T, p Probe, err error) {
assert := assert.New(t)
assert.NoError(err)
go p.(*probe).uploadProbesToScheduler()
},
},
{
name: "collect fail probes and send ProbeFailedRequest",
interval: 3 * time.Second,
sleep: func() {
time.Sleep(5 * time.Second)
},
mock: func(mv *mocks.MockV1MockRecorder, stream *schedulerv1mocks.MockScheduler_SyncProbesClient,
ms *schedulerv1mocks.MockScheduler_SyncProbesClientMockRecorder) {
gomock.InOrder(
mv.SyncProbes(gomock.Eq(context.Background()), gomock.Eq(&schedulerv1.SyncProbesRequest{
Host: mockSeedHost,
Request: &schedulerv1.SyncProbesRequest_ProbeStartedRequest{
ProbeStartedRequest: &schedulerv1.ProbeStartedRequest{},
}})).Return(stream, nil).Times(1),
ms.Recv().Return(&schedulerv1.SyncProbesResponse{
Hosts: []*v1.Host{
{
Id: idgen.HostIDV2("172.0.0.1", "foo"),
Ip: "172.0.0.1",
Hostname: "foo",
Port: 8003,
DownloadPort: 8001,
Location: "location",
Idc: "idc",
},
},
}, nil).Times(1),
ms.Send(&schedulerv1.SyncProbesRequest{
Host: mockSeedHost,
Request: &schedulerv1.SyncProbesRequest_ProbeFailedRequest{
ProbeFailedRequest: &schedulerv1.ProbeFailedRequest{
Probes: []*schedulerv1.FailedProbe{
{
Host: &v1.Host{
Id: idgen.HostIDV2("172.0.0.1", "foo"),
Ip: "172.0.0.1",
Hostname: "foo",
Port: 8003,
DownloadPort: 8001,
Location: "location",
Idc: "idc",
},
Description: "receive packet failed",
},
},
},
},
}).Return(nil).Times(1),
)
},
expect: func(t *testing.T, p Probe, err error) {
assert := assert.New(t)
assert.NoError(err)
go p.(*probe).uploadProbesToScheduler()
},
},
{
name: "syncProbe error",
interval: 200 * time.Millisecond,
sleep: func() {
time.Sleep(300 * time.Millisecond)
},
mock: func(mv *mocks.MockV1MockRecorder, stream *schedulerv1mocks.MockScheduler_SyncProbesClient,
ms *schedulerv1mocks.MockScheduler_SyncProbesClientMockRecorder) {
mv.SyncProbes(gomock.Eq(context.Background()), gomock.Eq(&schedulerv1.SyncProbesRequest{
Host: mockSeedHost,
Request: &schedulerv1.SyncProbesRequest_ProbeStartedRequest{
ProbeStartedRequest: &schedulerv1.ProbeStartedRequest{},
}})).Return(nil, errors.New("syncProbe error")).Times(1)
},
expect: func(t *testing.T, p Probe, err error) {
assert := assert.New(t)
assert.NoError(err)
go p.(*probe).uploadProbesToScheduler()
},
},
{
name: "receive error",
interval: 200 * time.Millisecond,
sleep: func() {
time.Sleep(300 * time.Millisecond)
},
mock: func(mv *mocks.MockV1MockRecorder, stream *schedulerv1mocks.MockScheduler_SyncProbesClient,
ms *schedulerv1mocks.MockScheduler_SyncProbesClientMockRecorder) {
gomock.InOrder(
mv.SyncProbes(gomock.Eq(context.Background()), gomock.Eq(&schedulerv1.SyncProbesRequest{
Host: mockSeedHost,
Request: &schedulerv1.SyncProbesRequest_ProbeStartedRequest{
ProbeStartedRequest: &schedulerv1.ProbeStartedRequest{},
}})).Return(stream, nil).Times(1),
ms.Recv().Return(nil, errors.New("receive error")).Times(1),
)
},
expect: func(t *testing.T, p Probe, err error) {
assert := assert.New(t)
assert.NoError(err)
go p.(*probe).uploadProbesToScheduler()
},
},
{
name: "receive EOF",
interval: 200 * time.Millisecond,
sleep: func() {
time.Sleep(300 * time.Millisecond)
},
mock: func(mv *mocks.MockV1MockRecorder, stream *schedulerv1mocks.MockScheduler_SyncProbesClient,
ms *schedulerv1mocks.MockScheduler_SyncProbesClientMockRecorder) {
gomock.InOrder(
mv.SyncProbes(gomock.Eq(context.Background()), gomock.Eq(&schedulerv1.SyncProbesRequest{
Host: mockSeedHost,
Request: &schedulerv1.SyncProbesRequest_ProbeStartedRequest{
ProbeStartedRequest: &schedulerv1.ProbeStartedRequest{},
}})).Return(stream, nil).Times(1),
ms.Recv().Return(nil, io.EOF).Times(1),
)
},
expect: func(t *testing.T, p Probe, err error) {
assert := assert.New(t)
assert.NoError(err)
go p.(*probe).uploadProbesToScheduler()
},
},
{
name: "send ProbeFinishedRequest error",
interval: 200 * time.Millisecond,
sleep: func() {
time.Sleep(300 * time.Millisecond)
},
mock: func(mv *mocks.MockV1MockRecorder, stream *schedulerv1mocks.MockScheduler_SyncProbesClient,
ms *schedulerv1mocks.MockScheduler_SyncProbesClientMockRecorder) {
gomock.InOrder(
mv.SyncProbes(gomock.Eq(context.Background()), gomock.Eq(&schedulerv1.SyncProbesRequest{
Host: mockSeedHost,
Request: &schedulerv1.SyncProbesRequest_ProbeStartedRequest{
ProbeStartedRequest: &schedulerv1.ProbeStartedRequest{},
}})).Return(stream, nil).Times(1),
ms.Recv().Return(&schedulerv1.SyncProbesResponse{
Hosts: []*v1.Host{mockHost},
}, nil).Times(1),
ms.Send(gomock.Any()).Return(errors.New("send ProbeFinishedRequest error")).Times(1),
)
},
expect: func(t *testing.T, p Probe, err error) {
assert := assert.New(t)
assert.NoError(err)
go p.(*probe).uploadProbesToScheduler()
},
},
{
name: "send ProbeFailedRequest error",
interval: 3 * time.Second,
sleep: func() {
time.Sleep(5 * time.Second)
},
mock: func(mv *mocks.MockV1MockRecorder, stream *schedulerv1mocks.MockScheduler_SyncProbesClient,
ms *schedulerv1mocks.MockScheduler_SyncProbesClientMockRecorder) {
gomock.InOrder(
mv.SyncProbes(gomock.Eq(context.Background()), gomock.Eq(&schedulerv1.SyncProbesRequest{
Host: mockSeedHost,
Request: &schedulerv1.SyncProbesRequest_ProbeStartedRequest{
ProbeStartedRequest: &schedulerv1.ProbeStartedRequest{},
}})).Return(stream, nil).Times(1),
ms.Recv().Return(&schedulerv1.SyncProbesResponse{
Hosts: []*v1.Host{
{
Id: idgen.HostIDV2("172.0.0.1", "foo"),
Ip: "172.0.0.1",
Hostname: "foo",
Port: 8003,
DownloadPort: 8001,
Location: "location",
Idc: "idc",
},
},
}, nil).Times(1),
ms.Send(&schedulerv1.SyncProbesRequest{
Host: mockSeedHost,
Request: &schedulerv1.SyncProbesRequest_ProbeFailedRequest{
ProbeFailedRequest: &schedulerv1.ProbeFailedRequest{
Probes: []*schedulerv1.FailedProbe{
{
Host: &v1.Host{
Id: idgen.HostIDV2("172.0.0.1", "foo"),
Ip: "172.0.0.1",
Hostname: "foo",
Port: 8003,
DownloadPort: 8001,
Location: "location",
Idc: "idc",
},
Description: "receive packet failed",
},
},
},
},
}).Return(errors.New("send ProbeFailedRequest error")).Times(1),
)
},
expect: func(t *testing.T, p Probe, err error) {
assert := assert.New(t)
assert.NoError(err)
go p.(*probe).uploadProbesToScheduler()
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
schedulerClient := schedulerclientmocks.NewMockV1(ctl)
stream := schedulerv1mocks.NewMockScheduler_SyncProbesClient(ctl)
tc.mock(schedulerClient.EXPECT(), stream, stream.EXPECT())
mockDaemonConfig.NetworkTopology.Probe.Interval = tc.interval
probe, err := NewProbe(mockDaemonConfig, mockSeedHost.Id, int32(mockPort), int32(mockDownloadPort), schedulerClient)
tc.expect(t, probe, err)
tc.sleep()
probe.Stop()
})
}
}
func TestProbe_collectProbes(t *testing.T) {
tests := []struct {
name string
destHosts []*v1.Host
expect func(t *testing.T, p Probe, err error, destHosts []*v1.Host)
}{
{
name: "collect probes",
destHosts: []*v1.Host{mockHost},
expect: func(t *testing.T, p Probe, err error, destHosts []*v1.Host) {
assert := assert.New(t)
assert.NoError(err)
probes, failProbes := p.(*probe).collectProbes(destHosts)
assert.Equal(len(probes), 1)
assert.Equal(len(failProbes), 0)
},
},
{
name: "collect fail probes",
destHosts: []*v1.Host{
{
Id: idgen.HostIDV2("172.0.0.1", "foo"),
Ip: "172.0.0.1",
Hostname: "foo",
Port: 8003,
DownloadPort: 8001,
Location: "location",
Idc: "idc",
},
},
expect: func(t *testing.T, p Probe, err error, destHosts []*v1.Host) {
assert := assert.New(t)
assert.NoError(err)
probes, failProbes := p.(*probe).collectProbes(destHosts)
assert.Equal(len(probes), 0)
assert.Equal(len(failProbes), 1)
},
},
{
name: "collect probes and fail probes",
destHosts: []*v1.Host{
mockHost,
{
Id: idgen.HostIDV2("172.0.0.1", "foo"),
Ip: "172.0.0.1",
Hostname: "foo",
Port: 8003,
DownloadPort: 8001,
Location: "location",
Idc: "idc",
},
},
expect: func(t *testing.T, p Probe, err error, destHosts []*v1.Host) {
assert := assert.New(t)
assert.NoError(err)
probes, failProbes := p.(*probe).collectProbes(destHosts)
assert.Equal(len(probes), 1)
assert.Equal(len(failProbes), 1)
},
},
{
name: "dest hosts is empty",
destHosts: []*v1.Host{},
expect: func(t *testing.T, p Probe, err error, destHosts []*v1.Host) {
assert := assert.New(t)
assert.NoError(err)
probes, failProbes := p.(*probe).collectProbes(destHosts)
assert.Equal(len(probes), 0)
assert.Equal(len(failProbes), 0)
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
schedulerClient := schedulerclientmocks.NewMockV1(ctl)
probe, err := NewProbe(mockDaemonConfig, mockSeedHost.Id, int32(mockPort), int32(mockDownloadPort), schedulerClient)
tc.expect(t, probe, err, tc.destHosts)
})
}
}