diff --git a/scheduler/resource/host.go b/scheduler/resource/host.go index e7f084013..b11e4ab10 100644 --- a/scheduler/resource/host.go +++ b/scheduler/resource/host.go @@ -23,8 +23,6 @@ import ( "go.uber.org/atomic" - schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1" - logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/types" "d7y.io/dragonfly/v2/scheduler/config" @@ -41,6 +39,87 @@ func WithConcurrentUploadLimit(limit int32) HostOption { } } +// WithOS sets host's os. +func WithOS(os string) HostOption { + return func(h *Host) *Host { + h.OS = os + return h + } +} + +// WithPlatform sets host's platform. +func WithPlatform(platform string) HostOption { + return func(h *Host) *Host { + h.Platform = platform + return h + } +} + +// WithPlatformFamily sets host's platform family. +func WithPlatformFamily(platformFamily string) HostOption { + return func(h *Host) *Host { + h.PlatformFamily = platformFamily + return h + } +} + +// WithPlatformVersion sets host's platform version. +func WithPlatformVersion(platformVersion string) HostOption { + return func(h *Host) *Host { + h.PlatformVersion = platformVersion + return h + } +} + +// WithKernelVersion sets host's kernel version. +func WithKernelVersion(kernelVersion string) HostOption { + return func(h *Host) *Host { + h.KernelVersion = kernelVersion + return h + } +} + +// WithCPU sets host's cpu. +func WithCPU(cpu CPU) HostOption { + return func(h *Host) *Host { + h.CPU = cpu + return h + } +} + +// WithMemory sets host's memory. +func WithMemory(memory Memory) HostOption { + return func(h *Host) *Host { + h.Memory = memory + return h + } +} + +// WithNetwork sets host's network. +func WithNetwork(network Network) HostOption { + return func(h *Host) *Host { + h.Network = network + return h + } +} + +// WithDisk sets host's disk. +func WithDisk(disk Disk) HostOption { + return func(h *Host) *Host { + h.Disk = disk + return h + } +} + +// WithBuild sets host's build information. +func WithBuild(build Build) HostOption { + return func(h *Host) *Host { + h.Build = build + return h + } +} + +// Host contains content for host. type Host struct { // ID is host id. ID string @@ -76,19 +155,19 @@ type Host struct { KernelVersion string // CPU Stat. - CPU *schedulerv1.CPU + CPU CPU // Memory Stat. - Memory *schedulerv1.Memory + Memory Memory // Network Stat. - Network *schedulerv1.Network + Network Network // Dist Stat. - Disk *schedulerv1.Disk + Disk Disk // Build information. - Build *schedulerv1.Build + Build Build // ConcurrentUploadLimit is concurrent upload limit count. ConcurrentUploadLimit *atomic.Int32 @@ -118,26 +197,158 @@ type Host struct { Log *logger.SugaredLoggerOnWith } +// CPU contains content for cpu. +type CPU struct { + // Number of logical cores in the system. + LogicalCount uint32 `csv:"logicalCount"` + + // Number of physical cores in the system. + PhysicalCount uint32 `csv:"physicalCount"` + + // Percent calculates the percentage of cpu used. + Percent float64 `csv:"percent"` + + // Calculates the percentage of cpu used by process. + ProcessPercent float64 `csv:"processPercent"` + + // Times contains the amounts of time the CPU has spent performing different kinds of work. + Times CPUTimes `csv:"times"` +} + +// CPUTimes contains content for cpu times. +type CPUTimes struct { + // CPU time of user. + User float64 `csv:"user"` + + // CPU time of system. + System float64 `csv:"system"` + + // CPU time of idle. + Idle float64 `csv:"idle"` + + // CPU time of nice. + Nice float64 `csv:"nice"` + + // CPU time of iowait. + Iowait float64 `csv:"iowait"` + + // CPU time of irq. + Irq float64 `csv:"irq"` + + // CPU time of softirq. + Softirq float64 `csv:"softirq"` + + // CPU time of steal. + Steal float64 `csv:"steal"` + + // CPU time of guest. + Guest float64 `csv:"guest"` + + // CPU time of guest nice. + GuestNice float64 `csv:"guestNice"` +} + +// Memory contains content for memory. +type Memory struct { + // Total amount of RAM on this system. + Total uint64 `csv:"total"` + + // RAM available for programs to allocate. + Available uint64 `csv:"available"` + + // RAM used by programs. + Used uint64 `csv:"used"` + + // Percentage of RAM used by programs. + UsedPercent float64 `csv:"usedPercent"` + + // Calculates the percentage of memory used by process. + ProcessUsedPercent float64 `csv:"processUsedPercent"` + + // This is the kernel's notion of free memory. + Free uint64 `csv:"free"` +} + +// Network contains content for network. +type Network struct { + // Return count of tcp connections opened and status is ESTABLISHED. + TCPConnectionCount uint32 `csv:"tcpConnectionCount"` + + // Return count of upload tcp connections opened and status is ESTABLISHED. + UploadTCPConnectionCount uint32 `csv:"uploadTCPConnectionCount"` + + // Security domain for network. + SecurityDomain string `csv:"securityDomain"` + + // Location path(area|country|province|city|...). + Location string `csv:"location"` + + // IDC where the peer host is located + IDC string `csv:"idc"` +} + +// Build contains content for build. +type Build struct { + // Git version. + GitVersion string `csv:"gitVersion"` + + // Git commit. + GitCommit string `csv:"gitCommit"` + + // Golang version. + GoVersion string `csv:"goVersion"` + + // Build platform. + Platform string `csv:"platform"` +} + +// Disk contains content for disk. +type Disk struct { + // Total amount of disk on the data path of dragonfly. + Total uint64 `csv:"total"` + + // Free amount of disk on the data path of dragonfly. + Free uint64 `csv:"free"` + + // Used amount of disk on the data path of dragonfly. + Used uint64 `csv:"used"` + + // Used percent of disk on the data path of dragonfly directory. + UsedPercent float64 `csv:"usedPercent"` + + // Total amount of indoes on the data path of dragonfly directory. + InodesTotal uint64 `csv:"inodesTotal"` + + // Used amount of indoes on the data path of dragonfly directory. + InodesUsed uint64 `csv:"inodesUsed"` + + // Free amount of indoes on the data path of dragonfly directory. + InodesFree uint64 `csv:"inodesFree"` + + // Used percent of indoes on the data path of dragonfly directory. + InodesUsedPercent float64 `csv:"inodesUsedPercent"` +} + // New host instance. -func NewHost(req *schedulerv1.AnnounceHostRequest, options ...HostOption) *Host { +func NewHost( + id, ip, hostname string, + port, downloadPort int32, hostType types.HostType, + options ...HostOption, +) *Host { + // Calculate default of the concurrent upload limit by host type. + concurrentUploadLimit := config.DefaultSeedPeerConcurrentUploadLimit + if hostType == types.HostTypeNormal { + concurrentUploadLimit = config.DefaultPeerConcurrentUploadLimit + } + h := &Host{ - ID: req.Id, - Type: types.ParseHostType(req.Type), - IP: req.Ip, - Hostname: req.Hostname, - Port: req.Port, - DownloadPort: req.DownloadPort, - OS: req.Os, - Platform: req.Platform, - PlatformFamily: req.PlatformFamily, - PlatformVersion: req.PlatformVersion, - KernelVersion: req.KernelVersion, - CPU: req.Cpu, - Memory: req.Memory, - Network: req.Network, - Disk: req.Disk, - Build: req.Build, - ConcurrentUploadLimit: atomic.NewInt32(config.DefaultPeerConcurrentUploadLimit), + ID: id, + Type: types.HostType(hostType), + IP: ip, + Hostname: hostname, + Port: port, + DownloadPort: downloadPort, + ConcurrentUploadLimit: atomic.NewInt32(int32(concurrentUploadLimit)), ConcurrentUploadCount: atomic.NewInt32(0), UploadCount: atomic.NewInt64(0), UploadFailedCount: atomic.NewInt64(0), @@ -145,7 +356,7 @@ func NewHost(req *schedulerv1.AnnounceHostRequest, options ...HostOption) *Host PeerCount: atomic.NewInt32(0), CreatedAt: atomic.NewTime(time.Now()), UpdatedAt: atomic.NewTime(time.Now()), - Log: logger.WithHost(req.Id, req.Hostname, req.Ip), + Log: logger.WithHost(id, hostname, ip), } for _, opt := range options { diff --git a/scheduler/resource/host_manager_test.go b/scheduler/resource/host_manager_test.go index da94c34e0..822fda829 100644 --- a/scheduler/resource/host_manager_test.go +++ b/scheduler/resource/host_manager_test.go @@ -131,7 +131,9 @@ func TestHostManager_Load(t *testing.T) { gc := gc.NewMockGC(ctl) tc.mock(gc.EXPECT()) - mockHost := NewHost(mockRawHost) + mockHost := NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) hostManager, err := newHostManager(mockHostGCConfig, gc) if err != nil { t.Fatal(err) @@ -184,7 +186,9 @@ func TestHostManager_Store(t *testing.T) { gc := gc.NewMockGC(ctl) tc.mock(gc.EXPECT()) - mockHost := NewHost(mockRawHost) + mockHost := NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) hostManager, err := newHostManager(mockHostGCConfig, gc) if err != nil { t.Fatal(err) @@ -235,7 +239,9 @@ func TestHostManager_LoadOrStore(t *testing.T) { gc := gc.NewMockGC(ctl) tc.mock(gc.EXPECT()) - mockHost := NewHost(mockRawHost) + mockHost := NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) hostManager, err := newHostManager(mockHostGCConfig, gc) if err != nil { t.Fatal(err) @@ -288,7 +294,9 @@ func TestHostManager_Delete(t *testing.T) { gc := gc.NewMockGC(ctl) tc.mock(gc.EXPECT()) - mockHost := NewHost(mockRawHost) + mockHost := NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) hostManager, err := newHostManager(mockHostGCConfig, gc) if err != nil { t.Fatal(err) @@ -363,7 +371,9 @@ func TestHostManager_RunGC(t *testing.T) { }, expect: func(t *testing.T, hostManager HostManager, mockHost *Host, mockPeer *Peer) { assert := assert.New(t) - mockSeedHost := NewHost(mockRawSeedHost) + mockSeedHost := NewHost( + mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, + mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type) hostManager.Store(mockSeedHost) err := hostManager.RunGC() assert.NoError(err) @@ -382,7 +392,9 @@ func TestHostManager_RunGC(t *testing.T) { gc := gc.NewMockGC(ctl) tc.mock(gc.EXPECT()) - mockHost := NewHost(mockRawHost) + mockHost := NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit)) mockPeer := NewPeer(mockPeerID, mockTask, mockHost) hostManager, err := newHostManager(mockHostGCConfig, gc) diff --git a/scheduler/resource/host_test.go b/scheduler/resource/host_test.go index 11d6e637f..4134aa6c1 100644 --- a/scheduler/resource/host_test.go +++ b/scheduler/resource/host_test.go @@ -22,7 +22,6 @@ import ( "github.com/stretchr/testify/assert" commonv1 "d7y.io/api/pkg/apis/common/v1" - schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1" "d7y.io/dragonfly/v2/pkg/idgen" "d7y.io/dragonfly/v2/pkg/types" @@ -30,39 +29,103 @@ import ( ) var ( - mockRawHost = &schedulerv1.AnnounceHostRequest{ - Id: idgen.HostID("hostname", 8003), - Type: types.HostTypeNormalName, - Ip: "127.0.0.1", - Port: 8003, - DownloadPort: 8001, - Hostname: "hostname", - Network: &schedulerv1.Network{ - SecurityDomain: "security_domain", - Location: "location", - Idc: "idc", + mockRawHost = Host{ + ID: idgen.HostID("hostname", 8003), + Type: types.HostTypeNormal, + Hostname: "hostname", + IP: "127.0.0.1", + Port: 8003, + DownloadPort: 8001, + OS: "darwin", + Platform: "darwin", + PlatformFamily: "Standalone Workstation", + PlatformVersion: "11.1", + KernelVersion: "20.2.0", + CPU: mockCPU, + Memory: mockMemory, + Network: mockNetwork, + Disk: mockDisk, + Build: mockBuild, + } + + mockRawSeedHost = Host{ + ID: idgen.HostID("hostname_seed", 8003), + Type: types.HostTypeSuperSeed, + Hostname: "hostname_seed", + IP: "127.0.0.1", + Port: 8003, + DownloadPort: 8001, + OS: "darwin", + Platform: "darwin", + PlatformFamily: "Standalone Workstation", + PlatformVersion: "11.1", + KernelVersion: "20.2.0", + CPU: mockCPU, + Memory: mockMemory, + Network: mockNetwork, + Disk: mockDisk, + Build: mockBuild, + } + + mockCPU = CPU{ + LogicalCount: 4, + PhysicalCount: 2, + Percent: 1, + ProcessPercent: 0.5, + Times: CPUTimes{ + User: 240662.2, + System: 317950.1, + Idle: 3393691.3, + Nice: 0, + Iowait: 0, + Irq: 0, + Softirq: 0, + Steal: 0, + Guest: 0, + GuestNice: 0, }, } - mockRawSeedHost = &schedulerv1.AnnounceHostRequest{ - Id: idgen.HostID("hostname_seed", 8003), - Type: types.HostTypeSuperSeedName, - Ip: "127.0.0.1", - Port: 8003, - DownloadPort: 8001, - Hostname: "hostname_seed", - Network: &schedulerv1.Network{ - SecurityDomain: "security_domain", - Location: "location", - Idc: "idc", - }, + mockMemory = Memory{ + Total: 17179869184, + Available: 5962813440, + Used: 11217055744, + UsedPercent: 65.291858, + ProcessUsedPercent: 41.525125, + Free: 2749598908, + } + + mockNetwork = Network{ + TCPConnectionCount: 10, + UploadTCPConnectionCount: 1, + SecurityDomain: "security_domain", + Location: "location", + IDC: "idc", + } + + mockDisk = Disk{ + Total: 499963174912, + Free: 37226479616, + Used: 423809622016, + UsedPercent: 91.92547406065952, + InodesTotal: 4882452880, + InodesUsed: 7835772, + InodesFree: 4874617108, + InodesUsedPercent: 0.1604884305611568, + } + + mockBuild = Build{ + GitVersion: "v1.0.0", + GitCommit: "221176b117c6d59366d68f2b34d38be50c935883", + GoVersion: "1.18", + Platform: "darwin", } ) func TestHost_NewHost(t *testing.T) { tests := []struct { name string - rawHost *schedulerv1.AnnounceHostRequest + rawHost Host options []HostOption expect func(t *testing.T, host *Host) }{ @@ -71,19 +134,20 @@ func TestHost_NewHost(t *testing.T) { rawHost: mockRawHost, expect: func(t *testing.T, host *Host) { assert := assert.New(t) - assert.Equal(host.ID, mockRawHost.Id) + assert.Equal(host.ID, mockRawHost.ID) assert.Equal(host.Type, types.HostTypeNormal) - assert.Equal(host.IP, mockRawHost.Ip) + assert.Equal(host.Hostname, mockRawHost.Hostname) + assert.Equal(host.IP, mockRawHost.IP) assert.Equal(host.Port, mockRawHost.Port) assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) - assert.Equal(host.Hostname, mockRawHost.Hostname) - assert.Equal(host.Network.SecurityDomain, mockRawHost.Network.SecurityDomain) - assert.Equal(host.Network.Location, mockRawHost.Network.Location) - assert.Equal(host.Network.Idc, mockRawHost.Network.Idc) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) + assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) + assert.Equal(host.UploadCount.Load(), int64(0)) + assert.Equal(host.UploadFailedCount.Load(), int64(0)) + assert.NotNil(host.Peers) assert.Equal(host.PeerCount.Load(), int32(0)) - assert.NotEqual(host.CreatedAt.Load(), 0) - assert.NotEqual(host.UpdatedAt.Load(), 0) + assert.NotEmpty(host.CreatedAt.Load()) + assert.NotEmpty(host.UpdatedAt.Load()) assert.NotNil(host.Log) }, }, @@ -92,19 +156,20 @@ func TestHost_NewHost(t *testing.T) { rawHost: mockRawSeedHost, expect: func(t *testing.T, host *Host) { assert := assert.New(t) - assert.Equal(host.ID, mockRawSeedHost.Id) - assert.Equal(host.Type, types.HostTypeSuperSeed) - assert.Equal(host.IP, mockRawSeedHost.Ip) + assert.Equal(host.ID, mockRawSeedHost.ID) + assert.Equal(host.Type, mockRawSeedHost.Type) + assert.Equal(host.Hostname, mockRawSeedHost.Hostname) + assert.Equal(host.IP, mockRawSeedHost.IP) assert.Equal(host.Port, mockRawSeedHost.Port) assert.Equal(host.DownloadPort, mockRawSeedHost.DownloadPort) - assert.Equal(host.Hostname, mockRawSeedHost.Hostname) - assert.Equal(host.Network.SecurityDomain, mockRawSeedHost.Network.SecurityDomain) - assert.Equal(host.Network.Location, mockRawSeedHost.Network.Location) - assert.Equal(host.Network.Idc, mockRawSeedHost.Network.Idc) - assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) + assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultSeedPeerConcurrentUploadLimit)) + assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) + assert.Equal(host.UploadCount.Load(), int64(0)) + assert.Equal(host.UploadFailedCount.Load(), int64(0)) + assert.NotNil(host.Peers) assert.Equal(host.PeerCount.Load(), int32(0)) - assert.NotEqual(host.CreatedAt.Load(), 0) - assert.NotEqual(host.UpdatedAt.Load(), 0) + assert.NotEmpty(host.CreatedAt.Load()) + assert.NotEmpty(host.UpdatedAt.Load()) assert.NotNil(host.Log) }, }, @@ -114,19 +179,260 @@ func TestHost_NewHost(t *testing.T) { options: []HostOption{WithConcurrentUploadLimit(200)}, expect: func(t *testing.T, host *Host) { assert := assert.New(t) - assert.Equal(host.ID, mockRawHost.Id) + assert.Equal(host.ID, mockRawHost.ID) assert.Equal(host.Type, types.HostTypeNormal) - assert.Equal(host.IP, mockRawHost.Ip) + assert.Equal(host.Hostname, mockRawHost.Hostname) + assert.Equal(host.IP, mockRawHost.IP) assert.Equal(host.Port, mockRawHost.Port) assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) - assert.Equal(host.Hostname, mockRawHost.Hostname) - assert.Equal(host.Network.SecurityDomain, mockRawHost.Network.SecurityDomain) - assert.Equal(host.Network.Location, mockRawHost.Network.Location) - assert.Equal(host.Network.Idc, mockRawHost.Network.Idc) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(200)) + assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) + assert.Equal(host.UploadCount.Load(), int64(0)) + assert.Equal(host.UploadFailedCount.Load(), int64(0)) + assert.NotNil(host.Peers) assert.Equal(host.PeerCount.Load(), int32(0)) - assert.NotEqual(host.CreatedAt.Load(), 0) - assert.NotEqual(host.UpdatedAt.Load(), 0) + assert.NotEmpty(host.CreatedAt.Load()) + assert.NotEmpty(host.UpdatedAt.Load()) + assert.NotNil(host.Log) + }, + }, + { + name: "new host and set os", + rawHost: mockRawHost, + options: []HostOption{WithOS("linux")}, + expect: func(t *testing.T, host *Host) { + assert := assert.New(t) + assert.Equal(host.ID, mockRawHost.ID) + assert.Equal(host.Type, types.HostTypeNormal) + assert.Equal(host.Hostname, mockRawHost.Hostname) + assert.Equal(host.IP, mockRawHost.IP) + assert.Equal(host.Port, mockRawHost.Port) + assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) + assert.Equal(host.OS, "linux") + assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) + assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) + assert.Equal(host.UploadCount.Load(), int64(0)) + assert.Equal(host.UploadFailedCount.Load(), int64(0)) + assert.NotNil(host.Peers) + assert.Equal(host.PeerCount.Load(), int32(0)) + assert.NotEmpty(host.CreatedAt.Load()) + assert.NotEmpty(host.UpdatedAt.Load()) + assert.NotNil(host.Log) + }, + }, + { + name: "new host and set platform", + rawHost: mockRawHost, + options: []HostOption{WithPlatform("ubuntu")}, + expect: func(t *testing.T, host *Host) { + assert := assert.New(t) + assert.Equal(host.ID, mockRawHost.ID) + assert.Equal(host.Type, types.HostTypeNormal) + assert.Equal(host.Hostname, mockRawHost.Hostname) + assert.Equal(host.IP, mockRawHost.IP) + assert.Equal(host.Port, mockRawHost.Port) + assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) + assert.Equal(host.Platform, "ubuntu") + assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) + assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) + assert.Equal(host.UploadCount.Load(), int64(0)) + assert.Equal(host.UploadFailedCount.Load(), int64(0)) + assert.NotNil(host.Peers) + assert.Equal(host.PeerCount.Load(), int32(0)) + assert.NotEmpty(host.CreatedAt.Load()) + assert.NotEmpty(host.UpdatedAt.Load()) + assert.NotNil(host.Log) + }, + }, + { + name: "new host and set platform family", + rawHost: mockRawHost, + options: []HostOption{WithPlatformFamily("debian")}, + expect: func(t *testing.T, host *Host) { + assert := assert.New(t) + assert.Equal(host.ID, mockRawHost.ID) + assert.Equal(host.Type, types.HostTypeNormal) + assert.Equal(host.Hostname, mockRawHost.Hostname) + assert.Equal(host.IP, mockRawHost.IP) + assert.Equal(host.Port, mockRawHost.Port) + assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) + assert.Equal(host.PlatformFamily, "debian") + assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) + assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) + assert.Equal(host.UploadCount.Load(), int64(0)) + assert.Equal(host.UploadFailedCount.Load(), int64(0)) + assert.NotNil(host.Peers) + assert.Equal(host.PeerCount.Load(), int32(0)) + assert.NotEmpty(host.CreatedAt.Load()) + assert.NotEmpty(host.UpdatedAt.Load()) + assert.NotNil(host.Log) + }, + }, + { + name: "new host and set platform version", + rawHost: mockRawHost, + options: []HostOption{WithPlatformVersion("22.04")}, + expect: func(t *testing.T, host *Host) { + assert := assert.New(t) + assert.Equal(host.ID, mockRawHost.ID) + assert.Equal(host.Type, types.HostTypeNormal) + assert.Equal(host.Hostname, mockRawHost.Hostname) + assert.Equal(host.IP, mockRawHost.IP) + assert.Equal(host.Port, mockRawHost.Port) + assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) + assert.Equal(host.PlatformVersion, "22.04") + assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) + assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) + assert.Equal(host.UploadCount.Load(), int64(0)) + assert.Equal(host.UploadFailedCount.Load(), int64(0)) + assert.NotNil(host.Peers) + assert.Equal(host.PeerCount.Load(), int32(0)) + assert.NotEmpty(host.CreatedAt.Load()) + assert.NotEmpty(host.UpdatedAt.Load()) + assert.NotNil(host.Log) + }, + }, + { + name: "new host and set kernel version", + rawHost: mockRawHost, + options: []HostOption{WithKernelVersion("5.15.0-27-generic")}, + expect: func(t *testing.T, host *Host) { + assert := assert.New(t) + assert.Equal(host.ID, mockRawHost.ID) + assert.Equal(host.Type, types.HostTypeNormal) + assert.Equal(host.Hostname, mockRawHost.Hostname) + assert.Equal(host.IP, mockRawHost.IP) + assert.Equal(host.Port, mockRawHost.Port) + assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) + assert.Equal(host.KernelVersion, "5.15.0-27-generic") + assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) + assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) + assert.Equal(host.UploadCount.Load(), int64(0)) + assert.Equal(host.UploadFailedCount.Load(), int64(0)) + assert.NotNil(host.Peers) + assert.Equal(host.PeerCount.Load(), int32(0)) + assert.NotEmpty(host.CreatedAt.Load()) + assert.NotEmpty(host.UpdatedAt.Load()) + assert.NotNil(host.Log) + }, + }, + { + name: "new host and set cpu", + rawHost: mockRawHost, + options: []HostOption{WithCPU(mockCPU)}, + expect: func(t *testing.T, host *Host) { + assert := assert.New(t) + assert.Equal(host.ID, mockRawHost.ID) + assert.Equal(host.Type, types.HostTypeNormal) + assert.Equal(host.Hostname, mockRawHost.Hostname) + assert.Equal(host.IP, mockRawHost.IP) + assert.Equal(host.Port, mockRawHost.Port) + assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) + assert.EqualValues(host.CPU, mockCPU) + assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) + assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) + assert.Equal(host.UploadCount.Load(), int64(0)) + assert.Equal(host.UploadFailedCount.Load(), int64(0)) + assert.NotNil(host.Peers) + assert.Equal(host.PeerCount.Load(), int32(0)) + assert.NotEmpty(host.CreatedAt.Load()) + assert.NotEmpty(host.UpdatedAt.Load()) + assert.NotNil(host.Log) + }, + }, + { + name: "new host and set memory", + rawHost: mockRawHost, + options: []HostOption{WithMemory(mockMemory)}, + expect: func(t *testing.T, host *Host) { + assert := assert.New(t) + assert.Equal(host.ID, mockRawHost.ID) + assert.Equal(host.Type, types.HostTypeNormal) + assert.Equal(host.Hostname, mockRawHost.Hostname) + assert.Equal(host.IP, mockRawHost.IP) + assert.Equal(host.Port, mockRawHost.Port) + assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) + assert.EqualValues(host.Memory, mockMemory) + assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) + assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) + assert.Equal(host.UploadCount.Load(), int64(0)) + assert.Equal(host.UploadFailedCount.Load(), int64(0)) + assert.NotNil(host.Peers) + assert.Equal(host.PeerCount.Load(), int32(0)) + assert.NotEmpty(host.CreatedAt.Load()) + assert.NotEmpty(host.UpdatedAt.Load()) + assert.NotNil(host.Log) + }, + }, + { + name: "new host and set network", + rawHost: mockRawHost, + options: []HostOption{WithNetwork(mockNetwork)}, + expect: func(t *testing.T, host *Host) { + assert := assert.New(t) + assert.Equal(host.ID, mockRawHost.ID) + assert.Equal(host.Type, types.HostTypeNormal) + assert.Equal(host.Hostname, mockRawHost.Hostname) + assert.Equal(host.IP, mockRawHost.IP) + assert.Equal(host.Port, mockRawHost.Port) + assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) + assert.EqualValues(host.Network, mockNetwork) + assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) + assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) + assert.Equal(host.UploadCount.Load(), int64(0)) + assert.Equal(host.UploadFailedCount.Load(), int64(0)) + assert.NotNil(host.Peers) + assert.Equal(host.PeerCount.Load(), int32(0)) + assert.NotEmpty(host.CreatedAt.Load()) + assert.NotEmpty(host.UpdatedAt.Load()) + assert.NotNil(host.Log) + }, + }, + { + name: "new host and set disk", + rawHost: mockRawHost, + options: []HostOption{WithDisk(mockDisk)}, + expect: func(t *testing.T, host *Host) { + assert := assert.New(t) + assert.Equal(host.ID, mockRawHost.ID) + assert.Equal(host.Type, types.HostTypeNormal) + assert.Equal(host.Hostname, mockRawHost.Hostname) + assert.Equal(host.IP, mockRawHost.IP) + assert.Equal(host.Port, mockRawHost.Port) + assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) + assert.EqualValues(host.Disk, mockDisk) + assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) + assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) + assert.Equal(host.UploadCount.Load(), int64(0)) + assert.Equal(host.UploadFailedCount.Load(), int64(0)) + assert.NotNil(host.Peers) + assert.Equal(host.PeerCount.Load(), int32(0)) + assert.NotEmpty(host.CreatedAt.Load()) + assert.NotEmpty(host.UpdatedAt.Load()) + assert.NotNil(host.Log) + }, + }, + { + name: "new host and set build", + rawHost: mockRawHost, + options: []HostOption{WithBuild(mockBuild)}, + expect: func(t *testing.T, host *Host) { + assert := assert.New(t) + assert.Equal(host.ID, mockRawHost.ID) + assert.Equal(host.Type, types.HostTypeNormal) + assert.Equal(host.Hostname, mockRawHost.Hostname) + assert.Equal(host.IP, mockRawHost.IP) + assert.Equal(host.Port, mockRawHost.Port) + assert.Equal(host.DownloadPort, mockRawHost.DownloadPort) + assert.EqualValues(host.Build, mockBuild) + assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) + assert.Equal(host.ConcurrentUploadCount.Load(), int32(0)) + assert.Equal(host.UploadCount.Load(), int64(0)) + assert.Equal(host.UploadFailedCount.Load(), int64(0)) + assert.NotNil(host.Peers) + assert.Equal(host.PeerCount.Load(), int32(0)) + assert.NotEmpty(host.CreatedAt.Load()) + assert.NotEmpty(host.UpdatedAt.Load()) assert.NotNil(host.Log) }, }, @@ -134,7 +440,10 @@ func TestHost_NewHost(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - tc.expect(t, NewHost(tc.rawHost, tc.options...)) + tc.expect(t, NewHost( + tc.rawHost.ID, tc.rawHost.IP, tc.rawHost.Hostname, + tc.rawHost.Port, tc.rawHost.DownloadPort, tc.rawHost.Type, + tc.options...)) }) } } @@ -142,9 +451,8 @@ func TestHost_NewHost(t *testing.T) { func TestHost_LoadPeer(t *testing.T) { tests := []struct { name string - rawHost *schedulerv1.AnnounceHostRequest + rawHost Host peerID string - options []HostOption expect func(t *testing.T, peer *Peer, loaded bool) }{ { @@ -179,7 +487,9 @@ func TestHost_LoadPeer(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - host := NewHost(tc.rawHost, tc.options...) + host := NewHost( + tc.rawHost.ID, tc.rawHost.IP, tc.rawHost.Hostname, + tc.rawHost.Port, tc.rawHost.DownloadPort, tc.rawHost.Type) mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit)) mockPeer := NewPeer(mockPeerID, mockTask, host) @@ -193,7 +503,7 @@ func TestHost_LoadPeer(t *testing.T) { func TestHost_StorePeer(t *testing.T) { tests := []struct { name string - rawHost *schedulerv1.AnnounceHostRequest + rawHost Host peerID string options []HostOption expect func(t *testing.T, peer *Peer, loaded bool) @@ -222,7 +532,9 @@ func TestHost_StorePeer(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - host := NewHost(tc.rawHost, tc.options...) + host := NewHost( + tc.rawHost.ID, tc.rawHost.IP, tc.rawHost.Hostname, + tc.rawHost.Port, tc.rawHost.DownloadPort, tc.rawHost.Type) mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit)) mockPeer := NewPeer(tc.peerID, mockTask, host) @@ -236,7 +548,7 @@ func TestHost_StorePeer(t *testing.T) { func TestHost_DeletePeer(t *testing.T) { tests := []struct { name string - rawHost *schedulerv1.AnnounceHostRequest + rawHost Host peerID string options []HostOption expect func(t *testing.T, host *Host) @@ -266,7 +578,9 @@ func TestHost_DeletePeer(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - host := NewHost(tc.rawHost, tc.options...) + host := NewHost( + tc.rawHost.ID, tc.rawHost.IP, tc.rawHost.Hostname, + tc.rawHost.Port, tc.rawHost.DownloadPort, tc.rawHost.Type) mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit)) mockPeer := NewPeer(mockPeerID, mockTask, host) @@ -280,7 +594,7 @@ func TestHost_DeletePeer(t *testing.T) { func TestHost_LeavePeers(t *testing.T) { tests := []struct { name string - rawHost *schedulerv1.AnnounceHostRequest + rawHost Host options []HostOption expect func(t *testing.T, host *Host, mockPeer *Peer) }{ @@ -316,7 +630,9 @@ func TestHost_LeavePeers(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - host := NewHost(tc.rawHost, tc.options...) + host := NewHost( + tc.rawHost.ID, tc.rawHost.IP, tc.rawHost.Hostname, + tc.rawHost.Port, tc.rawHost.DownloadPort, tc.rawHost.Type) mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit)) mockPeer := NewPeer(mockPeerID, mockTask, host) @@ -328,7 +644,7 @@ func TestHost_LeavePeers(t *testing.T) { func TestHost_FreeUploadCount(t *testing.T) { tests := []struct { name string - rawHost *schedulerv1.AnnounceHostRequest + rawHost Host options []HostOption expect func(t *testing.T, host *Host, mockTask *Task, mockPeer *Peer) }{ @@ -366,7 +682,9 @@ func TestHost_FreeUploadCount(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - host := NewHost(tc.rawHost, tc.options...) + host := NewHost( + tc.rawHost.ID, tc.rawHost.IP, tc.rawHost.Hostname, + tc.rawHost.Port, tc.rawHost.DownloadPort, tc.rawHost.Type) mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit)) mockPeer := NewPeer(mockPeerID, mockTask, host) diff --git a/scheduler/resource/peer_manager_test.go b/scheduler/resource/peer_manager_test.go index 0ec84d4b4..0b183f515 100644 --- a/scheduler/resource/peer_manager_test.go +++ b/scheduler/resource/peer_manager_test.go @@ -133,7 +133,9 @@ func TestPeerManager_Load(t *testing.T) { gc := gc.NewMockGC(ctl) tc.mock(gc.EXPECT()) - mockHost := NewHost(mockRawHost) + mockHost := NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit)) mockPeer := NewPeer(mockPeerID, mockTask, mockHost) peerManager, err := newPeerManager(mockPeerGCConfig, gc) @@ -188,7 +190,9 @@ func TestPeerManager_Store(t *testing.T) { gc := gc.NewMockGC(ctl) tc.mock(gc.EXPECT()) - mockHost := NewHost(mockRawHost) + mockHost := NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit)) mockPeer := NewPeer(mockPeerID, mockTask, mockHost) peerManager, err := newPeerManager(mockPeerGCConfig, gc) @@ -241,7 +245,9 @@ func TestPeerManager_LoadOrStore(t *testing.T) { gc := gc.NewMockGC(ctl) tc.mock(gc.EXPECT()) - mockHost := NewHost(mockRawHost) + mockHost := NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit)) mockPeer := NewPeer(mockPeerID, mockTask, mockHost) peerManager, err := newPeerManager(mockPeerGCConfig, gc) @@ -296,7 +302,9 @@ func TestPeerManager_Delete(t *testing.T) { gc := gc.NewMockGC(ctl) tc.mock(gc.EXPECT()) - mockHost := NewHost(mockRawHost) + mockHost := NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit)) mockPeer := NewPeer(mockPeerID, mockTask, mockHost) peerManager, err := newPeerManager(mockPeerGCConfig, gc) @@ -503,7 +511,9 @@ func TestPeerManager_RunGC(t *testing.T) { gc := gc.NewMockGC(ctl) tc.mock(gc.EXPECT()) - mockHost := NewHost(mockRawHost) + mockHost := NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit)) mockPeer := NewPeer(mockPeerID, mockTask, mockHost) peerManager, err := newPeerManager(tc.gcConfig, gc) diff --git a/scheduler/resource/peer_test.go b/scheduler/resource/peer_test.go index 428c06a66..c5f19ba2a 100644 --- a/scheduler/resource/peer_test.go +++ b/scheduler/resource/peer_test.go @@ -95,7 +95,9 @@ func TestPeer_NewPeer(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - mockHost := NewHost(mockRawHost) + mockHost := NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit)) tc.expect(t, NewPeer(tc.id, mockTask, mockHost, tc.options...), mockTask, mockHost) }) @@ -128,7 +130,9 @@ func TestPeer_AppendPieceCost(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - mockHost := NewHost(mockRawHost) + mockHost := NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit)) peer := NewPeer(mockPeerID, mockTask, mockHost) @@ -163,7 +167,9 @@ func TestPeer_PieceCosts(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - mockHost := NewHost(mockRawHost) + mockHost := NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit)) peer := NewPeer(mockPeerID, mockTask, mockHost) @@ -203,7 +209,9 @@ func TestPeer_LoadStream(t *testing.T) { defer ctl.Finish() stream := mocks.NewMockScheduler_ReportPieceResultServer(ctl) - mockHost := NewHost(mockRawHost) + mockHost := NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit)) peer := NewPeer(mockPeerID, mockTask, mockHost) tc.expect(t, peer, stream) @@ -234,7 +242,9 @@ func TestPeer_StoreStream(t *testing.T) { defer ctl.Finish() stream := mocks.NewMockScheduler_ReportPieceResultServer(ctl) - mockHost := NewHost(mockRawHost) + mockHost := NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit)) peer := NewPeer(mockPeerID, mockTask, mockHost) tc.expect(t, peer, stream) @@ -265,7 +275,9 @@ func TestPeer_DeleteStream(t *testing.T) { defer ctl.Finish() stream := mocks.NewMockScheduler_ReportPieceResultServer(ctl) - mockHost := NewHost(mockRawHost) + mockHost := NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit)) peer := NewPeer(mockPeerID, mockTask, mockHost) tc.expect(t, peer, stream) @@ -308,7 +320,9 @@ func TestPeer_Parents(t *testing.T) { defer ctl.Finish() stream := mocks.NewMockScheduler_ReportPieceResultServer(ctl) - mockHost := NewHost(mockRawHost) + mockHost := NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit)) peer := NewPeer(mockPeerID, mockTask, mockHost) seedPeer := NewPeer(mockSeedPeerID, mockTask, mockHost) @@ -352,7 +366,9 @@ func TestPeer_Children(t *testing.T) { defer ctl.Finish() stream := mocks.NewMockScheduler_ReportPieceResultServer(ctl) - mockHost := NewHost(mockRawHost) + mockHost := NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit)) peer := NewPeer(mockPeerID, mockTask, mockHost) seedPeer := NewPeer(mockSeedPeerID, mockTask, mockHost) @@ -449,9 +465,11 @@ func TestPeer_DownloadTinyFile(t *testing.T) { t.Fatal(err) } - mockRawHost.Ip = ip + mockRawHost.IP = ip mockRawHost.DownloadPort = int32(port) - mockHost := NewHost(mockRawHost) + mockHost := NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit)) peer = NewPeer(mockPeerID, mockTask, mockHost) tc.expect(t, peer) @@ -565,7 +583,9 @@ func TestPeer_GetPriority(t *testing.T) { defer ctl.Finish() dynconfig := configmocks.NewMockDynconfigInterface(ctl) - mockHost := NewHost(mockRawHost) + mockHost := NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit)) peer := NewPeer(mockPeerID, mockTask, mockHost) tc.mock(peer, dynconfig.EXPECT()) diff --git a/scheduler/resource/seed_peer_client.go b/scheduler/resource/seed_peer_client.go index 8b8e93f1d..06df5ea65 100644 --- a/scheduler/resource/seed_peer_client.go +++ b/scheduler/resource/seed_peer_client.go @@ -26,7 +26,6 @@ import ( "google.golang.org/grpc" managerv1 "d7y.io/api/pkg/apis/manager/v1" - schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/dfnet" @@ -125,35 +124,29 @@ func (sc *seedPeerClient) updateSeedPeersForHostManager(seedPeers []*managerv1.S id := idgen.HostID(seedPeer.HostName, seedPeer.Port) seedPeerHost, loaded := sc.hostManager.Load(id) if !loaded { - var options []HostOption + options := []HostOption{WithNetwork(Network{ + Location: seedPeer.Location, + IDC: seedPeer.Idc, + })} if concurrentUploadLimit > 0 { options = append(options, WithConcurrentUploadLimit(concurrentUploadLimit)) } - sc.hostManager.Store(NewHost(&schedulerv1.AnnounceHostRequest{ - Id: id, - Type: types.HostTypeSuperSeedName, - Ip: seedPeer.Ip, - Hostname: seedPeer.HostName, - Port: seedPeer.Port, - DownloadPort: seedPeer.DownloadPort, - Network: &schedulerv1.Network{ - Location: seedPeer.Location, - Idc: seedPeer.Idc, - }, - }, options...)) + host := NewHost( + id, seedPeer.Ip, seedPeer.HostName, + seedPeer.Port, seedPeer.DownloadPort, types.HostTypeSuperSeed, + options..., + ) + + sc.hostManager.Store(host) continue } - seedPeerHost.IP = seedPeer.Ip seedPeerHost.Type = types.HostTypeSuperSeed - seedPeerHost.Hostname = seedPeer.HostName - seedPeerHost.Port = seedPeer.Port + seedPeerHost.IP = seedPeer.Ip seedPeerHost.DownloadPort = seedPeer.DownloadPort - seedPeerHost.Network = &schedulerv1.Network{ - Location: seedPeer.Location, - Idc: seedPeer.Idc, - } + seedPeerHost.Network.Location = seedPeer.Location + seedPeerHost.Network.IDC = seedPeer.Idc if concurrentUploadLimit > 0 { seedPeerHost.ConcurrentUploadLimit.Store(concurrentUploadLimit) diff --git a/scheduler/resource/seed_peer_client_test.go b/scheduler/resource/seed_peer_client_test.go index e96208f6a..ab085b9c5 100644 --- a/scheduler/resource/seed_peer_client_test.go +++ b/scheduler/resource/seed_peer_client_test.go @@ -163,7 +163,9 @@ func TestSeedPeerClient_OnNotify(t *testing.T) { }, }, mock: func(dynconfig *configmocks.MockDynconfigInterfaceMockRecorder, hostManager *MockHostManagerMockRecorder) { - mockHost := NewHost(mockRawHost) + mockHost := NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) gomock.InOrder( dynconfig.Get().Return(&config.DynconfigData{ Scheduler: &managerv1.Scheduler{ @@ -288,17 +290,17 @@ func TestSeedPeerClient_seedPeersToNetAddrs(t *testing.T) { Id: 1, Type: pkgtypes.HostTypeSuperSeedName, HostName: mockRawSeedHost.Hostname, - Ip: mockRawSeedHost.Ip, + Ip: mockRawSeedHost.IP, Port: mockRawSeedHost.Port, DownloadPort: mockRawSeedHost.DownloadPort, - Idc: mockRawSeedHost.Network.Idc, + Idc: mockRawSeedHost.Network.IDC, Location: mockRawSeedHost.Network.Location, }, }, expect: func(t *testing.T, netAddrs []dfnet.NetAddr) { assert := assert.New(t) assert.Equal(netAddrs[0].Type, dfnet.TCP) - assert.Equal(netAddrs[0].Addr, fmt.Sprintf("%s:%d", mockRawSeedHost.Ip, mockRawSeedHost.Port)) + assert.Equal(netAddrs[0].Addr, fmt.Sprintf("%s:%d", mockRawSeedHost.IP, mockRawSeedHost.Port)) }, }, { diff --git a/scheduler/resource/task_manager_test.go b/scheduler/resource/task_manager_test.go index 7a7955012..95c19e7ce 100644 --- a/scheduler/resource/task_manager_test.go +++ b/scheduler/resource/task_manager_test.go @@ -353,7 +353,9 @@ func TestTaskManager_RunGC(t *testing.T) { gc := gc.NewMockGC(ctl) tc.mock(gc.EXPECT()) - mockHost := NewHost(mockRawHost) + mockHost := NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit)) mockPeer := NewPeer(mockPeerID, mockTask, mockHost) taskManager, err := newTaskManager(mockTaskGCConfig, gc) diff --git a/scheduler/resource/task_test.go b/scheduler/resource/task_test.go index 681c284e9..604d35fa5 100644 --- a/scheduler/resource/task_test.go +++ b/scheduler/resource/task_test.go @@ -148,7 +148,9 @@ func TestTask_LoadPeer(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - mockHost := NewHost(mockRawHost) + mockHost := NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) task := NewTask(tc.id, tc.url, commonv1.TaskType_Normal, tc.urlMeta, WithBackToSourceLimit(tc.backToSourceLimit)) mockPeer := NewPeer(mockPeerID, task, mockHost) @@ -215,7 +217,9 @@ func TestTask_LoadRandomPeers(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - host := NewHost(mockRawHost) + host := NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) task := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta) tc.expect(t, task, host) @@ -263,7 +267,9 @@ func TestTask_StorePeer(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - mockHost := NewHost(mockRawHost) + mockHost := NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) task := NewTask(tc.id, tc.url, commonv1.TaskType_Normal, tc.urlMeta, WithBackToSourceLimit(tc.backToSourceLimit)) mockPeer := NewPeer(tc.peerID, task, mockHost) @@ -315,7 +321,9 @@ func TestTask_DeletePeer(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - mockHost := NewHost(mockRawHost) + mockHost := NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) task := NewTask(tc.id, tc.url, commonv1.TaskType_Normal, tc.urlMeta, WithBackToSourceLimit(tc.backToSourceLimit)) mockPeer := NewPeer(mockPeerID, task, mockHost) @@ -352,7 +360,9 @@ func TestTask_PeerCount(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - mockHost := NewHost(mockRawHost) + mockHost := NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) task := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta) mockPeer := NewPeer(mockPeerID, task, mockHost) @@ -448,7 +458,9 @@ func TestTask_AddPeerEdge(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - mockHost := NewHost(mockRawHost) + mockHost := NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) task := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta) tc.expect(t, mockHost, task) @@ -550,7 +562,9 @@ func TestTask_DeletePeerInEdges(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - mockHost := NewHost(mockRawHost) + mockHost := NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) task := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta) tc.expect(t, mockHost, task) @@ -650,7 +664,9 @@ func TestTask_DeletePeerOutEdges(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - mockHost := NewHost(mockRawHost) + mockHost := NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) task := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta) tc.expect(t, mockHost, task) @@ -735,7 +751,9 @@ func TestTask_CanAddPeerEdge(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - mockHost := NewHost(mockRawHost) + mockHost := NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) task := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta) tc.expect(t, mockHost, task) @@ -796,7 +814,9 @@ func TestTask_PeerDegree(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - mockHost := NewHost(mockRawHost) + mockHost := NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) task := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta) tc.expect(t, mockHost, task) @@ -857,7 +877,9 @@ func TestTask_PeerInDegree(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - mockHost := NewHost(mockRawHost) + mockHost := NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) task := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta) tc.expect(t, mockHost, task) @@ -918,7 +940,9 @@ func TestTask_PeerOutDegree(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - mockHost := NewHost(mockRawHost) + mockHost := NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) task := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta) tc.expect(t, mockHost, task) @@ -1026,7 +1050,9 @@ func TestTask_HasAvailablePeer(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - mockHost := NewHost(mockRawHost) + mockHost := NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) task := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, tc.urlMeta, WithBackToSourceLimit(tc.backToSourceLimit)) mockPeer := NewPeer(mockPeerID, task, mockHost) @@ -1088,8 +1114,12 @@ func TestTask_LoadSeedPeer(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - mockHost := NewHost(mockRawHost) - mockSeedHost := NewHost(mockRawSeedHost) + mockHost := NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) + mockSeedHost := NewHost( + mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, + mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type) task := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit)) mockPeer := NewPeer(mockPeerID, task, mockHost) mockSeedPeer := NewPeer(mockSeedPeerID, task, mockSeedHost) @@ -1151,8 +1181,12 @@ func TestTask_IsSeedPeerFailed(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - mockHost := NewHost(mockRawHost) - mockSeedHost := NewHost(mockRawSeedHost) + mockHost := NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) + mockSeedHost := NewHost( + mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, + mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type) task := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit)) mockPeer := NewPeer(mockPeerID, task, mockHost) mockSeedPeer := NewPeer(mockSeedPeerID, task, mockSeedHost) @@ -1622,7 +1656,9 @@ func TestTask_NotifyPeers(t *testing.T) { defer ctl.Finish() stream := mocks.NewMockScheduler_ReportPieceResultServer(ctl) - mockHost := NewHost(mockRawHost) + mockHost := NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) task := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit)) mockPeer := NewPeer(mockPeerID, task, mockHost) task.StorePeer(mockPeer) diff --git a/scheduler/scheduler/evaluator/evaluator_base.go b/scheduler/scheduler/evaluator/evaluator_base.go index 02c2f175f..1f57f5e0f 100644 --- a/scheduler/scheduler/evaluator/evaluator_base.go +++ b/scheduler/scheduler/evaluator/evaluator_base.go @@ -77,31 +77,18 @@ func NewEvaluatorBase() Evaluator { // The larger the value after evaluation, the higher the priority. func (eb *evaluatorBase) Evaluate(parent *resource.Peer, child *resource.Peer, totalPieceCount int32) float64 { - var ( - parentSecurityDomain string - childSecurityDomain string - parentLocation string - childLocation string - parentIDC string - childIDC string - ) - if parent.Host.Network != nil { - parentSecurityDomain = parent.Host.Network.SecurityDomain - parentLocation = parent.Host.Network.Location - parentIDC = parent.Host.Network.Idc - } - - if child.Host.Network != nil { - childSecurityDomain = child.Host.Network.SecurityDomain - childLocation = child.Host.Network.Location - childIDC = child.Host.Network.Idc - } + parentSecurityDomain := parent.Host.Network.SecurityDomain + parentLocation := parent.Host.Network.Location + parentIDC := parent.Host.Network.IDC + childSecurityDomain := child.Host.Network.SecurityDomain + childLocation := child.Host.Network.Location + childIDC := child.Host.Network.IDC // If the SecurityDomain of hosts exists but is not equal, // it cannot be scheduled as a parent. if parentSecurityDomain != "" && childSecurityDomain != "" && - parent.Host.Network.SecurityDomain != child.Host.Network.SecurityDomain { + parentSecurityDomain != childSecurityDomain { return minScore } diff --git a/scheduler/scheduler/evaluator/evaluator_base_test.go b/scheduler/scheduler/evaluator/evaluator_base_test.go index af912af9a..cdadafcd5 100644 --- a/scheduler/scheduler/evaluator/evaluator_base_test.go +++ b/scheduler/scheduler/evaluator/evaluator_base_test.go @@ -23,7 +23,6 @@ import ( "github.com/stretchr/testify/assert" commonv1 "d7y.io/api/pkg/apis/common/v1" - schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1" "d7y.io/dragonfly/v2/pkg/idgen" "d7y.io/dragonfly/v2/pkg/types" @@ -31,32 +30,96 @@ import ( ) var ( - mockRawSeedHost = &schedulerv1.AnnounceHostRequest{ - Id: idgen.HostID("hostname", 8003), - Type: types.HostTypeSuperSeedName, - Ip: "127.0.0.1", - Port: 8003, - DownloadPort: 8001, - Hostname: "hostname", - Network: &schedulerv1.Network{ - SecurityDomain: "security_domain", - Location: "location", - Idc: "idc", + mockRawHost = resource.Host{ + ID: idgen.HostID("hostname", 8003), + Type: types.HostTypeNormal, + Hostname: "hostname", + IP: "127.0.0.1", + Port: 8003, + DownloadPort: 8001, + OS: "darwin", + Platform: "darwin", + PlatformFamily: "Standalone Workstation", + PlatformVersion: "11.1", + KernelVersion: "20.2.0", + CPU: mockCPU, + Memory: mockMemory, + Network: mockNetwork, + Disk: mockDisk, + Build: mockBuild, + } + + mockRawSeedHost = resource.Host{ + ID: idgen.HostID("hostname_seed", 8003), + Type: types.HostTypeSuperSeed, + Hostname: "hostname_seed", + IP: "127.0.0.1", + Port: 8003, + DownloadPort: 8001, + OS: "darwin", + Platform: "darwin", + PlatformFamily: "Standalone Workstation", + PlatformVersion: "11.1", + KernelVersion: "20.2.0", + CPU: mockCPU, + Memory: mockMemory, + Network: mockNetwork, + Disk: mockDisk, + Build: mockBuild, + } + + mockCPU = resource.CPU{ + LogicalCount: 4, + PhysicalCount: 2, + Percent: 1, + ProcessPercent: 0.5, + Times: resource.CPUTimes{ + User: 240662.2, + System: 317950.1, + Idle: 3393691.3, + Nice: 0, + Iowait: 0, + Irq: 0, + Softirq: 0, + Steal: 0, + Guest: 0, + GuestNice: 0, }, } - mockRawHost = &schedulerv1.AnnounceHostRequest{ - Id: idgen.HostID("hostname", 8003), - Type: types.HostTypeNormalName, - Ip: "127.0.0.1", - Port: 8003, - DownloadPort: 8001, - Hostname: "hostname", - Network: &schedulerv1.Network{ - SecurityDomain: "security_domain", - Location: "location", - Idc: "idc", - }, + mockMemory = resource.Memory{ + Total: 17179869184, + Available: 5962813440, + Used: 11217055744, + UsedPercent: 65.291858, + ProcessUsedPercent: 41.525125, + Free: 2749598908, + } + + mockNetwork = resource.Network{ + TCPConnectionCount: 10, + UploadTCPConnectionCount: 1, + SecurityDomain: "security_domain", + Location: "location", + IDC: "idc", + } + + mockDisk = resource.Disk{ + Total: 499963174912, + Free: 37226479616, + Used: 423809622016, + UsedPercent: 91.92547406065952, + InodesTotal: 4882452880, + InodesUsed: 7835772, + InodesFree: 4874617108, + InodesUsedPercent: 0.1604884305611568, + } + + mockBuild = resource.Build{ + GitVersion: "v1.0.0", + GitCommit: "221176b117c6d59366d68f2b34d38be50c935883", + GoVersion: "1.18", + Platform: "darwin", } mockTaskURLMeta = &commonv1.UrlMeta{ @@ -109,10 +172,14 @@ func TestEvaluatorBase_Evaluate(t *testing.T) { name: "security domain is not the same", parent: resource.NewPeer(idgen.PeerID("127.0.0.1"), resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit)), - resource.NewHost(mockRawSeedHost)), + resource.NewHost( + mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, + mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), child: resource.NewPeer(idgen.PeerID("127.0.0.1"), resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit)), - resource.NewHost(mockRawHost)), + resource.NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)), totalPieceCount: 1, mock: func(parent *resource.Peer, child *resource.Peer) { parent.Host.Network.SecurityDomain = "foo" @@ -127,10 +194,14 @@ func TestEvaluatorBase_Evaluate(t *testing.T) { name: "security domain is same", parent: resource.NewPeer(idgen.PeerID("127.0.0.1"), resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit)), - resource.NewHost(mockRawSeedHost)), + resource.NewHost( + mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, + mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), child: resource.NewPeer(idgen.PeerID("127.0.0.1"), resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit)), - resource.NewHost(mockRawHost)), + resource.NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)), totalPieceCount: 1, mock: func(parent *resource.Peer, child *resource.Peer) { parent.Host.Network.SecurityDomain = "bac" @@ -139,17 +210,21 @@ func TestEvaluatorBase_Evaluate(t *testing.T) { }, expect: func(t *testing.T, score float64) { assert := assert.New(t) - assert.Equal(score, float64(0.8500000000000001)) + assert.Equal(score, float64(0.55)) }, }, { name: "parent security domain is empty", parent: resource.NewPeer(idgen.PeerID("127.0.0.1"), resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit)), - resource.NewHost(mockRawSeedHost)), + resource.NewHost( + mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, + mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), child: resource.NewPeer(idgen.PeerID("127.0.0.1"), resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit)), - resource.NewHost(mockRawHost)), + resource.NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)), totalPieceCount: 1, mock: func(parent *resource.Peer, child *resource.Peer) { parent.Host.Network.SecurityDomain = "" @@ -158,17 +233,21 @@ func TestEvaluatorBase_Evaluate(t *testing.T) { }, expect: func(t *testing.T, score float64) { assert := assert.New(t) - assert.Equal(score, float64(0.8500000000000001)) + assert.Equal(score, float64(0.55)) }, }, { name: "child security domain is empty", parent: resource.NewPeer(idgen.PeerID("127.0.0.1"), resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit)), - resource.NewHost(mockRawSeedHost)), + resource.NewHost( + mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, + mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)), child: resource.NewPeer(idgen.PeerID("127.0.0.1"), resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit)), - resource.NewHost(mockRawHost)), + resource.NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)), totalPieceCount: 1, mock: func(parent *resource.Peer, child *resource.Peer) { parent.Host.Network.SecurityDomain = "baz" @@ -177,7 +256,7 @@ func TestEvaluatorBase_Evaluate(t *testing.T) { }, expect: func(t *testing.T, score float64) { assert := assert.New(t) - assert.Equal(score, float64(0.8500000000000001)) + assert.Equal(score, float64(0.55)) }, }, } @@ -192,7 +271,9 @@ func TestEvaluatorBase_Evaluate(t *testing.T) { } func TestEvaluatorBase_calculatePieceScore(t *testing.T) { - mockHost := resource.NewHost(mockRawHost) + mockHost := resource.NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit)) tests := []struct { @@ -351,7 +432,9 @@ func TestEvaluatorBase_calculatehostUploadSuccessScore(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - host := resource.NewHost(mockRawHost) + host := resource.NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit)) mockPeer := resource.NewPeer(mockPeerID, mockTask, host) tc.mock(host) @@ -388,7 +471,9 @@ func TestEvaluatorBase_calculateFreeUploadScore(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - host := resource.NewHost(mockRawHost) + host := resource.NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit)) mockPeer := resource.NewPeer(mockPeerID, mockTask, host) tc.mock(host, mockPeer) @@ -437,7 +522,9 @@ func TestEvaluatorBase_calculateHostTypeScore(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - mockHost := resource.NewHost(mockRawHost) + mockHost := resource.NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit)) peer := resource.NewPeer(mockPeerID, mockTask, mockHost) tc.mock(peer) @@ -455,8 +542,8 @@ func TestEvaluatorBase_calculateIDCAffinityScore(t *testing.T) { { name: "idc is empty", mock: func(dstHost *resource.Host, srcHost *resource.Host) { - dstHost.Network.Idc = "" - srcHost.Network.Idc = "" + dstHost.Network.IDC = "" + srcHost.Network.IDC = "" }, expect: func(t *testing.T, score float64) { assert := assert.New(t) @@ -466,7 +553,7 @@ func TestEvaluatorBase_calculateIDCAffinityScore(t *testing.T) { { name: "dst host idc is empty", mock: func(dstHost *resource.Host, srcHost *resource.Host) { - dstHost.Network.Idc = "" + dstHost.Network.IDC = "" }, expect: func(t *testing.T, score float64) { assert := assert.New(t) @@ -476,7 +563,7 @@ func TestEvaluatorBase_calculateIDCAffinityScore(t *testing.T) { { name: "src host idc is empty", mock: func(dstHost *resource.Host, srcHost *resource.Host) { - srcHost.Network.Idc = "" + srcHost.Network.IDC = "" }, expect: func(t *testing.T, score float64) { assert := assert.New(t) @@ -486,8 +573,8 @@ func TestEvaluatorBase_calculateIDCAffinityScore(t *testing.T) { { name: "idc is not the same", mock: func(dstHost *resource.Host, srcHost *resource.Host) { - dstHost.Network.Idc = "foo" - srcHost.Network.Idc = "bar" + dstHost.Network.IDC = "foo" + srcHost.Network.IDC = "bar" }, expect: func(t *testing.T, score float64) { assert := assert.New(t) @@ -497,8 +584,8 @@ func TestEvaluatorBase_calculateIDCAffinityScore(t *testing.T) { { name: "idc is the same", mock: func(dstHost *resource.Host, srcHost *resource.Host) { - dstHost.Network.Idc = "example" - srcHost.Network.Idc = "example" + dstHost.Network.IDC = "example" + srcHost.Network.IDC = "example" }, expect: func(t *testing.T, score float64) { assert := assert.New(t) @@ -509,10 +596,14 @@ func TestEvaluatorBase_calculateIDCAffinityScore(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - dstHost := resource.NewHost(mockRawHost) - srcHost := resource.NewHost(mockRawSeedHost) + dstHost := resource.NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) + srcHost := resource.NewHost( + mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, + mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type) tc.mock(dstHost, srcHost) - tc.expect(t, calculateIDCAffinityScore(dstHost.Network.Idc, srcHost.Network.Idc)) + tc.expect(t, calculateIDCAffinityScore(dstHost.Network.IDC, srcHost.Network.IDC)) }) } } @@ -642,7 +733,9 @@ func TestEvaluatorBase_calculateMultiElementAffinityScore(t *testing.T) { } func TestEvaluatorBase_IsBadNode(t *testing.T) { - mockHost := resource.NewHost(mockRawHost) + mockHost := resource.NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit)) tests := []struct { diff --git a/scheduler/scheduler/scheduler_test.go b/scheduler/scheduler/scheduler_test.go index 0207b6a0c..ee7221e45 100644 --- a/scheduler/scheduler/scheduler_test.go +++ b/scheduler/scheduler/scheduler_test.go @@ -52,32 +52,96 @@ var ( Algorithm: evaluator.DefaultAlgorithm, } - mockRawHost = &schedulerv1.AnnounceHostRequest{ - Id: idgen.HostID("hostname", 8003), - Type: pkgtypes.HostTypeNormalName, - Ip: "127.0.0.1", - Port: 8003, - DownloadPort: 8001, - Hostname: "hostname", - Network: &schedulerv1.Network{ - SecurityDomain: "security_domain", - Location: "location", - Idc: "idc", + mockRawHost = resource.Host{ + ID: idgen.HostID("hostname", 8003), + Type: pkgtypes.HostTypeNormal, + Hostname: "hostname", + IP: "127.0.0.1", + Port: 8003, + DownloadPort: 8001, + OS: "darwin", + Platform: "darwin", + PlatformFamily: "Standalone Workstation", + PlatformVersion: "11.1", + KernelVersion: "20.2.0", + CPU: mockCPU, + Memory: mockMemory, + Network: mockNetwork, + Disk: mockDisk, + Build: mockBuild, + } + + mockRawSeedHost = resource.Host{ + ID: idgen.HostID("hostname_seed", 8003), + Type: pkgtypes.HostTypeSuperSeed, + Hostname: "hostname_seed", + IP: "127.0.0.1", + Port: 8003, + DownloadPort: 8001, + OS: "darwin", + Platform: "darwin", + PlatformFamily: "Standalone Workstation", + PlatformVersion: "11.1", + KernelVersion: "20.2.0", + CPU: mockCPU, + Memory: mockMemory, + Network: mockNetwork, + Disk: mockDisk, + Build: mockBuild, + } + + mockCPU = resource.CPU{ + LogicalCount: 4, + PhysicalCount: 2, + Percent: 1, + ProcessPercent: 0.5, + Times: resource.CPUTimes{ + User: 240662.2, + System: 317950.1, + Idle: 3393691.3, + Nice: 0, + Iowait: 0, + Irq: 0, + Softirq: 0, + Steal: 0, + Guest: 0, + GuestNice: 0, }, } - mockRawSeedHost = &schedulerv1.AnnounceHostRequest{ - Id: idgen.HostID("hostname_seed", 8003), - Type: pkgtypes.HostTypeSuperSeedName, - Ip: "127.0.0.1", - Port: 8003, - DownloadPort: 8001, - Hostname: "hostname", - Network: &schedulerv1.Network{ - SecurityDomain: "security_domain", - Location: "location", - Idc: "idc", - }, + mockMemory = resource.Memory{ + Total: 17179869184, + Available: 5962813440, + Used: 11217055744, + UsedPercent: 65.291858, + ProcessUsedPercent: 41.525125, + Free: 2749598908, + } + + mockNetwork = resource.Network{ + TCPConnectionCount: 10, + UploadTCPConnectionCount: 1, + SecurityDomain: "security_domain", + Location: "location", + IDC: "idc", + } + + mockDisk = resource.Disk{ + Total: 499963174912, + Free: 37226479616, + Used: 423809622016, + UsedPercent: 91.92547406065952, + InodesTotal: 4882452880, + InodesUsed: 7835772, + InodesFree: 4874617108, + InodesUsedPercent: 0.1604884305611568, + } + + mockBuild = resource.Build{ + GitVersion: "v1.0.0", + GitCommit: "221176b117c6d59366d68f2b34d38be50c935883", + GoVersion: "1.18", + Platform: "darwin", } mockTaskURLMeta = &commonv1.UrlMeta{ @@ -316,10 +380,14 @@ func TestScheduler_ScheduleParent(t *testing.T) { stream := mocks.NewMockScheduler_ReportPieceResultServer(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) ctx, cancel := context.WithCancel(context.Background()) - mockHost := resource.NewHost(mockRawHost) + mockHost := resource.NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit)) peer := resource.NewPeer(mockPeerID, mockTask, mockHost) - mockSeedHost := resource.NewHost(mockRawSeedHost) + mockSeedHost := resource.NewHost( + mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, + mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type) seedPeer := resource.NewPeer(mockSeedPeerID, mockTask, mockSeedHost) blocklist := set.NewSafeSet[string]() @@ -556,18 +624,9 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) { mock: func(peer *resource.Peer, mockTask *resource.Task, mockPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv1.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { peer.FSM.SetState(resource.PeerStateRunning) mockPeer.FSM.SetState(resource.PeerStateRunning) - candidatePeer := resource.NewPeer(idgen.PeerID("127.0.0.1"), mockTask, resource.NewHost(&schedulerv1.AnnounceHostRequest{ - Id: idgen.HostID(uuid.New().String(), 8003), - Ip: "127.0.0.1", - Port: 8003, - DownloadPort: 8001, - Hostname: "hostname", - Network: &schedulerv1.Network{ - SecurityDomain: "security_domain", - Location: "location", - Idc: "idc", - }, - })) + candidatePeer := resource.NewPeer(idgen.PeerID("127.0.0.1"), mockTask, resource.NewHost( + idgen.HostID(uuid.New().String(), 8003), mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)) candidatePeer.FSM.SetState(resource.PeerStateRunning) peer.Task.StorePeer(peer) peer.Task.StorePeer(mockPeer) @@ -600,21 +659,14 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) { defer ctl.Finish() stream := mocks.NewMockScheduler_ReportPieceResultServer(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) - mockHost := resource.NewHost(mockRawHost) + mockHost := resource.NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit)) peer := resource.NewPeer(mockPeerID, mockTask, mockHost) - mockPeer := resource.NewPeer(idgen.PeerID("127.0.0.1"), mockTask, resource.NewHost(&schedulerv1.AnnounceHostRequest{ - Id: idgen.HostID(uuid.New().String(), 8003), - Ip: "127.0.0.1", - Port: 8003, - DownloadPort: 8001, - Hostname: "hostname", - Network: &schedulerv1.Network{ - SecurityDomain: "security_domain", - Location: "location", - Idc: "idc", - }, - })) + mockPeer := resource.NewPeer(idgen.PeerID("127.0.0.1"), mockTask, resource.NewHost( + idgen.HostID(uuid.New().String(), 8003), mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)) blocklist := set.NewSafeSet[string]() tc.mock(peer, mockTask, mockPeer, blocklist, stream, dynconfig, stream.EXPECT(), dynconfig.EXPECT()) @@ -878,24 +930,17 @@ func TestScheduler_FindParent(t *testing.T) { ctl := gomock.NewController(t) defer ctl.Finish() dynconfig := configmocks.NewMockDynconfigInterface(ctl) - mockHost := resource.NewHost(mockRawHost) + mockHost := resource.NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit)) peer := resource.NewPeer(mockPeerID, mockTask, mockHost) var mockPeers []*resource.Peer for i := 0; i < 11; i++ { - mockHost := resource.NewHost(&schedulerv1.AnnounceHostRequest{ - Id: idgen.HostID(uuid.New().String(), 8003), - Ip: "127.0.0.1", - Port: 8003, - DownloadPort: 8001, - Hostname: "hostname", - Network: &schedulerv1.Network{ - SecurityDomain: "security_domain", - Location: "location", - Idc: "idc", - }, - }) + mockHost := resource.NewHost( + idgen.HostID(uuid.New().String(), 8003), mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) peer := resource.NewPeer(idgen.PeerID(fmt.Sprintf("127.0.0.%d", i)), mockTask, mockHost) mockPeers = append(mockPeers, peer) } @@ -978,7 +1023,9 @@ func TestScheduler_constructSuccessPeerPacket(t *testing.T) { ctl := gomock.NewController(t) defer ctl.Finish() dynconfig := configmocks.NewMockDynconfigInterface(ctl) - mockHost := resource.NewHost(mockRawHost) + mockHost := resource.NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit)) peer := resource.NewPeer(mockPeerID, mockTask, mockHost) diff --git a/scheduler/service/service_v1.go b/scheduler/service/service_v1.go index b80c1a4a3..f33b3c44a 100644 --- a/scheduler/service/service_v1.go +++ b/scheduler/service/service_v1.go @@ -440,32 +440,164 @@ func (v *V1) AnnounceHost(ctx context.Context, req *schedulerv1.AnnounceHostRequ host, loaded := v.resource.HostManager().Load(req.Id) if !loaded { - var options []resource.HostOption + options := []resource.HostOption{ + resource.WithOS(req.Os), + resource.WithPlatform(req.Platform), + resource.WithPlatformFamily(req.PlatformFamily), + resource.WithPlatformVersion(req.PlatformVersion), + resource.WithKernelVersion(req.KernelVersion), + } + if concurrentUploadLimit > 0 { options = append(options, resource.WithConcurrentUploadLimit(concurrentUploadLimit)) } - host = resource.NewHost(req, options...) + if req.Cpu != nil { + options = append(options, resource.WithCPU(resource.CPU{ + LogicalCount: req.Cpu.LogicalCount, + PhysicalCount: req.Cpu.PhysicalCount, + Percent: req.Cpu.Percent, + ProcessPercent: req.Cpu.ProcessPercent, + Times: resource.CPUTimes{ + User: req.Cpu.Times.User, + System: req.Cpu.Times.System, + Idle: req.Cpu.Times.Idle, + Nice: req.Cpu.Times.Nice, + Iowait: req.Cpu.Times.Iowait, + Irq: req.Cpu.Times.Irq, + Softirq: req.Cpu.Times.Softirq, + Steal: req.Cpu.Times.Steal, + Guest: req.Cpu.Times.Guest, + GuestNice: req.Cpu.Times.GuestNice, + }, + })) + } + + if req.Memory != nil { + options = append(options, resource.WithMemory(resource.Memory{ + Total: req.Memory.Total, + Available: req.Memory.Available, + Used: req.Memory.Used, + UsedPercent: req.Memory.UsedPercent, + ProcessUsedPercent: req.Memory.ProcessUsedPercent, + Free: req.Memory.Free, + })) + } + + if req.Network != nil { + options = append(options, resource.WithNetwork(resource.Network{ + TCPConnectionCount: req.Network.TcpConnectionCount, + UploadTCPConnectionCount: req.Network.UploadTcpConnectionCount, + SecurityDomain: req.Network.SecurityDomain, + Location: req.Network.Location, + IDC: req.Network.Idc, + })) + } + + if req.Disk != nil { + options = append(options, resource.WithDisk(resource.Disk{ + Total: req.Disk.Total, + Free: req.Disk.Free, + Used: req.Disk.Used, + UsedPercent: req.Disk.UsedPercent, + InodesTotal: req.Disk.InodesTotal, + InodesUsed: req.Disk.InodesUsed, + InodesFree: req.Disk.InodesFree, + InodesUsedPercent: req.Disk.InodesUsedPercent, + })) + } + + if req.Build != nil { + options = append(options, resource.WithBuild(resource.Build{ + GitVersion: req.Build.GitVersion, + GitCommit: req.Build.GitCommit, + GoVersion: req.Build.GoVersion, + Platform: req.Build.Platform, + })) + } + + host = resource.NewHost( + req.Id, req.Ip, req.Hostname, + req.Port, req.DownloadPort, types.ParseHostType(req.Type), + options..., + ) v.resource.HostManager().Store(host) host.Log.Infof("announce new host: %#v", req) return nil } - host.Type = types.ParseHostType(req.Type) host.IP = req.Ip - host.Hostname = req.Hostname - host.Port = req.Port host.DownloadPort = req.DownloadPort + host.Type = types.ParseHostType(req.Type) host.OS = req.Os host.Platform = req.Platform host.PlatformFamily = req.PlatformFamily host.PlatformVersion = req.PlatformVersion host.KernelVersion = req.KernelVersion - host.CPU = req.Cpu - host.Memory = req.Memory - host.Network = req.Network - host.Disk = req.Disk - host.Build = req.Build + + if req.Cpu != nil { + host.CPU = resource.CPU{ + LogicalCount: req.Cpu.LogicalCount, + PhysicalCount: req.Cpu.PhysicalCount, + Percent: req.Cpu.Percent, + ProcessPercent: req.Cpu.ProcessPercent, + Times: resource.CPUTimes{ + User: req.Cpu.Times.User, + System: req.Cpu.Times.System, + Idle: req.Cpu.Times.Idle, + Nice: req.Cpu.Times.Nice, + Iowait: req.Cpu.Times.Iowait, + Irq: req.Cpu.Times.Irq, + Softirq: req.Cpu.Times.Softirq, + Steal: req.Cpu.Times.Steal, + Guest: req.Cpu.Times.Guest, + GuestNice: req.Cpu.Times.GuestNice, + }, + } + } + + if req.Memory != nil { + host.Memory = resource.Memory{ + Total: req.Memory.Total, + Available: req.Memory.Available, + Used: req.Memory.Used, + UsedPercent: req.Memory.UsedPercent, + ProcessUsedPercent: req.Memory.ProcessUsedPercent, + Free: req.Memory.Free, + } + } + + if req.Network != nil { + host.Network = resource.Network{ + TCPConnectionCount: req.Network.TcpConnectionCount, + UploadTCPConnectionCount: req.Network.UploadTcpConnectionCount, + SecurityDomain: req.Network.SecurityDomain, + Location: req.Network.Location, + IDC: req.Network.Idc, + } + } + + if req.Disk != nil { + host.Disk = resource.Disk{ + Total: req.Disk.Total, + Free: req.Disk.Free, + Used: req.Disk.Used, + UsedPercent: req.Disk.UsedPercent, + InodesTotal: req.Disk.InodesTotal, + InodesUsed: req.Disk.InodesUsed, + InodesFree: req.Disk.InodesFree, + InodesUsedPercent: req.Disk.InodesUsedPercent, + } + } + + if req.Build != nil { + host.Build = resource.Build{ + GitVersion: req.Build.GitVersion, + GitCommit: req.Build.GitCommit, + GoVersion: req.Build.GoVersion, + Platform: req.Build.Platform, + } + } if concurrentUploadLimit > 0 { host.ConcurrentUploadLimit.Store(concurrentUploadLimit) @@ -597,24 +729,21 @@ func (v *V1) storeHost(ctx context.Context, peerHost *schedulerv1.PeerHost) *res host, loaded := v.resource.HostManager().Load(peerHost.Id) if !loaded { // Get scheduler cluster client config by manager. - var options []resource.HostOption + options := []resource.HostOption{resource.WithNetwork(resource.Network{ + SecurityDomain: peerHost.SecurityDomain, + Location: peerHost.Location, + IDC: peerHost.Idc, + })} if clientConfig, err := v.dynconfig.GetSchedulerClusterClientConfig(); err == nil && clientConfig.LoadLimit > 0 { options = append(options, resource.WithConcurrentUploadLimit(int32(clientConfig.LoadLimit))) } - host = resource.NewHost(&schedulerv1.AnnounceHostRequest{ - Id: peerHost.Id, - Type: types.HostTypeNormalName, - Ip: peerHost.Ip, - Hostname: peerHost.HostName, - Port: peerHost.RpcPort, - DownloadPort: peerHost.DownPort, - Network: &schedulerv1.Network{ - SecurityDomain: peerHost.SecurityDomain, - Location: peerHost.Location, - Idc: peerHost.Idc, - }, - }, options...) + host = resource.NewHost( + peerHost.Id, peerHost.Ip, peerHost.HostName, + peerHost.RpcPort, peerHost.DownPort, types.HostTypeNormal, + options..., + ) + v.resource.HostManager().Store(host) host.Log.Info("create new host") return host @@ -1096,68 +1225,58 @@ func (v *V1) createRecord(peer *resource.Peer, parents []*resource.Peer, req *sc }, } - if parent.Host.CPU != nil { - parentRecord.Host.CPU = storage.CPU{ - LogicalCount: parent.Host.CPU.LogicalCount, - PhysicalCount: parent.Host.CPU.PhysicalCount, - Percent: parent.Host.CPU.Percent, - ProcessPercent: parent.Host.CPU.ProcessPercent, - Times: storage.CPUTimes{ - User: parent.Host.CPU.Times.User, - System: parent.Host.CPU.Times.System, - Idle: parent.Host.CPU.Times.Idle, - Nice: parent.Host.CPU.Times.Nice, - Iowait: parent.Host.CPU.Times.Iowait, - Irq: parent.Host.CPU.Times.Irq, - Softirq: parent.Host.CPU.Times.Softirq, - Steal: parent.Host.CPU.Times.Steal, - Guest: parent.Host.CPU.Times.Guest, - GuestNice: parent.Host.CPU.Times.GuestNice, - }, - } + parentRecord.Host.CPU = resource.CPU{ + LogicalCount: parent.Host.CPU.LogicalCount, + PhysicalCount: parent.Host.CPU.PhysicalCount, + Percent: parent.Host.CPU.Percent, + ProcessPercent: parent.Host.CPU.ProcessPercent, + Times: resource.CPUTimes{ + User: parent.Host.CPU.Times.User, + System: parent.Host.CPU.Times.System, + Idle: parent.Host.CPU.Times.Idle, + Nice: parent.Host.CPU.Times.Nice, + Iowait: parent.Host.CPU.Times.Iowait, + Irq: parent.Host.CPU.Times.Irq, + Softirq: parent.Host.CPU.Times.Softirq, + Steal: parent.Host.CPU.Times.Steal, + Guest: parent.Host.CPU.Times.Guest, + GuestNice: parent.Host.CPU.Times.GuestNice, + }, } - if parent.Host.Memory != nil { - parentRecord.Host.Memory = storage.Memory{ - Total: parent.Host.Memory.Total, - Available: parent.Host.Memory.Available, - Used: parent.Host.Memory.Used, - UsedPercent: parent.Host.Memory.UsedPercent, - ProcessUsedPercent: parent.Host.Memory.ProcessUsedPercent, - Free: parent.Host.Memory.Free, - } + parentRecord.Host.Memory = resource.Memory{ + Total: parent.Host.Memory.Total, + Available: parent.Host.Memory.Available, + Used: parent.Host.Memory.Used, + UsedPercent: parent.Host.Memory.UsedPercent, + ProcessUsedPercent: parent.Host.Memory.ProcessUsedPercent, + Free: parent.Host.Memory.Free, } - if parent.Host.Network != nil { - parentRecord.Host.Network = storage.Network{ - TCPConnectionCount: parent.Host.Network.TcpConnectionCount, - UploadTCPConnectionCount: parent.Host.Network.UploadTcpConnectionCount, - SecurityDomain: parent.Host.Network.SecurityDomain, - Location: parent.Host.Network.Location, - IDC: parent.Host.Network.Idc, - } + parentRecord.Host.Network = resource.Network{ + TCPConnectionCount: parent.Host.Network.TCPConnectionCount, + UploadTCPConnectionCount: parent.Host.Network.UploadTCPConnectionCount, + SecurityDomain: parent.Host.Network.SecurityDomain, + Location: parent.Host.Network.Location, + IDC: parent.Host.Network.IDC, } - if parent.Host.Disk != nil { - parentRecord.Host.Disk = storage.Disk{ - Total: parent.Host.Disk.Total, - Free: parent.Host.Disk.Free, - Used: parent.Host.Disk.Used, - UsedPercent: parent.Host.Disk.UsedPercent, - InodesTotal: parent.Host.Disk.InodesTotal, - InodesUsed: parent.Host.Disk.InodesUsed, - InodesFree: parent.Host.Disk.InodesFree, - InodesUsedPercent: parent.Host.Disk.InodesUsedPercent, - } + parentRecord.Host.Disk = resource.Disk{ + Total: parent.Host.Disk.Total, + Free: parent.Host.Disk.Free, + Used: parent.Host.Disk.Used, + UsedPercent: parent.Host.Disk.UsedPercent, + InodesTotal: parent.Host.Disk.InodesTotal, + InodesUsed: parent.Host.Disk.InodesUsed, + InodesFree: parent.Host.Disk.InodesFree, + InodesUsedPercent: parent.Host.Disk.InodesUsedPercent, } - if parent.Host.Build != nil { - parentRecord.Host.Build = storage.Build{ - GitVersion: parent.Host.Build.GitVersion, - GitCommit: parent.Host.Build.GitCommit, - GoVersion: parent.Host.Build.GoVersion, - Platform: parent.Host.Build.Platform, - } + parentRecord.Host.Build = resource.Build{ + GitVersion: parent.Host.Build.GitVersion, + GitCommit: parent.Host.Build.GitCommit, + GoVersion: parent.Host.Build.GoVersion, + Platform: parent.Host.Build.Platform, } for _, piece := range peer.Pieces.Values() { @@ -1211,68 +1330,58 @@ func (v *V1) createRecord(peer *resource.Peer, parents []*resource.Peer, req *sc }, } - if peer.Host.CPU != nil { - record.Host.CPU = storage.CPU{ - LogicalCount: peer.Host.CPU.LogicalCount, - PhysicalCount: peer.Host.CPU.PhysicalCount, - Percent: peer.Host.CPU.Percent, - ProcessPercent: peer.Host.CPU.ProcessPercent, - Times: storage.CPUTimes{ - User: peer.Host.CPU.Times.User, - System: peer.Host.CPU.Times.System, - Idle: peer.Host.CPU.Times.Idle, - Nice: peer.Host.CPU.Times.Nice, - Iowait: peer.Host.CPU.Times.Iowait, - Irq: peer.Host.CPU.Times.Irq, - Softirq: peer.Host.CPU.Times.Softirq, - Steal: peer.Host.CPU.Times.Steal, - Guest: peer.Host.CPU.Times.Guest, - GuestNice: peer.Host.CPU.Times.GuestNice, - }, - } + record.Host.CPU = resource.CPU{ + LogicalCount: peer.Host.CPU.LogicalCount, + PhysicalCount: peer.Host.CPU.PhysicalCount, + Percent: peer.Host.CPU.Percent, + ProcessPercent: peer.Host.CPU.ProcessPercent, + Times: resource.CPUTimes{ + User: peer.Host.CPU.Times.User, + System: peer.Host.CPU.Times.System, + Idle: peer.Host.CPU.Times.Idle, + Nice: peer.Host.CPU.Times.Nice, + Iowait: peer.Host.CPU.Times.Iowait, + Irq: peer.Host.CPU.Times.Irq, + Softirq: peer.Host.CPU.Times.Softirq, + Steal: peer.Host.CPU.Times.Steal, + Guest: peer.Host.CPU.Times.Guest, + GuestNice: peer.Host.CPU.Times.GuestNice, + }, } - if peer.Host.Memory != nil { - record.Host.Memory = storage.Memory{ - Total: peer.Host.Memory.Total, - Available: peer.Host.Memory.Available, - Used: peer.Host.Memory.Used, - UsedPercent: peer.Host.Memory.UsedPercent, - ProcessUsedPercent: peer.Host.Memory.ProcessUsedPercent, - Free: peer.Host.Memory.Free, - } + record.Host.Memory = resource.Memory{ + Total: peer.Host.Memory.Total, + Available: peer.Host.Memory.Available, + Used: peer.Host.Memory.Used, + UsedPercent: peer.Host.Memory.UsedPercent, + ProcessUsedPercent: peer.Host.Memory.ProcessUsedPercent, + Free: peer.Host.Memory.Free, } - if peer.Host.Network != nil { - record.Host.Network = storage.Network{ - TCPConnectionCount: peer.Host.Network.TcpConnectionCount, - UploadTCPConnectionCount: peer.Host.Network.UploadTcpConnectionCount, - SecurityDomain: peer.Host.Network.SecurityDomain, - Location: peer.Host.Network.Location, - IDC: peer.Host.Network.Idc, - } + record.Host.Network = resource.Network{ + TCPConnectionCount: peer.Host.Network.TCPConnectionCount, + UploadTCPConnectionCount: peer.Host.Network.UploadTCPConnectionCount, + SecurityDomain: peer.Host.Network.SecurityDomain, + Location: peer.Host.Network.Location, + IDC: peer.Host.Network.IDC, } - if peer.Host.Disk != nil { - record.Host.Disk = storage.Disk{ - Total: peer.Host.Disk.Total, - Free: peer.Host.Disk.Free, - Used: peer.Host.Disk.Used, - UsedPercent: peer.Host.Disk.UsedPercent, - InodesTotal: peer.Host.Disk.InodesTotal, - InodesUsed: peer.Host.Disk.InodesUsed, - InodesFree: peer.Host.Disk.InodesFree, - InodesUsedPercent: peer.Host.Disk.InodesUsedPercent, - } + record.Host.Disk = resource.Disk{ + Total: peer.Host.Disk.Total, + Free: peer.Host.Disk.Free, + Used: peer.Host.Disk.Used, + UsedPercent: peer.Host.Disk.UsedPercent, + InodesTotal: peer.Host.Disk.InodesTotal, + InodesUsed: peer.Host.Disk.InodesUsed, + InodesFree: peer.Host.Disk.InodesFree, + InodesUsedPercent: peer.Host.Disk.InodesUsedPercent, } - if peer.Host.Build != nil { - record.Host.Build = storage.Build{ - GitVersion: peer.Host.Build.GitVersion, - GitCommit: peer.Host.Build.GitCommit, - GoVersion: peer.Host.Build.GoVersion, - Platform: peer.Host.Build.Platform, - } + record.Host.Build = resource.Build{ + GitVersion: peer.Host.Build.GitVersion, + GitCommit: peer.Host.Build.GitCommit, + GoVersion: peer.Host.Build.GoVersion, + Platform: peer.Host.Build.Platform, } if req.Code != commonv1.Code_Success { diff --git a/scheduler/service/service_v1_test.go b/scheduler/service/service_v1_test.go index 572ca3100..172d5f1ba 100644 --- a/scheduler/service/service_v1_test.go +++ b/scheduler/service/service_v1_test.go @@ -64,86 +64,96 @@ var ( BackToSourceCount: int(mockTaskBackToSourceLimit), } - mockHostCPU = &schedulerv1.CPU{ - LogicalCount: 24, - PhysicalCount: 12, - Percent: 0.8, - ProcessPercent: 0.4, - Times: &schedulerv1.CPUTimes{ - User: 100, - System: 101, - Idle: 102, - Nice: 103, - Iowait: 104, - Irq: 105, - Softirq: 106, - Steal: 107, - Guest: 108, - GuestNice: 109, + mockRawHost = resource.Host{ + ID: idgen.HostID("hostname", 8003), + Type: pkgtypes.HostTypeNormal, + Hostname: "hostname", + IP: "127.0.0.1", + Port: 8003, + DownloadPort: 8001, + OS: "darwin", + Platform: "darwin", + PlatformFamily: "Standalone Workstation", + PlatformVersion: "11.1", + KernelVersion: "20.2.0", + CPU: mockCPU, + Memory: mockMemory, + Network: mockNetwork, + Disk: mockDisk, + Build: mockBuild, + } + + mockRawSeedHost = resource.Host{ + ID: idgen.HostID("hostname_seed", 8003), + Type: pkgtypes.HostTypeSuperSeed, + Hostname: "hostname_seed", + IP: "127.0.0.1", + Port: 8003, + DownloadPort: 8001, + OS: "darwin", + Platform: "darwin", + PlatformFamily: "Standalone Workstation", + PlatformVersion: "11.1", + KernelVersion: "20.2.0", + CPU: mockCPU, + Memory: mockMemory, + Network: mockNetwork, + Disk: mockDisk, + Build: mockBuild, + } + + mockCPU = resource.CPU{ + LogicalCount: 4, + PhysicalCount: 2, + Percent: 1, + ProcessPercent: 0.5, + Times: resource.CPUTimes{ + User: 240662.2, + System: 317950.1, + Idle: 3393691.3, + Nice: 0, + Iowait: 0, + Irq: 0, + Softirq: 0, + Steal: 0, + Guest: 0, + GuestNice: 0, }, } - mockHostMemory = &schedulerv1.Memory{ - Total: 20, - Available: 19, - Used: 16, - UsedPercent: 0.7, - ProcessUsedPercent: 0.2, - Free: 15, + mockMemory = resource.Memory{ + Total: 17179869184, + Available: 5962813440, + Used: 11217055744, + UsedPercent: 65.291858, + ProcessUsedPercent: 41.525125, + Free: 2749598908, } - mockHostNetwork = &schedulerv1.Network{ - TcpConnectionCount: 400, - UploadTcpConnectionCount: 200, - SecurityDomain: "product", - Location: "china", - Idc: "e1", + mockNetwork = resource.Network{ + TCPConnectionCount: 10, + UploadTCPConnectionCount: 1, + SecurityDomain: "security_domain", + Location: "location", + IDC: "idc", } - mockHostDisk = &schedulerv1.Disk{ - Total: 100, - Free: 88, - Used: 56, - UsedPercent: 0.9, - InodesTotal: 200, - InodesUsed: 180, - InodesFree: 160, - InodesUsedPercent: 0.6, + mockDisk = resource.Disk{ + Total: 499963174912, + Free: 37226479616, + Used: 423809622016, + UsedPercent: 91.92547406065952, + InodesTotal: 4882452880, + InodesUsed: 7835772, + InodesFree: 4874617108, + InodesUsedPercent: 0.1604884305611568, } - mockHostBuild = &schedulerv1.Build{ - GitVersion: "3.0.0", - GitCommit: "2bf4d5e", - GoVersion: "1.19", - Platform: "linux", - } - - mockRawHost = &schedulerv1.AnnounceHostRequest{ - Id: idgen.HostID("hostname", 8003), - Type: pkgtypes.HostTypeNormalName, - Ip: "127.0.0.1", - Port: 8003, - DownloadPort: 8001, - Hostname: "hostname", - Cpu: mockHostCPU, - Memory: mockHostMemory, - Network: mockHostNetwork, - Disk: mockHostDisk, - Build: mockHostBuild, - } - - mockRawSeedHost = &schedulerv1.AnnounceHostRequest{ - Id: idgen.HostID("hostname_seed", 8003), - Type: pkgtypes.HostTypeSuperSeedName, - Ip: "127.0.0.1", - Port: 8003, - DownloadPort: 8001, - Hostname: "hostname", - Cpu: mockHostCPU, - Memory: mockHostMemory, - Network: mockHostNetwork, - Disk: mockHostDisk, - Build: mockHostBuild, + mockBuild = resource.Build{ + GitVersion: "v1.0.0", + GitCommit: "221176b117c6d59366d68f2b34d38be50c935883", + GoVersion: "1.18", + Platform: "darwin", } mockPeerHost = &schedulerv1.PeerHost{ @@ -222,7 +232,7 @@ func TestService_RegisterPeerTask(t *testing.T) { Priority: commonv1.Priority_LEVEL0, }, PeerHost: &schedulerv1.PeerHost{ - Id: mockRawHost.Id, + Id: mockRawHost.ID, }, }, mock: func( @@ -258,7 +268,7 @@ func TestService_RegisterPeerTask(t *testing.T) { Priority: commonv1.Priority_LEVEL0, }, PeerHost: &schedulerv1.PeerHost{ - Id: mockRawHost.Id, + Id: mockRawHost.ID, }, }, mock: func( @@ -303,7 +313,7 @@ func TestService_RegisterPeerTask(t *testing.T) { Priority: commonv1.Priority_LEVEL0, }, PeerHost: &schedulerv1.PeerHost{ - Id: mockRawHost.Id, + Id: mockRawHost.ID, }, }, mock: func( @@ -343,7 +353,7 @@ func TestService_RegisterPeerTask(t *testing.T) { Priority: commonv1.Priority_LEVEL0, }, PeerHost: &schedulerv1.PeerHost{ - Id: mockRawHost.Id, + Id: mockRawHost.ID, }, }, mock: func( @@ -381,7 +391,7 @@ func TestService_RegisterPeerTask(t *testing.T) { Priority: commonv1.Priority_LEVEL0, }, PeerHost: &schedulerv1.PeerHost{ - Id: mockRawHost.Id, + Id: mockRawHost.ID, }, }, mock: func( @@ -421,7 +431,7 @@ func TestService_RegisterPeerTask(t *testing.T) { Priority: commonv1.Priority_LEVEL0, }, PeerHost: &schedulerv1.PeerHost{ - Id: mockRawHost.Id, + Id: mockRawHost.ID, }, }, mock: func( @@ -463,7 +473,7 @@ func TestService_RegisterPeerTask(t *testing.T) { Priority: commonv1.Priority_LEVEL0, }, PeerHost: &schedulerv1.PeerHost{ - Id: mockRawHost.Id, + Id: mockRawHost.ID, }, }, mock: func( @@ -502,7 +512,7 @@ func TestService_RegisterPeerTask(t *testing.T) { Priority: commonv1.Priority_LEVEL0, }, PeerHost: &schedulerv1.PeerHost{ - Id: mockRawHost.Id, + Id: mockRawHost.ID, }, }, mock: func( @@ -544,7 +554,7 @@ func TestService_RegisterPeerTask(t *testing.T) { Priority: commonv1.Priority_LEVEL0, }, PeerHost: &schedulerv1.PeerHost{ - Id: mockRawHost.Id, + Id: mockRawHost.ID, }, }, mock: func( @@ -590,7 +600,7 @@ func TestService_RegisterPeerTask(t *testing.T) { Priority: commonv1.Priority_LEVEL0, }, PeerHost: &schedulerv1.PeerHost{ - Id: mockRawHost.Id, + Id: mockRawHost.ID, }, }, mock: func( @@ -638,7 +648,7 @@ func TestService_RegisterPeerTask(t *testing.T) { Priority: commonv1.Priority_LEVEL0, }, PeerHost: &schedulerv1.PeerHost{ - Id: mockRawHost.Id, + Id: mockRawHost.ID, }, }, mock: func( @@ -686,7 +696,7 @@ func TestService_RegisterPeerTask(t *testing.T) { Priority: commonv1.Priority_LEVEL0, }, PeerHost: &schedulerv1.PeerHost{ - Id: mockRawHost.Id, + Id: mockRawHost.ID, }, }, mock: func( @@ -730,7 +740,7 @@ func TestService_RegisterPeerTask(t *testing.T) { Priority: commonv1.Priority_LEVEL0, }, PeerHost: &schedulerv1.PeerHost{ - Id: mockRawHost.Id, + Id: mockRawHost.ID, }, }, mock: func( @@ -775,7 +785,7 @@ func TestService_RegisterPeerTask(t *testing.T) { Priority: commonv1.Priority_LEVEL0, }, PeerHost: &schedulerv1.PeerHost{ - Id: mockRawHost.Id, + Id: mockRawHost.ID, }, }, mock: func( @@ -817,7 +827,7 @@ func TestService_RegisterPeerTask(t *testing.T) { Priority: commonv1.Priority_LEVEL0, }, PeerHost: &schedulerv1.PeerHost{ - Id: mockRawHost.Id, + Id: mockRawHost.ID, }, }, mock: func( @@ -864,10 +874,14 @@ func TestService_RegisterPeerTask(t *testing.T) { peerManager := resource.NewMockPeerManager(ctl) svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduler, dynconfig, storage) - mockHost := resource.NewHost(mockRawHost) + mockHost := resource.NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit)) mockPeer := resource.NewPeer(mockPeerID, mockTask, mockHost) - mockSeedHost := resource.NewHost(mockRawSeedHost) + mockSeedHost := resource.NewHost( + mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, + mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type) mockSeedPeer := resource.NewPeer(mockSeedPeerID, mockTask, mockSeedHost) tc.mock( tc.req, mockPeer, mockSeedPeer, @@ -1124,7 +1138,9 @@ func TestService_ReportPieceResult(t *testing.T) { stream := schedulerv1mocks.NewMockScheduler_ReportPieceResultServer(ctl) svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduler, dynconfig, storage) - mockHost := resource.NewHost(mockRawHost) + mockHost := resource.NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit)) mockPeer := resource.NewPeer(mockPeerID, mockTask, mockHost) tc.mock(mockPeer, res, peerManager, res.EXPECT(), peerManager.EXPECT(), stream.EXPECT()) @@ -1320,7 +1336,9 @@ func TestService_ReportPeerResult(t *testing.T) { peerManager := resource.NewMockPeerManager(ctl) svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduler, dynconfig, storage) - mockHost := resource.NewHost(mockRawHost) + mockHost := resource.NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit)) mockPeer := resource.NewPeer(mockPeerID, mockTask, mockHost) tc.run(t, mockPeer, tc.req, svc, mockPeer, res, peerManager, res.EXPECT(), peerManager.EXPECT(), storage.EXPECT()) @@ -1408,7 +1426,7 @@ func TestService_AnnounceTask(t *testing.T) { Priority: commonv1.Priority_LEVEL0, }, PeerHost: &schedulerv1.PeerHost{ - Id: mockRawHost.Id, + Id: mockRawHost.ID, }, PiecePacket: &commonv1.PiecePacket{ PieceInfos: []*commonv1.PieceInfo{{PieceNum: 1}}, @@ -1631,7 +1649,9 @@ func TestService_AnnounceTask(t *testing.T) { taskManager := resource.NewMockTaskManager(ctl) peerManager := resource.NewMockPeerManager(ctl) svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnablePeerHost: true}}, res, scheduler, dynconfig, storage) - mockHost := resource.NewHost(mockRawHost) + mockHost := resource.NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit)) mockPeer := resource.NewPeer(mockPeerID, mockTask, mockHost) @@ -1826,7 +1846,9 @@ func TestService_LeaveTask(t *testing.T) { dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) peerManager := resource.NewMockPeerManager(ctl) - mockHost := resource.NewHost(mockRawHost) + mockHost := resource.NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit)) peer := resource.NewPeer(mockSeedPeerID, mockTask, mockHost) svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnablePeerHost: true}}, res, scheduler, dynconfig, storage) @@ -2039,7 +2061,9 @@ func TestService_LeaveHost(t *testing.T) { dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) hostManager := resource.NewMockHostManager(ctl) - host := resource.NewHost(mockRawHost) + host := resource.NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit)) mockPeer := resource.NewPeer(mockSeedPeerID, mockTask, host) svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnablePeerHost: true}}, res, scheduler, dynconfig, storage) @@ -2504,8 +2528,12 @@ func TestService_triggerTask(t *testing.T) { storage := storagemocks.NewMockStorage(ctl) svc := NewV1(tc.config, res, scheduler, dynconfig, storage) - mockHost := resource.NewHost(mockRawHost) - mockSeedHost := resource.NewHost(mockRawSeedHost) + mockHost := resource.NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) + mockSeedHost := resource.NewHost( + mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname, + mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit)) mockPeer := resource.NewPeer(mockPeerID, mockTask, mockHost) mockSeedPeer := resource.NewPeer(mockSeedPeerID, mockTask, mockSeedHost) @@ -2613,12 +2641,12 @@ func TestService_storeHost(t *testing.T) { mock: func(mockHost *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), - mh.Load(gomock.Eq(mockRawHost.Id)).Return(mockHost, true).Times(1), + mh.Load(gomock.Eq(mockRawHost.ID)).Return(mockHost, true).Times(1), ) }, expect: func(t *testing.T, host *resource.Host) { assert := assert.New(t) - assert.Equal(host.ID, mockRawHost.Id) + assert.Equal(host.ID, mockRawHost.ID) }, }, { @@ -2631,7 +2659,7 @@ func TestService_storeHost(t *testing.T) { mock: func(mockHost *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), - mh.Load(gomock.Eq(mockRawHost.Id)).Return(nil, false).Times(1), + mh.Load(gomock.Eq(mockRawHost.ID)).Return(nil, false).Times(1), md.GetSchedulerClusterClientConfig().Return(types.SchedulerClusterClientConfig{LoadLimit: 10}, nil).Times(1), mr.HostManager().Return(hostManager).Times(1), mh.Store(gomock.Any()).Return().Times(1), @@ -2639,7 +2667,7 @@ func TestService_storeHost(t *testing.T) { }, expect: func(t *testing.T, host *resource.Host) { assert := assert.New(t) - assert.Equal(host.ID, mockRawHost.Id) + assert.Equal(host.ID, mockRawHost.ID) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(10)) }, }, @@ -2653,7 +2681,7 @@ func TestService_storeHost(t *testing.T) { mock: func(mockHost *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { gomock.InOrder( mr.HostManager().Return(hostManager).Times(1), - mh.Load(gomock.Eq(mockRawHost.Id)).Return(nil, false).Times(1), + mh.Load(gomock.Eq(mockRawHost.ID)).Return(nil, false).Times(1), md.GetSchedulerClusterClientConfig().Return(types.SchedulerClusterClientConfig{}, errors.New("foo")).Times(1), mr.HostManager().Return(hostManager).Times(1), mh.Store(gomock.Any()).Return().Times(1), @@ -2661,7 +2689,7 @@ func TestService_storeHost(t *testing.T) { }, expect: func(t *testing.T, host *resource.Host) { assert := assert.New(t) - assert.Equal(host.ID, mockRawHost.Id) + assert.Equal(host.ID, mockRawHost.ID) }, }, } @@ -2676,7 +2704,9 @@ func TestService_storeHost(t *testing.T) { storage := storagemocks.NewMockStorage(ctl) svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduler, dynconfig, storage) hostManager := resource.NewMockHostManager(ctl) - mockHost := resource.NewHost(mockRawHost) + mockHost := resource.NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) tc.mock(mockHost, hostManager, res.EXPECT(), hostManager.EXPECT(), dynconfig.EXPECT()) host := svc.storeHost(context.Background(), tc.req.PeerHost) @@ -2744,7 +2774,9 @@ func TestService_storePeer(t *testing.T) { storage := storagemocks.NewMockStorage(ctl) svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduler, dynconfig, storage) peerManager := resource.NewMockPeerManager(ctl) - mockHost := resource.NewHost(mockRawHost) + mockHost := resource.NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit)) mockPeer := resource.NewPeer(mockPeerID, mockTask, mockHost) @@ -2807,7 +2839,9 @@ func TestService_triggerSeedPeerTask(t *testing.T) { dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) seedPeer := resource.NewMockSeedPeer(ctl) - mockHost := resource.NewHost(mockRawHost) + mockHost := resource.NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) task := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit)) peer := resource.NewPeer(mockPeerID, task, mockHost) svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduler, dynconfig, storage) @@ -2886,7 +2920,9 @@ func TestService_handleBeginOfPiece(t *testing.T) { res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) - mockHost := resource.NewHost(mockRawHost) + mockHost := resource.NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit)) peer := resource.NewPeer(mockPeerID, mockTask, mockHost) svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduler, dynconfig, storage) @@ -2899,7 +2935,9 @@ func TestService_handleBeginOfPiece(t *testing.T) { } func TestService_handlePieceSuccess(t *testing.T) { - mockHost := resource.NewHost(mockRawHost) + mockHost := resource.NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit)) now := time.Now() @@ -3157,7 +3195,9 @@ func TestService_handlePieceFail(t *testing.T) { dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) peerManager := resource.NewMockPeerManager(ctl) - mockHost := resource.NewHost(mockRawHost) + mockHost := resource.NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit)) peer := resource.NewPeer(mockPeerID, mockTask, mockHost) parent := resource.NewPeer(mockSeedPeerID, mockTask, mockHost) @@ -3278,9 +3318,11 @@ func TestService_handlePeerSuccess(t *testing.T) { t.Fatal(err) } - mockRawHost.Ip = ip + mockRawHost.IP = ip mockRawHost.DownloadPort = int32(port) - mockHost := resource.NewHost(mockRawHost) + mockHost := resource.NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit)) peer := resource.NewPeer(mockPeerID, mockTask, mockHost) svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnablePeerHost: true}}, res, scheduler, dynconfig, storage) @@ -3359,7 +3401,9 @@ func TestService_handlePeerFail(t *testing.T) { dynconfig := configmocks.NewMockDynconfigInterface(ctl) storage := storagemocks.NewMockStorage(ctl) svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnablePeerHost: true}}, res, scheduler, dynconfig, storage) - mockHost := resource.NewHost(mockRawHost) + mockHost := resource.NewHost( + mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, + mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit)) peer := resource.NewPeer(mockSeedPeerID, mockTask, mockHost) child := resource.NewPeer(mockPeerID, mockTask, mockHost) diff --git a/scheduler/storage/storage_test.go b/scheduler/storage/storage_test.go index 4d6d51d49..ed8b24693 100644 --- a/scheduler/storage/storage_test.go +++ b/scheduler/storage/storage_test.go @@ -31,6 +31,7 @@ import ( "github.com/stretchr/testify/assert" "d7y.io/dragonfly/v2/scheduler/config" + "d7y.io/dragonfly/v2/scheduler/resource" ) var ( @@ -63,12 +64,12 @@ var ( ConcurrentUploadCount: 40, UploadCount: 20, UploadFailedCount: 3, - CPU: CPU{ + CPU: resource.CPU{ LogicalCount: 24, PhysicalCount: 12, Percent: 0.8, ProcessPercent: 0.4, - Times: CPUTimes{ + Times: resource.CPUTimes{ User: 100, System: 101, Idle: 102, @@ -81,7 +82,7 @@ var ( GuestNice: 109, }, }, - Memory: Memory{ + Memory: resource.Memory{ Total: 20, Available: 19, Used: 16, @@ -89,14 +90,14 @@ var ( ProcessUsedPercent: 0.2, Free: 15, }, - Network: Network{ + Network: resource.Network{ TCPConnectionCount: 400, UploadTCPConnectionCount: 200, SecurityDomain: "product", Location: "china", IDC: "e1", }, - Disk: Disk{ + Disk: resource.Disk{ Total: 100, Free: 88, Used: 56, @@ -106,7 +107,7 @@ var ( InodesFree: 160, InodesUsedPercent: 0.6, }, - Build: Build{ + Build: resource.Build{ GitVersion: "3.0.0", GitCommit: "2bf4d5e", GoVersion: "1.19", diff --git a/scheduler/storage/types.go b/scheduler/storage/types.go index 07d806352..6c058c829 100644 --- a/scheduler/storage/types.go +++ b/scheduler/storage/types.go @@ -16,7 +16,11 @@ package storage -import "time" +import ( + "time" + + "d7y.io/dragonfly/v2/scheduler/resource" +) // Task contains content for task. type Task struct { @@ -99,19 +103,19 @@ type Host struct { UploadFailedCount int64 `csv:"uploadFailedCount"` // CPU Stat. - CPU CPU `csv:"cpu"` + CPU resource.CPU `csv:"cpu"` // Memory Stat. - Memory Memory `csv:"memory"` + Memory resource.Memory `csv:"memory"` // Network Stat. - Network Network `csv:"network"` + Network resource.Network `csv:"network"` // Disk Stat. - Disk Disk `csv:"disk"` + Disk resource.Disk `csv:"disk"` // Build information. - Build Build `csv:"build"` + Build resource.Build `csv:"build"` // CreatedAt is peer create nanosecond time. CreatedAt int64 `csv:"createdAt"` @@ -120,138 +124,6 @@ type Host struct { UpdatedAt int64 `csv:"updatedAt"` } -// CPU contains content for cpu. -type CPU struct { - // Number of logical cores in the system. - LogicalCount uint32 `csv:"logicalCount"` - - // Number of physical cores in the system. - PhysicalCount uint32 `csv:"physicalCount"` - - // Percent calculates the percentage of cpu used. - Percent float64 `csv:"percent"` - - // Calculates the percentage of cpu used by process. - ProcessPercent float64 `csv:"processPercent"` - - // Times contains the amounts of time the CPU has spent performing different kinds of work. - Times CPUTimes `csv:"times"` -} - -// CPUTimes contains content for cpu times. -type CPUTimes struct { - // CPU time of user. - User float64 `csv:"user"` - - // CPU time of system. - System float64 `csv:"system"` - - // CPU time of idle. - Idle float64 `csv:"idle"` - - // CPU time of nice. - Nice float64 `csv:"nice"` - - // CPU time of iowait. - Iowait float64 `csv:"iowait"` - - // CPU time of irq. - Irq float64 `csv:"irq"` - - // CPU time of softirq. - Softirq float64 `csv:"softirq"` - - // CPU time of steal. - Steal float64 `csv:"steal"` - - // CPU time of guest. - Guest float64 `csv:"guest"` - - // CPU time of guest nice. - GuestNice float64 `csv:"guestNice"` -} - -// Memory contains content for memory. -type Memory struct { - // Total amount of RAM on this system. - Total uint64 `csv:"total"` - - // RAM available for programs to allocate. - Available uint64 `csv:"available"` - - // RAM used by programs. - Used uint64 `csv:"used"` - - // Percentage of RAM used by programs. - UsedPercent float64 `csv:"usedPercent"` - - // Calculates the percentage of memory used by process. - ProcessUsedPercent float64 `csv:"processUsedPercent"` - - // This is the kernel's notion of free memory. - Free uint64 `csv:"free"` -} - -// Network contains content for network. -type Network struct { - // Return count of tcp connections opened and status is ESTABLISHED. - TCPConnectionCount uint32 `csv:"tcpConnectionCount"` - - // Return count of upload tcp connections opened and status is ESTABLISHED. - UploadTCPConnectionCount uint32 `csv:"uploadTCPConnectionCount"` - - // Security domain for network. - SecurityDomain string `csv:"securityDomain"` - - // Location path(area|country|province|city|...). - Location string `csv:"location"` - - // IDC where the peer host is located - IDC string `csv:"idc"` -} - -// Build contains content for build. -type Build struct { - // Git version. - GitVersion string `csv:"gitVersion"` - - // Git commit. - GitCommit string `csv:"gitCommit"` - - // Golang version. - GoVersion string `csv:"goVersion"` - - // Build platform. - Platform string `csv:"platform"` -} - -// Disk contains content for disk. -type Disk struct { - // Total amount of disk on the data path of dragonfly. - Total uint64 `csv:"total"` - - // Free amount of disk on the data path of dragonfly. - Free uint64 `csv:"free"` - - // Used amount of disk on the data path of dragonfly. - Used uint64 `csv:"used"` - - // Used percent of disk on the data path of dragonfly directory. - UsedPercent float64 `csv:"usedPercent"` - - // Total amount of indoes on the data path of dragonfly directory. - InodesTotal uint64 `csv:"inodesTotal"` - - // Used amount of indoes on the data path of dragonfly directory. - InodesUsed uint64 `csv:"inodesUsed"` - - // Free amount of indoes on the data path of dragonfly directory. - InodesFree uint64 `csv:"inodesFree"` - - // Used percent of indoes on the data path of dragonfly directory. - InodesUsedPercent float64 `csv:"inodesUsedPercent"` -} - // Parent contains content for parent. type Parent struct { // ID is peer id.