diff --git a/pkg/redis/redis.go b/pkg/redis/redis.go index 4de10814c..9ea957a9e 100644 --- a/pkg/redis/redis.go +++ b/pkg/redis/redis.go @@ -19,12 +19,18 @@ package redis import ( "context" "fmt" + "strings" "github.com/go-redis/redis/v8" "d7y.io/dragonfly/v2/pkg/types" ) +const ( + // KeySeparator is the separator of redis key. + KeySeparator = ":" +) + const ( // SeedPeerNamespace prefix of seed peers namespace cache key. SeedPeersNamespace = "seed-peers" @@ -132,6 +138,16 @@ func MakeNetworkTopologyKeyInScheduler(srcHostID, destHostID string) string { return MakeKeyInScheduler(NetworkTopologyNamespace, fmt.Sprintf("%s:%s", srcHostID, destHostID)) } +// ParseNetworkTopologyKeyInScheduler parse network topology key in scheduler. +func ParseNetworkTopologyKeyInScheduler(key string) (string, string, string, string, error) { + elements := strings.Split(key, KeySeparator) + if len(elements) != 4 { + return "", "", "", "", fmt.Errorf("invalid network topology key: %s", key) + } + + return elements[0], elements[1], elements[2], elements[3], nil +} + // MakeProbesKeyInScheduler make probes key in scheduler. func MakeProbesKeyInScheduler(srcHostID, destHostID string) string { return MakeKeyInScheduler(ProbesNamespace, fmt.Sprintf("%s:%s", srcHostID, destHostID)) diff --git a/scheduler/networktopology/mocks/network_topology_mock.go b/scheduler/networktopology/mocks/network_topology_mock.go index d0b45ae89..c9013c7cd 100644 --- a/scheduler/networktopology/mocks/network_topology_mock.go +++ b/scheduler/networktopology/mocks/network_topology_mock.go @@ -107,6 +107,48 @@ func (mr *MockNetworkTopologyMockRecorder) Probes(arg0, arg1 interface{}) *gomoc return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Probes", reflect.TypeOf((*MockNetworkTopology)(nil).Probes), arg0, arg1) } +// Serve mocks base method. +func (m *MockNetworkTopology) Serve() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Serve") + ret0, _ := ret[0].(error) + return ret0 +} + +// Serve indicates an expected call of Serve. +func (mr *MockNetworkTopologyMockRecorder) Serve() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Serve", reflect.TypeOf((*MockNetworkTopology)(nil).Serve)) +} + +// Snapshot mocks base method. +func (m *MockNetworkTopology) Snapshot() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Snapshot") + ret0, _ := ret[0].(error) + return ret0 +} + +// Snapshot indicates an expected call of Snapshot. +func (mr *MockNetworkTopologyMockRecorder) Snapshot() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Snapshot", reflect.TypeOf((*MockNetworkTopology)(nil).Snapshot)) +} + +// Stop mocks base method. +func (m *MockNetworkTopology) Stop() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Stop") + ret0, _ := ret[0].(error) + return ret0 +} + +// Stop indicates an expected call of Stop. +func (mr *MockNetworkTopologyMockRecorder) Stop() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockNetworkTopology)(nil).Stop)) +} + // Store mocks base method. func (m *MockNetworkTopology) Store(arg0, arg1 string) error { m.ctrl.T.Helper() diff --git a/scheduler/networktopology/network_topology.go b/scheduler/networktopology/network_topology.go index ff02894c9..967818c79 100644 --- a/scheduler/networktopology/network_topology.go +++ b/scheduler/networktopology/network_topology.go @@ -23,6 +23,7 @@ import ( "time" "github.com/go-redis/redis/v8" + "github.com/google/uuid" logger "d7y.io/dragonfly/v2/internal/dflog" pkgredis "d7y.io/dragonfly/v2/pkg/redis" @@ -34,10 +35,19 @@ import ( const ( // contextTimeout is the timeout of redis invoke. contextTimeout = 2 * time.Minute + + // snapshotContextTimeout is the timeout of snapshot network topology. + snapshotContextTimeout = 20 * time.Minute ) // NetworkTopology is an interface for network topology. type NetworkTopology interface { + // Started network topology server. + Serve() error + + // Stop network topology server. + Stop() error + // Has to check if there is a connection between source host and destination host. Has(string, string) bool @@ -55,6 +65,9 @@ type NetworkTopology interface { // ProbedAt is the time when the host was last probed. ProbedAt(string) (time.Time, error) + + // Snapshot writes the current network topology to the storage. + Snapshot() error } // networkTopology is an implementation of network topology. @@ -70,6 +83,9 @@ type networkTopology struct { // storage is storage interface. storage storage.Storage + + // done channel will be closed when network topology serve stop. + done chan struct{} } // New network topology interface. @@ -79,9 +95,33 @@ func NewNetworkTopology(cfg config.NetworkTopologyConfig, rdb redis.UniversalCli rdb: rdb, resource: resource, storage: storage, + done: make(chan struct{}), }, nil } +// Started network topology server. +func (nt *networkTopology) Serve() error { + logger.Info("collect network topology records") + tick := time.NewTicker(nt.config.CollectInterval) + for { + select { + case <-tick.C: + if err := nt.Snapshot(); err != nil { + logger.Error(err) + break + } + case <-nt.done: + return nil + } + } +} + +// Stop network topology server. +func (nt *networkTopology) Stop() error { + close(nt.done) + return nil +} + // Has to check if there is a connection between source host and destination host. func (nt *networkTopology) Has(srcHostID string, destHostID string) bool { ctx, cancel := context.WithTimeout(context.Background(), contextTimeout) @@ -154,6 +194,11 @@ func (nt *networkTopology) DeleteHost(hostID string) error { return nil } +// Probes loads probes interface by source host id and destination host id. +func (nt *networkTopology) Probes(srcHostID, destHostID string) Probes { + return NewProbes(nt.config.Probe, nt.rdb, srcHostID, destHostID) +} + // ProbedCount is the number of times the host has been probed. func (nt *networkTopology) ProbedCount(hostID string) (uint64, error) { ctx, cancel := context.WithTimeout(context.Background(), contextTimeout) @@ -170,7 +215,219 @@ func (nt *networkTopology) ProbedAt(hostID string) (time.Time, error) { return nt.rdb.Get(ctx, pkgredis.MakeProbedAtKeyInScheduler(hostID)).Time() } -// Probes is the interface to store probes. -func (nt *networkTopology) Probes(srcHostID, destHostID string) Probes { - return NewProbes(nt.config.Probe, nt.rdb, srcHostID, destHostID) +// Snapshot writes the current network topology to the storage. +func (nt *networkTopology) Snapshot() error { + ctx, cancel := context.WithTimeout(context.Background(), snapshotContextTimeout) + defer cancel() + + now := time.Now() + probedCountKeys, err := nt.rdb.Keys(ctx, pkgredis.MakeProbedCountKeyInScheduler("*")).Result() + if err != nil { + return err + } + + for _, srcHostID := range probedCountKeys { + // Construct destination hosts for network topology. + networkTopologyKeys, err := nt.rdb.Keys(ctx, pkgredis.MakeNetworkTopologyKeyInScheduler(srcHostID, "*")).Result() + if err != nil { + logger.Error(err) + continue + } + + destHosts := make([]storage.DestHost, 0, len(networkTopologyKeys)) + for _, networkTopologyKey := range networkTopologyKeys { + _, _, srcHostID, destHostID, err := pkgredis.ParseNetworkTopologyKeyInScheduler(networkTopologyKey) + if err != nil { + logger.Error(err) + continue + } + + host, loaded := nt.resource.HostManager().Load(destHostID) + if !loaded { + logger.Errorf("host %s not found", destHostID) + continue + } + + ps := nt.Probes(srcHostID, destHostID) + averageRTT, err := ps.AverageRTT() + if err != nil { + logger.Error(err) + continue + } + + createdAt, err := ps.CreatedAt() + if err != nil { + logger.Error(err) + continue + } + + updatedAt, err := ps.UpdatedAt() + if err != nil { + logger.Error(err) + continue + } + + destHost := storage.DestHost{ + Host: storage.Host{ + ID: host.ID, + Type: host.Type.Name(), + Hostname: host.Hostname, + IP: host.IP, + Port: host.Port, + DownloadPort: host.DownloadPort, + OS: host.OS, + Platform: host.Platform, + PlatformFamily: host.PlatformFamily, + PlatformVersion: host.PlatformVersion, + KernelVersion: host.KernelVersion, + ConcurrentUploadLimit: host.ConcurrentUploadLimit.Load(), + ConcurrentUploadCount: host.ConcurrentUploadCount.Load(), + UploadCount: host.UploadCount.Load(), + UploadFailedCount: host.UploadFailedCount.Load(), + CPU: resource.CPU{ + LogicalCount: host.CPU.LogicalCount, + PhysicalCount: host.CPU.PhysicalCount, + Percent: host.CPU.Percent, + ProcessPercent: host.CPU.ProcessPercent, + Times: resource.CPUTimes{ + User: host.CPU.Times.User, + System: host.CPU.Times.System, + Idle: host.CPU.Times.Idle, + Nice: host.CPU.Times.Nice, + Iowait: host.CPU.Times.Iowait, + Irq: host.CPU.Times.Irq, + Softirq: host.CPU.Times.Softirq, + Steal: host.CPU.Times.Steal, + Guest: host.CPU.Times.Guest, + GuestNice: host.CPU.Times.GuestNice, + }, + }, + Memory: resource.Memory{ + Total: host.Memory.Total, + Available: host.Memory.Available, + Used: host.Memory.Used, + UsedPercent: host.Memory.UsedPercent, + ProcessUsedPercent: host.Memory.ProcessUsedPercent, + Free: host.Memory.Free, + }, + Network: resource.Network{ + TCPConnectionCount: host.Network.TCPConnectionCount, + UploadTCPConnectionCount: host.Network.UploadTCPConnectionCount, + Location: host.Network.Location, + IDC: host.Network.IDC, + }, + Disk: resource.Disk{ + Total: host.Disk.Total, + Free: host.Disk.Free, + Used: host.Disk.Used, + UsedPercent: host.Disk.UsedPercent, + InodesTotal: host.Disk.InodesTotal, + InodesUsed: host.Disk.InodesUsed, + InodesFree: host.Disk.InodesFree, + InodesUsedPercent: host.Disk.InodesUsedPercent, + }, + Build: resource.Build{ + GitVersion: host.Build.GitVersion, + GitCommit: host.Build.GitCommit, + GoVersion: host.Build.GoVersion, + Platform: host.Build.Platform, + }, + CreatedAt: host.CreatedAt.Load().UnixNano(), + UpdatedAt: host.UpdatedAt.Load().UnixNano(), + }, + Probes: storage.Probes{ + AverageRTT: averageRTT.Nanoseconds(), + CreatedAt: createdAt.UnixNano(), + UpdatedAt: updatedAt.UnixNano(), + }, + } + + destHosts = append(destHosts, destHost) + } + + // Construct source hosts for network topology. + host, loaded := nt.resource.HostManager().Load(srcHostID) + if !loaded { + logger.Errorf("host %s not found", srcHostID) + continue + } + + if err = nt.storage.CreateNetworkTopology(storage.NetworkTopology{ + ID: uuid.NewString(), + Host: storage.Host{ + ID: host.ID, + Type: host.Type.Name(), + Hostname: host.Hostname, + IP: host.IP, + Port: host.Port, + DownloadPort: host.DownloadPort, + OS: host.OS, + Platform: host.Platform, + PlatformFamily: host.PlatformFamily, + PlatformVersion: host.PlatformVersion, + KernelVersion: host.KernelVersion, + ConcurrentUploadLimit: host.ConcurrentUploadLimit.Load(), + ConcurrentUploadCount: host.ConcurrentUploadCount.Load(), + UploadCount: host.UploadCount.Load(), + UploadFailedCount: host.UploadFailedCount.Load(), + CPU: resource.CPU{ + LogicalCount: host.CPU.LogicalCount, + PhysicalCount: host.CPU.PhysicalCount, + Percent: host.CPU.Percent, + ProcessPercent: host.CPU.ProcessPercent, + Times: resource.CPUTimes{ + User: host.CPU.Times.User, + System: host.CPU.Times.System, + Idle: host.CPU.Times.Idle, + Nice: host.CPU.Times.Nice, + Iowait: host.CPU.Times.Iowait, + Irq: host.CPU.Times.Irq, + Softirq: host.CPU.Times.Softirq, + Steal: host.CPU.Times.Steal, + Guest: host.CPU.Times.Guest, + GuestNice: host.CPU.Times.GuestNice, + }, + }, + Memory: resource.Memory{ + Total: host.Memory.Total, + Available: host.Memory.Available, + Used: host.Memory.Used, + UsedPercent: host.Memory.UsedPercent, + ProcessUsedPercent: host.Memory.ProcessUsedPercent, + Free: host.Memory.Free, + }, + Network: resource.Network{ + TCPConnectionCount: host.Network.TCPConnectionCount, + UploadTCPConnectionCount: host.Network.UploadTCPConnectionCount, + Location: host.Network.Location, + IDC: host.Network.IDC, + }, + Disk: resource.Disk{ + Total: host.Disk.Total, + Free: host.Disk.Free, + Used: host.Disk.Used, + UsedPercent: host.Disk.UsedPercent, + InodesTotal: host.Disk.InodesTotal, + InodesUsed: host.Disk.InodesUsed, + InodesFree: host.Disk.InodesFree, + InodesUsedPercent: host.Disk.InodesUsedPercent, + }, + Build: resource.Build{ + GitVersion: host.Build.GitVersion, + GitCommit: host.Build.GitCommit, + GoVersion: host.Build.GoVersion, + Platform: host.Build.Platform, + }, + CreatedAt: host.CreatedAt.Load().UnixNano(), + UpdatedAt: host.UpdatedAt.Load().UnixNano(), + }, + DestHosts: destHosts, + CreatedAt: now.UnixNano(), + }); err != nil { + logger.Error(err) + continue + } + } + + return nil } diff --git a/scheduler/storage/storage_test.go b/scheduler/storage/storage_test.go index 1c0e888fe..6e7f494fa 100644 --- a/scheduler/storage/storage_test.go +++ b/scheduler/storage/storage_test.go @@ -219,6 +219,7 @@ var ( ID: "6", Host: mockSeedHost, DestHosts: mockDestHosts, + CreatedAt: time.Now().UnixNano(), } mockDestHost = DestHost{ diff --git a/scheduler/storage/types.go b/scheduler/storage/types.go index f69bc48b6..baf44962c 100644 --- a/scheduler/storage/types.go +++ b/scheduler/storage/types.go @@ -231,4 +231,7 @@ type NetworkTopology struct { // DestHosts is the destination hosts probed from source host. DestHosts []DestHost `csv:"destHosts" csv[]:"10"` + + // CreatedAt is network topology create nanosecond time. + CreatedAt int64 `csv:"createdAt"` }