feat: support to collect and snapshot in network topology (#2429)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2023-06-05 15:26:35 +08:00
parent 7fbe7553f9
commit 274c6d04e6
No known key found for this signature in database
GPG Key ID: 8B4E5D1290FA2FFB
5 changed files with 322 additions and 3 deletions

View File

@ -19,12 +19,18 @@ package redis
import (
"context"
"fmt"
"strings"
"github.com/go-redis/redis/v8"
"d7y.io/dragonfly/v2/pkg/types"
)
const (
	// KeySeparator is the separator placed between the elements of a redis key.
	// Keys are parsed back into their elements by splitting on it.
	KeySeparator = ":"
)
const (
// SeedPeersNamespace prefix of seed peers namespace cache key.
SeedPeersNamespace = "seed-peers"
@ -132,6 +138,16 @@ func MakeNetworkTopologyKeyInScheduler(srcHostID, destHostID string) string {
return MakeKeyInScheduler(NetworkTopologyNamespace, fmt.Sprintf("%s:%s", srcHostID, destHostID))
}
// ParseNetworkTopologyKeyInScheduler splits a network topology key on
// KeySeparator and returns its four elements in their original order. The
// last two elements are the source host id and the destination host id.
// It returns an error when the key does not have exactly four elements.
func ParseNetworkTopologyKeyInScheduler(key string) (string, string, string, string, error) {
	// A well-formed key has exactly this many separator-delimited elements.
	const wantElements = 4

	parts := strings.Split(key, KeySeparator)
	if got := len(parts); got != wantElements {
		return "", "", "", "", fmt.Errorf("invalid network topology key: %s", key)
	}

	return parts[0], parts[1], parts[2], parts[3], nil
}
// MakeProbesKeyInScheduler make probes key in scheduler.
func MakeProbesKeyInScheduler(srcHostID, destHostID string) string {
return MakeKeyInScheduler(ProbesNamespace, fmt.Sprintf("%s:%s", srcHostID, destHostID))

View File

@ -107,6 +107,48 @@ func (mr *MockNetworkTopologyMockRecorder) Probes(arg0, arg1 interface{}) *gomoc
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Probes", reflect.TypeOf((*MockNetworkTopology)(nil).Probes), arg0, arg1)
}
// Serve mocks base method.
// It forwards the call to the gomock controller and returns the first
// recorded result as an error, or nil when that result is not an error.
func (m *MockNetworkTopology) Serve() error {
	m.ctrl.T.Helper()
	results := m.ctrl.Call(m, "Serve")
	err, _ := results[0].(error)
	return err
}
// Serve indicates an expected call of Serve.
// It records the expectation on the controller together with the method type
// of the mocked Serve.
func (mr *MockNetworkTopologyMockRecorder) Serve() *gomock.Call {
	mr.mock.ctrl.T.Helper()
	methodType := reflect.TypeOf((*MockNetworkTopology)(nil).Serve)
	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Serve", methodType)
}
// Snapshot mocks base method.
// It forwards the call to the gomock controller and returns the first
// recorded result as an error, or nil when that result is not an error.
func (m *MockNetworkTopology) Snapshot() error {
	m.ctrl.T.Helper()
	results := m.ctrl.Call(m, "Snapshot")
	err, _ := results[0].(error)
	return err
}
// Snapshot indicates an expected call of Snapshot.
// It records the expectation on the controller together with the method type
// of the mocked Snapshot.
func (mr *MockNetworkTopologyMockRecorder) Snapshot() *gomock.Call {
	mr.mock.ctrl.T.Helper()
	methodType := reflect.TypeOf((*MockNetworkTopology)(nil).Snapshot)
	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Snapshot", methodType)
}
// Stop mocks base method.
// It forwards the call to the gomock controller and returns the first
// recorded result as an error, or nil when that result is not an error.
func (m *MockNetworkTopology) Stop() error {
	m.ctrl.T.Helper()
	results := m.ctrl.Call(m, "Stop")
	err, _ := results[0].(error)
	return err
}
// Stop indicates an expected call of Stop.
// It records the expectation on the controller together with the method type
// of the mocked Stop.
func (mr *MockNetworkTopologyMockRecorder) Stop() *gomock.Call {
	mr.mock.ctrl.T.Helper()
	methodType := reflect.TypeOf((*MockNetworkTopology)(nil).Stop)
	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", methodType)
}
// Store mocks base method.
func (m *MockNetworkTopology) Store(arg0, arg1 string) error {
m.ctrl.T.Helper()

View File

@ -23,6 +23,7 @@ import (
"time"
"github.com/go-redis/redis/v8"
"github.com/google/uuid"
logger "d7y.io/dragonfly/v2/internal/dflog"
pkgredis "d7y.io/dragonfly/v2/pkg/redis"
@ -34,10 +35,19 @@ import (
const (
	// contextTimeout is the timeout of redis invoke.
	// It bounds the context created for a single redis-backed lookup.
	contextTimeout = 2 * time.Minute

	// snapshotContextTimeout is the timeout of snapshot network topology.
	// One context with this deadline is shared by the redis key listings
	// issued during a single Snapshot call.
	snapshotContextTimeout = 20 * time.Minute
)
// NetworkTopology is an interface for network topology.
type NetworkTopology interface {
// Serve starts network topology server.
Serve() error
// Stop network topology server.
Stop() error
// Has to check if there is a connection between source host and destination host.
Has(string, string) bool
@ -55,6 +65,9 @@ type NetworkTopology interface {
// ProbedAt is the time when the host was last probed.
ProbedAt(string) (time.Time, error)
// Snapshot writes the current network topology to the storage.
Snapshot() error
}
// networkTopology is an implementation of network topology.
@ -70,6 +83,9 @@ type networkTopology struct {
// storage is storage interface.
storage storage.Storage
// done channel will be closed when network topology serve stop.
done chan struct{}
}
// New network topology interface.
@ -79,9 +95,33 @@ func NewNetworkTopology(cfg config.NetworkTopologyConfig, rdb redis.UniversalCli
rdb: rdb,
resource: resource,
storage: storage,
done: make(chan struct{}),
}, nil
}
// Serve starts the network topology server.
//
// It blocks, taking a snapshot of the network topology on every tick of
// config.CollectInterval, until the done channel is closed by Stop, and then
// returns nil. A failed snapshot is logged and the loop keeps running, so the
// next tick retries.
func (nt *networkTopology) Serve() error {
	logger.Info("collect network topology records")

	tick := time.NewTicker(nt.config.CollectInterval)
	// Release the ticker once the loop exits; an unstopped ticker keeps firing.
	defer tick.Stop()

	for {
		select {
		case <-tick.C:
			// A snapshot error is not fatal for the server: log and wait for
			// the next tick.
			if err := nt.Snapshot(); err != nil {
				logger.Error(err)
			}
		case <-nt.done:
			return nil
		}
	}
}
// Stop stops the network topology server.
//
// It closes the done channel, which makes a running Serve loop return, and
// always returns nil. Closing an already closed channel panics, so Stop must
// be called at most once per networkTopology.
func (nt *networkTopology) Stop() error {
	close(nt.done)
	return nil
}
// Has to check if there is a connection between source host and destination host.
func (nt *networkTopology) Has(srcHostID string, destHostID string) bool {
ctx, cancel := context.WithTimeout(context.Background(), contextTimeout)
@ -154,6 +194,11 @@ func (nt *networkTopology) DeleteHost(hostID string) error {
return nil
}
// Probes returns the probes interface for the given source host id and
// destination host id, built from the probe configuration and the redis
// client held by the network topology.
func (nt *networkTopology) Probes(srcHostID, destHostID string) Probes {
	probeConfig := nt.config.Probe
	return NewProbes(probeConfig, nt.rdb, srcHostID, destHostID)
}
// ProbedCount is the number of times the host has been probed.
func (nt *networkTopology) ProbedCount(hostID string) (uint64, error) {
ctx, cancel := context.WithTimeout(context.Background(), contextTimeout)
@ -170,7 +215,219 @@ func (nt *networkTopology) ProbedAt(hostID string) (time.Time, error) {
return nt.rdb.Get(ctx, pkgredis.MakeProbedAtKeyInScheduler(hostID)).Time()
}
// Probes is the interface to store probes.
func (nt *networkTopology) Probes(srcHostID, destHostID string) Probes {
return NewProbes(nt.config.Probe, nt.rdb, srcHostID, destHostID)
// Snapshot writes the current network topology to the storage.
//
// It lists every probed-count key in redis to discover the source hosts and,
// for each source host, lists its network topology keys to discover the
// destination hosts it has probed. One storage.NetworkTopology record is
// created per source host; every record of the same snapshot carries the same
// CreatedAt, taken when the snapshot started.
//
// Only a failure to list the probed-count keys is returned. A failure that
// concerns a single source or destination host is logged and that host is
// skipped, so one bad entry does not abort the whole snapshot.
func (nt *networkTopology) Snapshot() error {
	ctx, cancel := context.WithTimeout(context.Background(), snapshotContextTimeout)
	defer cancel()

	now := time.Now()
	probedCountKeys, err := nt.rdb.Keys(ctx, pkgredis.MakeProbedCountKeyInScheduler("*")).Result()
	if err != nil {
		return err
	}

	// KEYS returns full redis keys, not host ids, so the host id has to be
	// extracted before it can be used to build other keys or to look up hosts.
	// The key built for an empty host id serves as the prefix to strip.
	// NOTE(review): assumes MakeProbedCountKeyInScheduler places the host id at
	// the end of the key, as the "*" pattern above suggests — confirm.
	probedCountKeyPrefix := pkgredis.MakeProbedCountKeyInScheduler("")

	for _, probedCountKey := range probedCountKeys {
		if len(probedCountKey) <= len(probedCountKeyPrefix) ||
			probedCountKey[:len(probedCountKeyPrefix)] != probedCountKeyPrefix {
			logger.Errorf("invalid probed count key: %s", probedCountKey)
			continue
		}
		srcHostID := probedCountKey[len(probedCountKeyPrefix):]

		// Resolve the source host first: without it no record can be written,
		// so there is no point in querying redis for its destinations.
		srcHost, loaded := nt.resource.HostManager().Load(srcHostID)
		if !loaded {
			logger.Errorf("host %s not found", srcHostID)
			continue
		}

		// Construct destination hosts for network topology.
		networkTopologyKeys, err := nt.rdb.Keys(ctx, pkgredis.MakeNetworkTopologyKeyInScheduler(srcHostID, "*")).Result()
		if err != nil {
			logger.Error(err)
			continue
		}

		destHosts := make([]storage.DestHost, 0, len(networkTopologyKeys))
		for _, networkTopologyKey := range networkTopologyKeys {
			// The source host id in the key is the one the pattern was built
			// from, so only the destination host id is needed here.
			_, _, _, destHostID, err := pkgredis.ParseNetworkTopologyKeyInScheduler(networkTopologyKey)
			if err != nil {
				logger.Error(err)
				continue
			}

			destHost, loaded := nt.resource.HostManager().Load(destHostID)
			if !loaded {
				logger.Errorf("host %s not found", destHostID)
				continue
			}

			ps := nt.Probes(srcHostID, destHostID)
			averageRTT, err := ps.AverageRTT()
			if err != nil {
				logger.Error(err)
				continue
			}

			createdAt, err := ps.CreatedAt()
			if err != nil {
				logger.Error(err)
				continue
			}

			updatedAt, err := ps.UpdatedAt()
			if err != nil {
				logger.Error(err)
				continue
			}

			destHosts = append(destHosts, storage.DestHost{
				Host: snapshotHost(destHost),
				Probes: storage.Probes{
					AverageRTT: averageRTT.Nanoseconds(),
					CreatedAt:  createdAt.UnixNano(),
					UpdatedAt:  updatedAt.UnixNano(),
				},
			})
		}

		// Construct source hosts for network topology.
		if err := nt.storage.CreateNetworkTopology(storage.NetworkTopology{
			ID:        uuid.NewString(),
			Host:      snapshotHost(srcHost),
			DestHosts: destHosts,
			CreatedAt: now.UnixNano(),
		}); err != nil {
			logger.Error(err)
		}
	}

	return nil
}

// snapshotHost copies the fields of a scheduler host into the storage.Host
// record written by Snapshot. Counter and timestamp fields are read through
// their Load methods at the time of the call; timestamps are stored as
// nanoseconds.
func snapshotHost(host *resource.Host) storage.Host {
	return storage.Host{
		ID:                    host.ID,
		Type:                  host.Type.Name(),
		Hostname:              host.Hostname,
		IP:                    host.IP,
		Port:                  host.Port,
		DownloadPort:          host.DownloadPort,
		OS:                    host.OS,
		Platform:              host.Platform,
		PlatformFamily:        host.PlatformFamily,
		PlatformVersion:       host.PlatformVersion,
		KernelVersion:         host.KernelVersion,
		ConcurrentUploadLimit: host.ConcurrentUploadLimit.Load(),
		ConcurrentUploadCount: host.ConcurrentUploadCount.Load(),
		UploadCount:           host.UploadCount.Load(),
		UploadFailedCount:     host.UploadFailedCount.Load(),
		CPU: resource.CPU{
			LogicalCount:   host.CPU.LogicalCount,
			PhysicalCount:  host.CPU.PhysicalCount,
			Percent:        host.CPU.Percent,
			ProcessPercent: host.CPU.ProcessPercent,
			Times: resource.CPUTimes{
				User:      host.CPU.Times.User,
				System:    host.CPU.Times.System,
				Idle:      host.CPU.Times.Idle,
				Nice:      host.CPU.Times.Nice,
				Iowait:    host.CPU.Times.Iowait,
				Irq:       host.CPU.Times.Irq,
				Softirq:   host.CPU.Times.Softirq,
				Steal:     host.CPU.Times.Steal,
				Guest:     host.CPU.Times.Guest,
				GuestNice: host.CPU.Times.GuestNice,
			},
		},
		Memory: resource.Memory{
			Total:              host.Memory.Total,
			Available:          host.Memory.Available,
			Used:               host.Memory.Used,
			UsedPercent:        host.Memory.UsedPercent,
			ProcessUsedPercent: host.Memory.ProcessUsedPercent,
			Free:               host.Memory.Free,
		},
		Network: resource.Network{
			TCPConnectionCount:       host.Network.TCPConnectionCount,
			UploadTCPConnectionCount: host.Network.UploadTCPConnectionCount,
			Location:                 host.Network.Location,
			IDC:                      host.Network.IDC,
		},
		Disk: resource.Disk{
			Total:             host.Disk.Total,
			Free:              host.Disk.Free,
			Used:              host.Disk.Used,
			UsedPercent:       host.Disk.UsedPercent,
			InodesTotal:       host.Disk.InodesTotal,
			InodesUsed:        host.Disk.InodesUsed,
			InodesFree:        host.Disk.InodesFree,
			InodesUsedPercent: host.Disk.InodesUsedPercent,
		},
		Build: resource.Build{
			GitVersion: host.Build.GitVersion,
			GitCommit:  host.Build.GitCommit,
			GoVersion:  host.Build.GoVersion,
			Platform:   host.Build.Platform,
		},
		CreatedAt: host.CreatedAt.Load().UnixNano(),
		UpdatedAt: host.UpdatedAt.Load().UnixNano(),
	}
}

View File

@ -219,6 +219,7 @@ var (
ID: "6",
Host: mockSeedHost,
DestHosts: mockDestHosts,
CreatedAt: time.Now().UnixNano(),
}
mockDestHost = DestHost{

View File

@ -231,4 +231,7 @@ type NetworkTopology struct {
// DestHosts is the destination hosts probed from source host.
DestHosts []DestHost `csv:"destHosts" csv[]:"10"`
// CreatedAt is network topology create nanosecond time.
CreatedAt int64 `csv:"createdAt"`
}