refactor: resource host without scheduler v1 definition (#2036)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2023-02-06 11:55:07 +08:00
parent d75e94ec04
commit 2b9aa7bf83
No known key found for this signature in database
GPG Key ID: 8B4E5D1290FA2FFB
16 changed files with 1434 additions and 677 deletions

View File

@ -23,8 +23,6 @@ import (
"go.uber.org/atomic"
schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/types"
"d7y.io/dragonfly/v2/scheduler/config"
@ -41,6 +39,87 @@ func WithConcurrentUploadLimit(limit int32) HostOption {
}
}
// WithOS sets host's os.
func WithOS(os string) HostOption {
return func(h *Host) *Host {
h.OS = os
return h
}
}
// WithPlatform sets host's platform.
func WithPlatform(platform string) HostOption {
return func(h *Host) *Host {
h.Platform = platform
return h
}
}
// WithPlatformFamily sets host's platform family.
func WithPlatformFamily(platformFamily string) HostOption {
return func(h *Host) *Host {
h.PlatformFamily = platformFamily
return h
}
}
// WithPlatformVersion sets host's platform version.
func WithPlatformVersion(platformVersion string) HostOption {
return func(h *Host) *Host {
h.PlatformVersion = platformVersion
return h
}
}
// WithKernelVersion sets host's kernel version.
func WithKernelVersion(kernelVersion string) HostOption {
return func(h *Host) *Host {
h.KernelVersion = kernelVersion
return h
}
}
// WithCPU sets host's cpu.
func WithCPU(cpu CPU) HostOption {
return func(h *Host) *Host {
h.CPU = cpu
return h
}
}
// WithMemory sets host's memory.
func WithMemory(memory Memory) HostOption {
return func(h *Host) *Host {
h.Memory = memory
return h
}
}
// WithNetwork sets host's network.
func WithNetwork(network Network) HostOption {
return func(h *Host) *Host {
h.Network = network
return h
}
}
// WithDisk sets host's disk.
func WithDisk(disk Disk) HostOption {
return func(h *Host) *Host {
h.Disk = disk
return h
}
}
// WithBuild sets host's build information.
func WithBuild(build Build) HostOption {
return func(h *Host) *Host {
h.Build = build
return h
}
}
// Host contains content for host.
type Host struct {
// ID is host id.
ID string
@ -76,19 +155,19 @@ type Host struct {
KernelVersion string
// CPU Stat.
CPU *schedulerv1.CPU
CPU CPU
// Memory Stat.
Memory *schedulerv1.Memory
Memory Memory
// Network Stat.
Network *schedulerv1.Network
Network Network
// Dist Stat.
Disk *schedulerv1.Disk
Disk Disk
// Build information.
Build *schedulerv1.Build
Build Build
// ConcurrentUploadLimit is concurrent upload limit count.
ConcurrentUploadLimit *atomic.Int32
@ -118,26 +197,158 @@ type Host struct {
Log *logger.SugaredLoggerOnWith
}
// CPU contains content for cpu.
type CPU struct {
// Number of logical cores in the system.
LogicalCount uint32 `csv:"logicalCount"`
// Number of physical cores in the system.
PhysicalCount uint32 `csv:"physicalCount"`
// Percent calculates the percentage of cpu used.
Percent float64 `csv:"percent"`
// Calculates the percentage of cpu used by process.
ProcessPercent float64 `csv:"processPercent"`
// Times contains the amounts of time the CPU has spent performing different kinds of work.
Times CPUTimes `csv:"times"`
}
// CPUTimes contains content for cpu times.
type CPUTimes struct {
// CPU time of user.
User float64 `csv:"user"`
// CPU time of system.
System float64 `csv:"system"`
// CPU time of idle.
Idle float64 `csv:"idle"`
// CPU time of nice.
Nice float64 `csv:"nice"`
// CPU time of iowait.
Iowait float64 `csv:"iowait"`
// CPU time of irq.
Irq float64 `csv:"irq"`
// CPU time of softirq.
Softirq float64 `csv:"softirq"`
// CPU time of steal.
Steal float64 `csv:"steal"`
// CPU time of guest.
Guest float64 `csv:"guest"`
// CPU time of guest nice.
GuestNice float64 `csv:"guestNice"`
}
// Memory contains content for memory.
type Memory struct {
// Total amount of RAM on this system.
Total uint64 `csv:"total"`
// RAM available for programs to allocate.
Available uint64 `csv:"available"`
// RAM used by programs.
Used uint64 `csv:"used"`
// Percentage of RAM used by programs.
UsedPercent float64 `csv:"usedPercent"`
// Calculates the percentage of memory used by process.
ProcessUsedPercent float64 `csv:"processUsedPercent"`
// This is the kernel's notion of free memory.
Free uint64 `csv:"free"`
}
// Network contains content for network.
type Network struct {
// Return count of tcp connections opened and status is ESTABLISHED.
TCPConnectionCount uint32 `csv:"tcpConnectionCount"`
// Return count of upload tcp connections opened and status is ESTABLISHED.
UploadTCPConnectionCount uint32 `csv:"uploadTCPConnectionCount"`
// Security domain for network.
SecurityDomain string `csv:"securityDomain"`
// Location path(area|country|province|city|...).
Location string `csv:"location"`
// IDC where the peer host is located
IDC string `csv:"idc"`
}
// Build contains content for build.
type Build struct {
// Git version.
GitVersion string `csv:"gitVersion"`
// Git commit.
GitCommit string `csv:"gitCommit"`
// Golang version.
GoVersion string `csv:"goVersion"`
// Build platform.
Platform string `csv:"platform"`
}
// Disk contains content for disk.
type Disk struct {
// Total amount of disk on the data path of dragonfly.
Total uint64 `csv:"total"`
// Free amount of disk on the data path of dragonfly.
Free uint64 `csv:"free"`
// Used amount of disk on the data path of dragonfly.
Used uint64 `csv:"used"`
// Used percent of disk on the data path of dragonfly directory.
UsedPercent float64 `csv:"usedPercent"`
// Total amount of indoes on the data path of dragonfly directory.
InodesTotal uint64 `csv:"inodesTotal"`
// Used amount of indoes on the data path of dragonfly directory.
InodesUsed uint64 `csv:"inodesUsed"`
// Free amount of indoes on the data path of dragonfly directory.
InodesFree uint64 `csv:"inodesFree"`
// Used percent of indoes on the data path of dragonfly directory.
InodesUsedPercent float64 `csv:"inodesUsedPercent"`
}
// New host instance.
func NewHost(req *schedulerv1.AnnounceHostRequest, options ...HostOption) *Host {
func NewHost(
id, ip, hostname string,
port, downloadPort int32, hostType types.HostType,
options ...HostOption,
) *Host {
// Calculate default of the concurrent upload limit by host type.
concurrentUploadLimit := config.DefaultSeedPeerConcurrentUploadLimit
if hostType == types.HostTypeNormal {
concurrentUploadLimit = config.DefaultPeerConcurrentUploadLimit
}
h := &Host{
ID: req.Id,
Type: types.ParseHostType(req.Type),
IP: req.Ip,
Hostname: req.Hostname,
Port: req.Port,
DownloadPort: req.DownloadPort,
OS: req.Os,
Platform: req.Platform,
PlatformFamily: req.PlatformFamily,
PlatformVersion: req.PlatformVersion,
KernelVersion: req.KernelVersion,
CPU: req.Cpu,
Memory: req.Memory,
Network: req.Network,
Disk: req.Disk,
Build: req.Build,
ConcurrentUploadLimit: atomic.NewInt32(config.DefaultPeerConcurrentUploadLimit),
ID: id,
Type: types.HostType(hostType),
IP: ip,
Hostname: hostname,
Port: port,
DownloadPort: downloadPort,
ConcurrentUploadLimit: atomic.NewInt32(int32(concurrentUploadLimit)),
ConcurrentUploadCount: atomic.NewInt32(0),
UploadCount: atomic.NewInt64(0),
UploadFailedCount: atomic.NewInt64(0),
@ -145,7 +356,7 @@ func NewHost(req *schedulerv1.AnnounceHostRequest, options ...HostOption) *Host
PeerCount: atomic.NewInt32(0),
CreatedAt: atomic.NewTime(time.Now()),
UpdatedAt: atomic.NewTime(time.Now()),
Log: logger.WithHost(req.Id, req.Hostname, req.Ip),
Log: logger.WithHost(id, hostname, ip),
}
for _, opt := range options {

View File

@ -131,7 +131,9 @@ func TestHostManager_Load(t *testing.T) {
gc := gc.NewMockGC(ctl)
tc.mock(gc.EXPECT())
mockHost := NewHost(mockRawHost)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
hostManager, err := newHostManager(mockHostGCConfig, gc)
if err != nil {
t.Fatal(err)
@ -184,7 +186,9 @@ func TestHostManager_Store(t *testing.T) {
gc := gc.NewMockGC(ctl)
tc.mock(gc.EXPECT())
mockHost := NewHost(mockRawHost)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
hostManager, err := newHostManager(mockHostGCConfig, gc)
if err != nil {
t.Fatal(err)
@ -235,7 +239,9 @@ func TestHostManager_LoadOrStore(t *testing.T) {
gc := gc.NewMockGC(ctl)
tc.mock(gc.EXPECT())
mockHost := NewHost(mockRawHost)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
hostManager, err := newHostManager(mockHostGCConfig, gc)
if err != nil {
t.Fatal(err)
@ -288,7 +294,9 @@ func TestHostManager_Delete(t *testing.T) {
gc := gc.NewMockGC(ctl)
tc.mock(gc.EXPECT())
mockHost := NewHost(mockRawHost)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
hostManager, err := newHostManager(mockHostGCConfig, gc)
if err != nil {
t.Fatal(err)
@ -363,7 +371,9 @@ func TestHostManager_RunGC(t *testing.T) {
},
expect: func(t *testing.T, hostManager HostManager, mockHost *Host, mockPeer *Peer) {
assert := assert.New(t)
mockSeedHost := NewHost(mockRawSeedHost)
mockSeedHost := NewHost(
mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname,
mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)
hostManager.Store(mockSeedHost)
err := hostManager.RunGC()
assert.NoError(err)
@ -382,7 +392,9 @@ func TestHostManager_RunGC(t *testing.T) {
gc := gc.NewMockGC(ctl)
tc.mock(gc.EXPECT())
mockHost := NewHost(mockRawHost)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit))
mockPeer := NewPeer(mockPeerID, mockTask, mockHost)
hostManager, err := newHostManager(mockHostGCConfig, gc)

View File

@ -22,7 +22,6 @@ import (
"github.com/stretchr/testify/assert"
commonv1 "d7y.io/api/pkg/apis/common/v1"
schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1"
"d7y.io/dragonfly/v2/pkg/idgen"
"d7y.io/dragonfly/v2/pkg/types"
@ -30,39 +29,103 @@ import (
)
var (
mockRawHost = &schedulerv1.AnnounceHostRequest{
Id: idgen.HostID("hostname", 8003),
Type: types.HostTypeNormalName,
Ip: "127.0.0.1",
Port: 8003,
DownloadPort: 8001,
Hostname: "hostname",
Network: &schedulerv1.Network{
SecurityDomain: "security_domain",
Location: "location",
Idc: "idc",
mockRawHost = Host{
ID: idgen.HostID("hostname", 8003),
Type: types.HostTypeNormal,
Hostname: "hostname",
IP: "127.0.0.1",
Port: 8003,
DownloadPort: 8001,
OS: "darwin",
Platform: "darwin",
PlatformFamily: "Standalone Workstation",
PlatformVersion: "11.1",
KernelVersion: "20.2.0",
CPU: mockCPU,
Memory: mockMemory,
Network: mockNetwork,
Disk: mockDisk,
Build: mockBuild,
}
mockRawSeedHost = Host{
ID: idgen.HostID("hostname_seed", 8003),
Type: types.HostTypeSuperSeed,
Hostname: "hostname_seed",
IP: "127.0.0.1",
Port: 8003,
DownloadPort: 8001,
OS: "darwin",
Platform: "darwin",
PlatformFamily: "Standalone Workstation",
PlatformVersion: "11.1",
KernelVersion: "20.2.0",
CPU: mockCPU,
Memory: mockMemory,
Network: mockNetwork,
Disk: mockDisk,
Build: mockBuild,
}
mockCPU = CPU{
LogicalCount: 4,
PhysicalCount: 2,
Percent: 1,
ProcessPercent: 0.5,
Times: CPUTimes{
User: 240662.2,
System: 317950.1,
Idle: 3393691.3,
Nice: 0,
Iowait: 0,
Irq: 0,
Softirq: 0,
Steal: 0,
Guest: 0,
GuestNice: 0,
},
}
mockRawSeedHost = &schedulerv1.AnnounceHostRequest{
Id: idgen.HostID("hostname_seed", 8003),
Type: types.HostTypeSuperSeedName,
Ip: "127.0.0.1",
Port: 8003,
DownloadPort: 8001,
Hostname: "hostname_seed",
Network: &schedulerv1.Network{
SecurityDomain: "security_domain",
Location: "location",
Idc: "idc",
},
mockMemory = Memory{
Total: 17179869184,
Available: 5962813440,
Used: 11217055744,
UsedPercent: 65.291858,
ProcessUsedPercent: 41.525125,
Free: 2749598908,
}
mockNetwork = Network{
TCPConnectionCount: 10,
UploadTCPConnectionCount: 1,
SecurityDomain: "security_domain",
Location: "location",
IDC: "idc",
}
mockDisk = Disk{
Total: 499963174912,
Free: 37226479616,
Used: 423809622016,
UsedPercent: 91.92547406065952,
InodesTotal: 4882452880,
InodesUsed: 7835772,
InodesFree: 4874617108,
InodesUsedPercent: 0.1604884305611568,
}
mockBuild = Build{
GitVersion: "v1.0.0",
GitCommit: "221176b117c6d59366d68f2b34d38be50c935883",
GoVersion: "1.18",
Platform: "darwin",
}
)
func TestHost_NewHost(t *testing.T) {
tests := []struct {
name string
rawHost *schedulerv1.AnnounceHostRequest
rawHost Host
options []HostOption
expect func(t *testing.T, host *Host)
}{
@ -71,19 +134,20 @@ func TestHost_NewHost(t *testing.T) {
rawHost: mockRawHost,
expect: func(t *testing.T, host *Host) {
assert := assert.New(t)
assert.Equal(host.ID, mockRawHost.Id)
assert.Equal(host.ID, mockRawHost.ID)
assert.Equal(host.Type, types.HostTypeNormal)
assert.Equal(host.IP, mockRawHost.Ip)
assert.Equal(host.Hostname, mockRawHost.Hostname)
assert.Equal(host.IP, mockRawHost.IP)
assert.Equal(host.Port, mockRawHost.Port)
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.Equal(host.Hostname, mockRawHost.Hostname)
assert.Equal(host.Network.SecurityDomain, mockRawHost.Network.SecurityDomain)
assert.Equal(host.Network.Location, mockRawHost.Network.Location)
assert.Equal(host.Network.Idc, mockRawHost.Network.Idc)
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
assert.Equal(host.UploadFailedCount.Load(), int64(0))
assert.NotNil(host.Peers)
assert.Equal(host.PeerCount.Load(), int32(0))
assert.NotEqual(host.CreatedAt.Load(), 0)
assert.NotEqual(host.UpdatedAt.Load(), 0)
assert.NotEmpty(host.CreatedAt.Load())
assert.NotEmpty(host.UpdatedAt.Load())
assert.NotNil(host.Log)
},
},
@ -92,19 +156,20 @@ func TestHost_NewHost(t *testing.T) {
rawHost: mockRawSeedHost,
expect: func(t *testing.T, host *Host) {
assert := assert.New(t)
assert.Equal(host.ID, mockRawSeedHost.Id)
assert.Equal(host.Type, types.HostTypeSuperSeed)
assert.Equal(host.IP, mockRawSeedHost.Ip)
assert.Equal(host.ID, mockRawSeedHost.ID)
assert.Equal(host.Type, mockRawSeedHost.Type)
assert.Equal(host.Hostname, mockRawSeedHost.Hostname)
assert.Equal(host.IP, mockRawSeedHost.IP)
assert.Equal(host.Port, mockRawSeedHost.Port)
assert.Equal(host.DownloadPort, mockRawSeedHost.DownloadPort)
assert.Equal(host.Hostname, mockRawSeedHost.Hostname)
assert.Equal(host.Network.SecurityDomain, mockRawSeedHost.Network.SecurityDomain)
assert.Equal(host.Network.Location, mockRawSeedHost.Network.Location)
assert.Equal(host.Network.Idc, mockRawSeedHost.Network.Idc)
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultSeedPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
assert.Equal(host.UploadFailedCount.Load(), int64(0))
assert.NotNil(host.Peers)
assert.Equal(host.PeerCount.Load(), int32(0))
assert.NotEqual(host.CreatedAt.Load(), 0)
assert.NotEqual(host.UpdatedAt.Load(), 0)
assert.NotEmpty(host.CreatedAt.Load())
assert.NotEmpty(host.UpdatedAt.Load())
assert.NotNil(host.Log)
},
},
@ -114,19 +179,260 @@ func TestHost_NewHost(t *testing.T) {
options: []HostOption{WithConcurrentUploadLimit(200)},
expect: func(t *testing.T, host *Host) {
assert := assert.New(t)
assert.Equal(host.ID, mockRawHost.Id)
assert.Equal(host.ID, mockRawHost.ID)
assert.Equal(host.Type, types.HostTypeNormal)
assert.Equal(host.IP, mockRawHost.Ip)
assert.Equal(host.Hostname, mockRawHost.Hostname)
assert.Equal(host.IP, mockRawHost.IP)
assert.Equal(host.Port, mockRawHost.Port)
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.Equal(host.Hostname, mockRawHost.Hostname)
assert.Equal(host.Network.SecurityDomain, mockRawHost.Network.SecurityDomain)
assert.Equal(host.Network.Location, mockRawHost.Network.Location)
assert.Equal(host.Network.Idc, mockRawHost.Network.Idc)
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(200))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
assert.Equal(host.UploadFailedCount.Load(), int64(0))
assert.NotNil(host.Peers)
assert.Equal(host.PeerCount.Load(), int32(0))
assert.NotEqual(host.CreatedAt.Load(), 0)
assert.NotEqual(host.UpdatedAt.Load(), 0)
assert.NotEmpty(host.CreatedAt.Load())
assert.NotEmpty(host.UpdatedAt.Load())
assert.NotNil(host.Log)
},
},
{
name: "new host and set os",
rawHost: mockRawHost,
options: []HostOption{WithOS("linux")},
expect: func(t *testing.T, host *Host) {
assert := assert.New(t)
assert.Equal(host.ID, mockRawHost.ID)
assert.Equal(host.Type, types.HostTypeNormal)
assert.Equal(host.Hostname, mockRawHost.Hostname)
assert.Equal(host.IP, mockRawHost.IP)
assert.Equal(host.Port, mockRawHost.Port)
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.Equal(host.OS, "linux")
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
assert.Equal(host.UploadFailedCount.Load(), int64(0))
assert.NotNil(host.Peers)
assert.Equal(host.PeerCount.Load(), int32(0))
assert.NotEmpty(host.CreatedAt.Load())
assert.NotEmpty(host.UpdatedAt.Load())
assert.NotNil(host.Log)
},
},
{
name: "new host and set platform",
rawHost: mockRawHost,
options: []HostOption{WithPlatform("ubuntu")},
expect: func(t *testing.T, host *Host) {
assert := assert.New(t)
assert.Equal(host.ID, mockRawHost.ID)
assert.Equal(host.Type, types.HostTypeNormal)
assert.Equal(host.Hostname, mockRawHost.Hostname)
assert.Equal(host.IP, mockRawHost.IP)
assert.Equal(host.Port, mockRawHost.Port)
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.Equal(host.Platform, "ubuntu")
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
assert.Equal(host.UploadFailedCount.Load(), int64(0))
assert.NotNil(host.Peers)
assert.Equal(host.PeerCount.Load(), int32(0))
assert.NotEmpty(host.CreatedAt.Load())
assert.NotEmpty(host.UpdatedAt.Load())
assert.NotNil(host.Log)
},
},
{
name: "new host and set platform family",
rawHost: mockRawHost,
options: []HostOption{WithPlatformFamily("debian")},
expect: func(t *testing.T, host *Host) {
assert := assert.New(t)
assert.Equal(host.ID, mockRawHost.ID)
assert.Equal(host.Type, types.HostTypeNormal)
assert.Equal(host.Hostname, mockRawHost.Hostname)
assert.Equal(host.IP, mockRawHost.IP)
assert.Equal(host.Port, mockRawHost.Port)
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.Equal(host.PlatformFamily, "debian")
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
assert.Equal(host.UploadFailedCount.Load(), int64(0))
assert.NotNil(host.Peers)
assert.Equal(host.PeerCount.Load(), int32(0))
assert.NotEmpty(host.CreatedAt.Load())
assert.NotEmpty(host.UpdatedAt.Load())
assert.NotNil(host.Log)
},
},
{
name: "new host and set platform version",
rawHost: mockRawHost,
options: []HostOption{WithPlatformVersion("22.04")},
expect: func(t *testing.T, host *Host) {
assert := assert.New(t)
assert.Equal(host.ID, mockRawHost.ID)
assert.Equal(host.Type, types.HostTypeNormal)
assert.Equal(host.Hostname, mockRawHost.Hostname)
assert.Equal(host.IP, mockRawHost.IP)
assert.Equal(host.Port, mockRawHost.Port)
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.Equal(host.PlatformVersion, "22.04")
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
assert.Equal(host.UploadFailedCount.Load(), int64(0))
assert.NotNil(host.Peers)
assert.Equal(host.PeerCount.Load(), int32(0))
assert.NotEmpty(host.CreatedAt.Load())
assert.NotEmpty(host.UpdatedAt.Load())
assert.NotNil(host.Log)
},
},
{
name: "new host and set kernel version",
rawHost: mockRawHost,
options: []HostOption{WithKernelVersion("5.15.0-27-generic")},
expect: func(t *testing.T, host *Host) {
assert := assert.New(t)
assert.Equal(host.ID, mockRawHost.ID)
assert.Equal(host.Type, types.HostTypeNormal)
assert.Equal(host.Hostname, mockRawHost.Hostname)
assert.Equal(host.IP, mockRawHost.IP)
assert.Equal(host.Port, mockRawHost.Port)
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.Equal(host.KernelVersion, "5.15.0-27-generic")
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
assert.Equal(host.UploadFailedCount.Load(), int64(0))
assert.NotNil(host.Peers)
assert.Equal(host.PeerCount.Load(), int32(0))
assert.NotEmpty(host.CreatedAt.Load())
assert.NotEmpty(host.UpdatedAt.Load())
assert.NotNil(host.Log)
},
},
{
name: "new host and set cpu",
rawHost: mockRawHost,
options: []HostOption{WithCPU(mockCPU)},
expect: func(t *testing.T, host *Host) {
assert := assert.New(t)
assert.Equal(host.ID, mockRawHost.ID)
assert.Equal(host.Type, types.HostTypeNormal)
assert.Equal(host.Hostname, mockRawHost.Hostname)
assert.Equal(host.IP, mockRawHost.IP)
assert.Equal(host.Port, mockRawHost.Port)
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.EqualValues(host.CPU, mockCPU)
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
assert.Equal(host.UploadFailedCount.Load(), int64(0))
assert.NotNil(host.Peers)
assert.Equal(host.PeerCount.Load(), int32(0))
assert.NotEmpty(host.CreatedAt.Load())
assert.NotEmpty(host.UpdatedAt.Load())
assert.NotNil(host.Log)
},
},
{
name: "new host and set memory",
rawHost: mockRawHost,
options: []HostOption{WithMemory(mockMemory)},
expect: func(t *testing.T, host *Host) {
assert := assert.New(t)
assert.Equal(host.ID, mockRawHost.ID)
assert.Equal(host.Type, types.HostTypeNormal)
assert.Equal(host.Hostname, mockRawHost.Hostname)
assert.Equal(host.IP, mockRawHost.IP)
assert.Equal(host.Port, mockRawHost.Port)
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.EqualValues(host.Memory, mockMemory)
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
assert.Equal(host.UploadFailedCount.Load(), int64(0))
assert.NotNil(host.Peers)
assert.Equal(host.PeerCount.Load(), int32(0))
assert.NotEmpty(host.CreatedAt.Load())
assert.NotEmpty(host.UpdatedAt.Load())
assert.NotNil(host.Log)
},
},
{
name: "new host and set network",
rawHost: mockRawHost,
options: []HostOption{WithNetwork(mockNetwork)},
expect: func(t *testing.T, host *Host) {
assert := assert.New(t)
assert.Equal(host.ID, mockRawHost.ID)
assert.Equal(host.Type, types.HostTypeNormal)
assert.Equal(host.Hostname, mockRawHost.Hostname)
assert.Equal(host.IP, mockRawHost.IP)
assert.Equal(host.Port, mockRawHost.Port)
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.EqualValues(host.Network, mockNetwork)
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
assert.Equal(host.UploadFailedCount.Load(), int64(0))
assert.NotNil(host.Peers)
assert.Equal(host.PeerCount.Load(), int32(0))
assert.NotEmpty(host.CreatedAt.Load())
assert.NotEmpty(host.UpdatedAt.Load())
assert.NotNil(host.Log)
},
},
{
name: "new host and set disk",
rawHost: mockRawHost,
options: []HostOption{WithDisk(mockDisk)},
expect: func(t *testing.T, host *Host) {
assert := assert.New(t)
assert.Equal(host.ID, mockRawHost.ID)
assert.Equal(host.Type, types.HostTypeNormal)
assert.Equal(host.Hostname, mockRawHost.Hostname)
assert.Equal(host.IP, mockRawHost.IP)
assert.Equal(host.Port, mockRawHost.Port)
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.EqualValues(host.Disk, mockDisk)
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
assert.Equal(host.UploadFailedCount.Load(), int64(0))
assert.NotNil(host.Peers)
assert.Equal(host.PeerCount.Load(), int32(0))
assert.NotEmpty(host.CreatedAt.Load())
assert.NotEmpty(host.UpdatedAt.Load())
assert.NotNil(host.Log)
},
},
{
name: "new host and set build",
rawHost: mockRawHost,
options: []HostOption{WithBuild(mockBuild)},
expect: func(t *testing.T, host *Host) {
assert := assert.New(t)
assert.Equal(host.ID, mockRawHost.ID)
assert.Equal(host.Type, types.HostTypeNormal)
assert.Equal(host.Hostname, mockRawHost.Hostname)
assert.Equal(host.IP, mockRawHost.IP)
assert.Equal(host.Port, mockRawHost.Port)
assert.Equal(host.DownloadPort, mockRawHost.DownloadPort)
assert.EqualValues(host.Build, mockBuild)
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
assert.Equal(host.ConcurrentUploadCount.Load(), int32(0))
assert.Equal(host.UploadCount.Load(), int64(0))
assert.Equal(host.UploadFailedCount.Load(), int64(0))
assert.NotNil(host.Peers)
assert.Equal(host.PeerCount.Load(), int32(0))
assert.NotEmpty(host.CreatedAt.Load())
assert.NotEmpty(host.UpdatedAt.Load())
assert.NotNil(host.Log)
},
},
@ -134,7 +440,10 @@ func TestHost_NewHost(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
tc.expect(t, NewHost(tc.rawHost, tc.options...))
tc.expect(t, NewHost(
tc.rawHost.ID, tc.rawHost.IP, tc.rawHost.Hostname,
tc.rawHost.Port, tc.rawHost.DownloadPort, tc.rawHost.Type,
tc.options...))
})
}
}
@ -142,9 +451,8 @@ func TestHost_NewHost(t *testing.T) {
func TestHost_LoadPeer(t *testing.T) {
tests := []struct {
name string
rawHost *schedulerv1.AnnounceHostRequest
rawHost Host
peerID string
options []HostOption
expect func(t *testing.T, peer *Peer, loaded bool)
}{
{
@ -179,7 +487,9 @@ func TestHost_LoadPeer(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
host := NewHost(tc.rawHost, tc.options...)
host := NewHost(
tc.rawHost.ID, tc.rawHost.IP, tc.rawHost.Hostname,
tc.rawHost.Port, tc.rawHost.DownloadPort, tc.rawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit))
mockPeer := NewPeer(mockPeerID, mockTask, host)
@ -193,7 +503,7 @@ func TestHost_LoadPeer(t *testing.T) {
func TestHost_StorePeer(t *testing.T) {
tests := []struct {
name string
rawHost *schedulerv1.AnnounceHostRequest
rawHost Host
peerID string
options []HostOption
expect func(t *testing.T, peer *Peer, loaded bool)
@ -222,7 +532,9 @@ func TestHost_StorePeer(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
host := NewHost(tc.rawHost, tc.options...)
host := NewHost(
tc.rawHost.ID, tc.rawHost.IP, tc.rawHost.Hostname,
tc.rawHost.Port, tc.rawHost.DownloadPort, tc.rawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit))
mockPeer := NewPeer(tc.peerID, mockTask, host)
@ -236,7 +548,7 @@ func TestHost_StorePeer(t *testing.T) {
func TestHost_DeletePeer(t *testing.T) {
tests := []struct {
name string
rawHost *schedulerv1.AnnounceHostRequest
rawHost Host
peerID string
options []HostOption
expect func(t *testing.T, host *Host)
@ -266,7 +578,9 @@ func TestHost_DeletePeer(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
host := NewHost(tc.rawHost, tc.options...)
host := NewHost(
tc.rawHost.ID, tc.rawHost.IP, tc.rawHost.Hostname,
tc.rawHost.Port, tc.rawHost.DownloadPort, tc.rawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit))
mockPeer := NewPeer(mockPeerID, mockTask, host)
@ -280,7 +594,7 @@ func TestHost_DeletePeer(t *testing.T) {
func TestHost_LeavePeers(t *testing.T) {
tests := []struct {
name string
rawHost *schedulerv1.AnnounceHostRequest
rawHost Host
options []HostOption
expect func(t *testing.T, host *Host, mockPeer *Peer)
}{
@ -316,7 +630,9 @@ func TestHost_LeavePeers(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
host := NewHost(tc.rawHost, tc.options...)
host := NewHost(
tc.rawHost.ID, tc.rawHost.IP, tc.rawHost.Hostname,
tc.rawHost.Port, tc.rawHost.DownloadPort, tc.rawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit))
mockPeer := NewPeer(mockPeerID, mockTask, host)
@ -328,7 +644,7 @@ func TestHost_LeavePeers(t *testing.T) {
func TestHost_FreeUploadCount(t *testing.T) {
tests := []struct {
name string
rawHost *schedulerv1.AnnounceHostRequest
rawHost Host
options []HostOption
expect func(t *testing.T, host *Host, mockTask *Task, mockPeer *Peer)
}{
@ -366,7 +682,9 @@ func TestHost_FreeUploadCount(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
host := NewHost(tc.rawHost, tc.options...)
host := NewHost(
tc.rawHost.ID, tc.rawHost.IP, tc.rawHost.Hostname,
tc.rawHost.Port, tc.rawHost.DownloadPort, tc.rawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit))
mockPeer := NewPeer(mockPeerID, mockTask, host)

View File

@ -133,7 +133,9 @@ func TestPeerManager_Load(t *testing.T) {
gc := gc.NewMockGC(ctl)
tc.mock(gc.EXPECT())
mockHost := NewHost(mockRawHost)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit))
mockPeer := NewPeer(mockPeerID, mockTask, mockHost)
peerManager, err := newPeerManager(mockPeerGCConfig, gc)
@ -188,7 +190,9 @@ func TestPeerManager_Store(t *testing.T) {
gc := gc.NewMockGC(ctl)
tc.mock(gc.EXPECT())
mockHost := NewHost(mockRawHost)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit))
mockPeer := NewPeer(mockPeerID, mockTask, mockHost)
peerManager, err := newPeerManager(mockPeerGCConfig, gc)
@ -241,7 +245,9 @@ func TestPeerManager_LoadOrStore(t *testing.T) {
gc := gc.NewMockGC(ctl)
tc.mock(gc.EXPECT())
mockHost := NewHost(mockRawHost)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit))
mockPeer := NewPeer(mockPeerID, mockTask, mockHost)
peerManager, err := newPeerManager(mockPeerGCConfig, gc)
@ -296,7 +302,9 @@ func TestPeerManager_Delete(t *testing.T) {
gc := gc.NewMockGC(ctl)
tc.mock(gc.EXPECT())
mockHost := NewHost(mockRawHost)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit))
mockPeer := NewPeer(mockPeerID, mockTask, mockHost)
peerManager, err := newPeerManager(mockPeerGCConfig, gc)
@ -503,7 +511,9 @@ func TestPeerManager_RunGC(t *testing.T) {
gc := gc.NewMockGC(ctl)
tc.mock(gc.EXPECT())
mockHost := NewHost(mockRawHost)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit))
mockPeer := NewPeer(mockPeerID, mockTask, mockHost)
peerManager, err := newPeerManager(tc.gcConfig, gc)

View File

@ -95,7 +95,9 @@ func TestPeer_NewPeer(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
mockHost := NewHost(mockRawHost)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit))
tc.expect(t, NewPeer(tc.id, mockTask, mockHost, tc.options...), mockTask, mockHost)
})
@ -128,7 +130,9 @@ func TestPeer_AppendPieceCost(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
mockHost := NewHost(mockRawHost)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit))
peer := NewPeer(mockPeerID, mockTask, mockHost)
@ -163,7 +167,9 @@ func TestPeer_PieceCosts(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
mockHost := NewHost(mockRawHost)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit))
peer := NewPeer(mockPeerID, mockTask, mockHost)
@ -203,7 +209,9 @@ func TestPeer_LoadStream(t *testing.T) {
defer ctl.Finish()
stream := mocks.NewMockScheduler_ReportPieceResultServer(ctl)
mockHost := NewHost(mockRawHost)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit))
peer := NewPeer(mockPeerID, mockTask, mockHost)
tc.expect(t, peer, stream)
@ -234,7 +242,9 @@ func TestPeer_StoreStream(t *testing.T) {
defer ctl.Finish()
stream := mocks.NewMockScheduler_ReportPieceResultServer(ctl)
mockHost := NewHost(mockRawHost)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit))
peer := NewPeer(mockPeerID, mockTask, mockHost)
tc.expect(t, peer, stream)
@ -265,7 +275,9 @@ func TestPeer_DeleteStream(t *testing.T) {
defer ctl.Finish()
stream := mocks.NewMockScheduler_ReportPieceResultServer(ctl)
mockHost := NewHost(mockRawHost)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit))
peer := NewPeer(mockPeerID, mockTask, mockHost)
tc.expect(t, peer, stream)
@ -308,7 +320,9 @@ func TestPeer_Parents(t *testing.T) {
defer ctl.Finish()
stream := mocks.NewMockScheduler_ReportPieceResultServer(ctl)
mockHost := NewHost(mockRawHost)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit))
peer := NewPeer(mockPeerID, mockTask, mockHost)
seedPeer := NewPeer(mockSeedPeerID, mockTask, mockHost)
@ -352,7 +366,9 @@ func TestPeer_Children(t *testing.T) {
defer ctl.Finish()
stream := mocks.NewMockScheduler_ReportPieceResultServer(ctl)
mockHost := NewHost(mockRawHost)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit))
peer := NewPeer(mockPeerID, mockTask, mockHost)
seedPeer := NewPeer(mockSeedPeerID, mockTask, mockHost)
@ -449,9 +465,11 @@ func TestPeer_DownloadTinyFile(t *testing.T) {
t.Fatal(err)
}
mockRawHost.Ip = ip
mockRawHost.IP = ip
mockRawHost.DownloadPort = int32(port)
mockHost := NewHost(mockRawHost)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit))
peer = NewPeer(mockPeerID, mockTask, mockHost)
tc.expect(t, peer)
@ -565,7 +583,9 @@ func TestPeer_GetPriority(t *testing.T) {
defer ctl.Finish()
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
mockHost := NewHost(mockRawHost)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit))
peer := NewPeer(mockPeerID, mockTask, mockHost)
tc.mock(peer, dynconfig.EXPECT())

View File

@ -26,7 +26,6 @@ import (
"google.golang.org/grpc"
managerv1 "d7y.io/api/pkg/apis/manager/v1"
schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/dfnet"
@ -125,35 +124,29 @@ func (sc *seedPeerClient) updateSeedPeersForHostManager(seedPeers []*managerv1.S
id := idgen.HostID(seedPeer.HostName, seedPeer.Port)
seedPeerHost, loaded := sc.hostManager.Load(id)
if !loaded {
var options []HostOption
options := []HostOption{WithNetwork(Network{
Location: seedPeer.Location,
IDC: seedPeer.Idc,
})}
if concurrentUploadLimit > 0 {
options = append(options, WithConcurrentUploadLimit(concurrentUploadLimit))
}
sc.hostManager.Store(NewHost(&schedulerv1.AnnounceHostRequest{
Id: id,
Type: types.HostTypeSuperSeedName,
Ip: seedPeer.Ip,
Hostname: seedPeer.HostName,
Port: seedPeer.Port,
DownloadPort: seedPeer.DownloadPort,
Network: &schedulerv1.Network{
Location: seedPeer.Location,
Idc: seedPeer.Idc,
},
}, options...))
host := NewHost(
id, seedPeer.Ip, seedPeer.HostName,
seedPeer.Port, seedPeer.DownloadPort, types.HostTypeSuperSeed,
options...,
)
sc.hostManager.Store(host)
continue
}
seedPeerHost.IP = seedPeer.Ip
seedPeerHost.Type = types.HostTypeSuperSeed
seedPeerHost.Hostname = seedPeer.HostName
seedPeerHost.Port = seedPeer.Port
seedPeerHost.IP = seedPeer.Ip
seedPeerHost.DownloadPort = seedPeer.DownloadPort
seedPeerHost.Network = &schedulerv1.Network{
Location: seedPeer.Location,
Idc: seedPeer.Idc,
}
seedPeerHost.Network.Location = seedPeer.Location
seedPeerHost.Network.IDC = seedPeer.Idc
if concurrentUploadLimit > 0 {
seedPeerHost.ConcurrentUploadLimit.Store(concurrentUploadLimit)

View File

@ -163,7 +163,9 @@ func TestSeedPeerClient_OnNotify(t *testing.T) {
},
},
mock: func(dynconfig *configmocks.MockDynconfigInterfaceMockRecorder, hostManager *MockHostManagerMockRecorder) {
mockHost := NewHost(mockRawHost)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
gomock.InOrder(
dynconfig.Get().Return(&config.DynconfigData{
Scheduler: &managerv1.Scheduler{
@ -288,17 +290,17 @@ func TestSeedPeerClient_seedPeersToNetAddrs(t *testing.T) {
Id: 1,
Type: pkgtypes.HostTypeSuperSeedName,
HostName: mockRawSeedHost.Hostname,
Ip: mockRawSeedHost.Ip,
Ip: mockRawSeedHost.IP,
Port: mockRawSeedHost.Port,
DownloadPort: mockRawSeedHost.DownloadPort,
Idc: mockRawSeedHost.Network.Idc,
Idc: mockRawSeedHost.Network.IDC,
Location: mockRawSeedHost.Network.Location,
},
},
expect: func(t *testing.T, netAddrs []dfnet.NetAddr) {
assert := assert.New(t)
assert.Equal(netAddrs[0].Type, dfnet.TCP)
assert.Equal(netAddrs[0].Addr, fmt.Sprintf("%s:%d", mockRawSeedHost.Ip, mockRawSeedHost.Port))
assert.Equal(netAddrs[0].Addr, fmt.Sprintf("%s:%d", mockRawSeedHost.IP, mockRawSeedHost.Port))
},
},
{

View File

@ -353,7 +353,9 @@ func TestTaskManager_RunGC(t *testing.T) {
gc := gc.NewMockGC(ctl)
tc.mock(gc.EXPECT())
mockHost := NewHost(mockRawHost)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit))
mockPeer := NewPeer(mockPeerID, mockTask, mockHost)
taskManager, err := newTaskManager(mockTaskGCConfig, gc)

View File

@ -148,7 +148,9 @@ func TestTask_LoadPeer(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
mockHost := NewHost(mockRawHost)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
task := NewTask(tc.id, tc.url, commonv1.TaskType_Normal, tc.urlMeta, WithBackToSourceLimit(tc.backToSourceLimit))
mockPeer := NewPeer(mockPeerID, task, mockHost)
@ -215,7 +217,9 @@ func TestTask_LoadRandomPeers(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
host := NewHost(mockRawHost)
host := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
task := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta)
tc.expect(t, task, host)
@ -263,7 +267,9 @@ func TestTask_StorePeer(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
mockHost := NewHost(mockRawHost)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
task := NewTask(tc.id, tc.url, commonv1.TaskType_Normal, tc.urlMeta, WithBackToSourceLimit(tc.backToSourceLimit))
mockPeer := NewPeer(tc.peerID, task, mockHost)
@ -315,7 +321,9 @@ func TestTask_DeletePeer(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
mockHost := NewHost(mockRawHost)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
task := NewTask(tc.id, tc.url, commonv1.TaskType_Normal, tc.urlMeta, WithBackToSourceLimit(tc.backToSourceLimit))
mockPeer := NewPeer(mockPeerID, task, mockHost)
@ -352,7 +360,9 @@ func TestTask_PeerCount(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
mockHost := NewHost(mockRawHost)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
task := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta)
mockPeer := NewPeer(mockPeerID, task, mockHost)
@ -448,7 +458,9 @@ func TestTask_AddPeerEdge(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
mockHost := NewHost(mockRawHost)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
task := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta)
tc.expect(t, mockHost, task)
@ -550,7 +562,9 @@ func TestTask_DeletePeerInEdges(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
mockHost := NewHost(mockRawHost)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
task := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta)
tc.expect(t, mockHost, task)
@ -650,7 +664,9 @@ func TestTask_DeletePeerOutEdges(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
mockHost := NewHost(mockRawHost)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
task := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta)
tc.expect(t, mockHost, task)
@ -735,7 +751,9 @@ func TestTask_CanAddPeerEdge(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
mockHost := NewHost(mockRawHost)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
task := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta)
tc.expect(t, mockHost, task)
@ -796,7 +814,9 @@ func TestTask_PeerDegree(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
mockHost := NewHost(mockRawHost)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
task := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta)
tc.expect(t, mockHost, task)
@ -857,7 +877,9 @@ func TestTask_PeerInDegree(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
mockHost := NewHost(mockRawHost)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
task := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta)
tc.expect(t, mockHost, task)
@ -918,7 +940,9 @@ func TestTask_PeerOutDegree(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
mockHost := NewHost(mockRawHost)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
task := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta)
tc.expect(t, mockHost, task)
@ -1026,7 +1050,9 @@ func TestTask_HasAvailablePeer(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
mockHost := NewHost(mockRawHost)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
task := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, tc.urlMeta, WithBackToSourceLimit(tc.backToSourceLimit))
mockPeer := NewPeer(mockPeerID, task, mockHost)
@ -1088,8 +1114,12 @@ func TestTask_LoadSeedPeer(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
mockHost := NewHost(mockRawHost)
mockSeedHost := NewHost(mockRawSeedHost)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockSeedHost := NewHost(
mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname,
mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)
task := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit))
mockPeer := NewPeer(mockPeerID, task, mockHost)
mockSeedPeer := NewPeer(mockSeedPeerID, task, mockSeedHost)
@ -1151,8 +1181,12 @@ func TestTask_IsSeedPeerFailed(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
mockHost := NewHost(mockRawHost)
mockSeedHost := NewHost(mockRawSeedHost)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockSeedHost := NewHost(
mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname,
mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)
task := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit))
mockPeer := NewPeer(mockPeerID, task, mockHost)
mockSeedPeer := NewPeer(mockSeedPeerID, task, mockSeedHost)
@ -1622,7 +1656,9 @@ func TestTask_NotifyPeers(t *testing.T) {
defer ctl.Finish()
stream := mocks.NewMockScheduler_ReportPieceResultServer(ctl)
mockHost := NewHost(mockRawHost)
mockHost := NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
task := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit))
mockPeer := NewPeer(mockPeerID, task, mockHost)
task.StorePeer(mockPeer)

View File

@ -77,31 +77,18 @@ func NewEvaluatorBase() Evaluator {
// The larger the value after evaluation, the higher the priority.
func (eb *evaluatorBase) Evaluate(parent *resource.Peer, child *resource.Peer, totalPieceCount int32) float64 {
var (
parentSecurityDomain string
childSecurityDomain string
parentLocation string
childLocation string
parentIDC string
childIDC string
)
if parent.Host.Network != nil {
parentSecurityDomain = parent.Host.Network.SecurityDomain
parentLocation = parent.Host.Network.Location
parentIDC = parent.Host.Network.Idc
}
if child.Host.Network != nil {
childSecurityDomain = child.Host.Network.SecurityDomain
childLocation = child.Host.Network.Location
childIDC = child.Host.Network.Idc
}
parentSecurityDomain := parent.Host.Network.SecurityDomain
parentLocation := parent.Host.Network.Location
parentIDC := parent.Host.Network.IDC
childSecurityDomain := child.Host.Network.SecurityDomain
childLocation := child.Host.Network.Location
childIDC := child.Host.Network.IDC
// If the SecurityDomain of hosts exists but is not equal,
// it cannot be scheduled as a parent.
if parentSecurityDomain != "" &&
childSecurityDomain != "" &&
parent.Host.Network.SecurityDomain != child.Host.Network.SecurityDomain {
parentSecurityDomain != childSecurityDomain {
return minScore
}

View File

@ -23,7 +23,6 @@ import (
"github.com/stretchr/testify/assert"
commonv1 "d7y.io/api/pkg/apis/common/v1"
schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1"
"d7y.io/dragonfly/v2/pkg/idgen"
"d7y.io/dragonfly/v2/pkg/types"
@ -31,32 +30,96 @@ import (
)
var (
mockRawSeedHost = &schedulerv1.AnnounceHostRequest{
Id: idgen.HostID("hostname", 8003),
Type: types.HostTypeSuperSeedName,
Ip: "127.0.0.1",
Port: 8003,
DownloadPort: 8001,
Hostname: "hostname",
Network: &schedulerv1.Network{
SecurityDomain: "security_domain",
Location: "location",
Idc: "idc",
mockRawHost = resource.Host{
ID: idgen.HostID("hostname", 8003),
Type: types.HostTypeNormal,
Hostname: "hostname",
IP: "127.0.0.1",
Port: 8003,
DownloadPort: 8001,
OS: "darwin",
Platform: "darwin",
PlatformFamily: "Standalone Workstation",
PlatformVersion: "11.1",
KernelVersion: "20.2.0",
CPU: mockCPU,
Memory: mockMemory,
Network: mockNetwork,
Disk: mockDisk,
Build: mockBuild,
}
mockRawSeedHost = resource.Host{
ID: idgen.HostID("hostname_seed", 8003),
Type: types.HostTypeSuperSeed,
Hostname: "hostname_seed",
IP: "127.0.0.1",
Port: 8003,
DownloadPort: 8001,
OS: "darwin",
Platform: "darwin",
PlatformFamily: "Standalone Workstation",
PlatformVersion: "11.1",
KernelVersion: "20.2.0",
CPU: mockCPU,
Memory: mockMemory,
Network: mockNetwork,
Disk: mockDisk,
Build: mockBuild,
}
mockCPU = resource.CPU{
LogicalCount: 4,
PhysicalCount: 2,
Percent: 1,
ProcessPercent: 0.5,
Times: resource.CPUTimes{
User: 240662.2,
System: 317950.1,
Idle: 3393691.3,
Nice: 0,
Iowait: 0,
Irq: 0,
Softirq: 0,
Steal: 0,
Guest: 0,
GuestNice: 0,
},
}
mockRawHost = &schedulerv1.AnnounceHostRequest{
Id: idgen.HostID("hostname", 8003),
Type: types.HostTypeNormalName,
Ip: "127.0.0.1",
Port: 8003,
DownloadPort: 8001,
Hostname: "hostname",
Network: &schedulerv1.Network{
SecurityDomain: "security_domain",
Location: "location",
Idc: "idc",
},
mockMemory = resource.Memory{
Total: 17179869184,
Available: 5962813440,
Used: 11217055744,
UsedPercent: 65.291858,
ProcessUsedPercent: 41.525125,
Free: 2749598908,
}
mockNetwork = resource.Network{
TCPConnectionCount: 10,
UploadTCPConnectionCount: 1,
SecurityDomain: "security_domain",
Location: "location",
IDC: "idc",
}
mockDisk = resource.Disk{
Total: 499963174912,
Free: 37226479616,
Used: 423809622016,
UsedPercent: 91.92547406065952,
InodesTotal: 4882452880,
InodesUsed: 7835772,
InodesFree: 4874617108,
InodesUsedPercent: 0.1604884305611568,
}
mockBuild = resource.Build{
GitVersion: "v1.0.0",
GitCommit: "221176b117c6d59366d68f2b34d38be50c935883",
GoVersion: "1.18",
Platform: "darwin",
}
mockTaskURLMeta = &commonv1.UrlMeta{
@ -109,10 +172,14 @@ func TestEvaluatorBase_Evaluate(t *testing.T) {
name: "security domain is not the same",
parent: resource.NewPeer(idgen.PeerID("127.0.0.1"),
resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit)),
resource.NewHost(mockRawSeedHost)),
resource.NewHost(
mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname,
mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)),
child: resource.NewPeer(idgen.PeerID("127.0.0.1"),
resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit)),
resource.NewHost(mockRawHost)),
resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)),
totalPieceCount: 1,
mock: func(parent *resource.Peer, child *resource.Peer) {
parent.Host.Network.SecurityDomain = "foo"
@ -127,10 +194,14 @@ func TestEvaluatorBase_Evaluate(t *testing.T) {
name: "security domain is same",
parent: resource.NewPeer(idgen.PeerID("127.0.0.1"),
resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit)),
resource.NewHost(mockRawSeedHost)),
resource.NewHost(
mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname,
mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)),
child: resource.NewPeer(idgen.PeerID("127.0.0.1"),
resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit)),
resource.NewHost(mockRawHost)),
resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)),
totalPieceCount: 1,
mock: func(parent *resource.Peer, child *resource.Peer) {
parent.Host.Network.SecurityDomain = "bac"
@ -139,17 +210,21 @@ func TestEvaluatorBase_Evaluate(t *testing.T) {
},
expect: func(t *testing.T, score float64) {
assert := assert.New(t)
assert.Equal(score, float64(0.8500000000000001))
assert.Equal(score, float64(0.55))
},
},
{
name: "parent security domain is empty",
parent: resource.NewPeer(idgen.PeerID("127.0.0.1"),
resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit)),
resource.NewHost(mockRawSeedHost)),
resource.NewHost(
mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname,
mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)),
child: resource.NewPeer(idgen.PeerID("127.0.0.1"),
resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit)),
resource.NewHost(mockRawHost)),
resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)),
totalPieceCount: 1,
mock: func(parent *resource.Peer, child *resource.Peer) {
parent.Host.Network.SecurityDomain = ""
@ -158,17 +233,21 @@ func TestEvaluatorBase_Evaluate(t *testing.T) {
},
expect: func(t *testing.T, score float64) {
assert := assert.New(t)
assert.Equal(score, float64(0.8500000000000001))
assert.Equal(score, float64(0.55))
},
},
{
name: "child security domain is empty",
parent: resource.NewPeer(idgen.PeerID("127.0.0.1"),
resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit)),
resource.NewHost(mockRawSeedHost)),
resource.NewHost(
mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname,
mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)),
child: resource.NewPeer(idgen.PeerID("127.0.0.1"),
resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit)),
resource.NewHost(mockRawHost)),
resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)),
totalPieceCount: 1,
mock: func(parent *resource.Peer, child *resource.Peer) {
parent.Host.Network.SecurityDomain = "baz"
@ -177,7 +256,7 @@ func TestEvaluatorBase_Evaluate(t *testing.T) {
},
expect: func(t *testing.T, score float64) {
assert := assert.New(t)
assert.Equal(score, float64(0.8500000000000001))
assert.Equal(score, float64(0.55))
},
},
}
@ -192,7 +271,9 @@ func TestEvaluatorBase_Evaluate(t *testing.T) {
}
func TestEvaluatorBase_calculatePieceScore(t *testing.T) {
mockHost := resource.NewHost(mockRawHost)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit))
tests := []struct {
@ -351,7 +432,9 @@ func TestEvaluatorBase_calculatehostUploadSuccessScore(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
host := resource.NewHost(mockRawHost)
host := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit))
mockPeer := resource.NewPeer(mockPeerID, mockTask, host)
tc.mock(host)
@ -388,7 +471,9 @@ func TestEvaluatorBase_calculateFreeUploadScore(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
host := resource.NewHost(mockRawHost)
host := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit))
mockPeer := resource.NewPeer(mockPeerID, mockTask, host)
tc.mock(host, mockPeer)
@ -437,7 +522,9 @@ func TestEvaluatorBase_calculateHostTypeScore(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
mockHost := resource.NewHost(mockRawHost)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit))
peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
tc.mock(peer)
@ -455,8 +542,8 @@ func TestEvaluatorBase_calculateIDCAffinityScore(t *testing.T) {
{
name: "idc is empty",
mock: func(dstHost *resource.Host, srcHost *resource.Host) {
dstHost.Network.Idc = ""
srcHost.Network.Idc = ""
dstHost.Network.IDC = ""
srcHost.Network.IDC = ""
},
expect: func(t *testing.T, score float64) {
assert := assert.New(t)
@ -466,7 +553,7 @@ func TestEvaluatorBase_calculateIDCAffinityScore(t *testing.T) {
{
name: "dst host idc is empty",
mock: func(dstHost *resource.Host, srcHost *resource.Host) {
dstHost.Network.Idc = ""
dstHost.Network.IDC = ""
},
expect: func(t *testing.T, score float64) {
assert := assert.New(t)
@ -476,7 +563,7 @@ func TestEvaluatorBase_calculateIDCAffinityScore(t *testing.T) {
{
name: "src host idc is empty",
mock: func(dstHost *resource.Host, srcHost *resource.Host) {
srcHost.Network.Idc = ""
srcHost.Network.IDC = ""
},
expect: func(t *testing.T, score float64) {
assert := assert.New(t)
@ -486,8 +573,8 @@ func TestEvaluatorBase_calculateIDCAffinityScore(t *testing.T) {
{
name: "idc is not the same",
mock: func(dstHost *resource.Host, srcHost *resource.Host) {
dstHost.Network.Idc = "foo"
srcHost.Network.Idc = "bar"
dstHost.Network.IDC = "foo"
srcHost.Network.IDC = "bar"
},
expect: func(t *testing.T, score float64) {
assert := assert.New(t)
@ -497,8 +584,8 @@ func TestEvaluatorBase_calculateIDCAffinityScore(t *testing.T) {
{
name: "idc is the same",
mock: func(dstHost *resource.Host, srcHost *resource.Host) {
dstHost.Network.Idc = "example"
srcHost.Network.Idc = "example"
dstHost.Network.IDC = "example"
srcHost.Network.IDC = "example"
},
expect: func(t *testing.T, score float64) {
assert := assert.New(t)
@ -509,10 +596,14 @@ func TestEvaluatorBase_calculateIDCAffinityScore(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
dstHost := resource.NewHost(mockRawHost)
srcHost := resource.NewHost(mockRawSeedHost)
dstHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
srcHost := resource.NewHost(
mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname,
mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)
tc.mock(dstHost, srcHost)
tc.expect(t, calculateIDCAffinityScore(dstHost.Network.Idc, srcHost.Network.Idc))
tc.expect(t, calculateIDCAffinityScore(dstHost.Network.IDC, srcHost.Network.IDC))
})
}
}
@ -642,7 +733,9 @@ func TestEvaluatorBase_calculateMultiElementAffinityScore(t *testing.T) {
}
func TestEvaluatorBase_IsBadNode(t *testing.T) {
mockHost := resource.NewHost(mockRawHost)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit))
tests := []struct {

View File

@ -52,32 +52,96 @@ var (
Algorithm: evaluator.DefaultAlgorithm,
}
mockRawHost = &schedulerv1.AnnounceHostRequest{
Id: idgen.HostID("hostname", 8003),
Type: pkgtypes.HostTypeNormalName,
Ip: "127.0.0.1",
Port: 8003,
DownloadPort: 8001,
Hostname: "hostname",
Network: &schedulerv1.Network{
SecurityDomain: "security_domain",
Location: "location",
Idc: "idc",
mockRawHost = resource.Host{
ID: idgen.HostID("hostname", 8003),
Type: pkgtypes.HostTypeNormal,
Hostname: "hostname",
IP: "127.0.0.1",
Port: 8003,
DownloadPort: 8001,
OS: "darwin",
Platform: "darwin",
PlatformFamily: "Standalone Workstation",
PlatformVersion: "11.1",
KernelVersion: "20.2.0",
CPU: mockCPU,
Memory: mockMemory,
Network: mockNetwork,
Disk: mockDisk,
Build: mockBuild,
}
mockRawSeedHost = resource.Host{
ID: idgen.HostID("hostname_seed", 8003),
Type: pkgtypes.HostTypeSuperSeed,
Hostname: "hostname_seed",
IP: "127.0.0.1",
Port: 8003,
DownloadPort: 8001,
OS: "darwin",
Platform: "darwin",
PlatformFamily: "Standalone Workstation",
PlatformVersion: "11.1",
KernelVersion: "20.2.0",
CPU: mockCPU,
Memory: mockMemory,
Network: mockNetwork,
Disk: mockDisk,
Build: mockBuild,
}
mockCPU = resource.CPU{
LogicalCount: 4,
PhysicalCount: 2,
Percent: 1,
ProcessPercent: 0.5,
Times: resource.CPUTimes{
User: 240662.2,
System: 317950.1,
Idle: 3393691.3,
Nice: 0,
Iowait: 0,
Irq: 0,
Softirq: 0,
Steal: 0,
Guest: 0,
GuestNice: 0,
},
}
mockRawSeedHost = &schedulerv1.AnnounceHostRequest{
Id: idgen.HostID("hostname_seed", 8003),
Type: pkgtypes.HostTypeSuperSeedName,
Ip: "127.0.0.1",
Port: 8003,
DownloadPort: 8001,
Hostname: "hostname",
Network: &schedulerv1.Network{
SecurityDomain: "security_domain",
Location: "location",
Idc: "idc",
},
mockMemory = resource.Memory{
Total: 17179869184,
Available: 5962813440,
Used: 11217055744,
UsedPercent: 65.291858,
ProcessUsedPercent: 41.525125,
Free: 2749598908,
}
mockNetwork = resource.Network{
TCPConnectionCount: 10,
UploadTCPConnectionCount: 1,
SecurityDomain: "security_domain",
Location: "location",
IDC: "idc",
}
mockDisk = resource.Disk{
Total: 499963174912,
Free: 37226479616,
Used: 423809622016,
UsedPercent: 91.92547406065952,
InodesTotal: 4882452880,
InodesUsed: 7835772,
InodesFree: 4874617108,
InodesUsedPercent: 0.1604884305611568,
}
mockBuild = resource.Build{
GitVersion: "v1.0.0",
GitCommit: "221176b117c6d59366d68f2b34d38be50c935883",
GoVersion: "1.18",
Platform: "darwin",
}
mockTaskURLMeta = &commonv1.UrlMeta{
@ -316,10 +380,14 @@ func TestScheduler_ScheduleParent(t *testing.T) {
stream := mocks.NewMockScheduler_ReportPieceResultServer(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
ctx, cancel := context.WithCancel(context.Background())
mockHost := resource.NewHost(mockRawHost)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit))
peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
mockSeedHost := resource.NewHost(mockRawSeedHost)
mockSeedHost := resource.NewHost(
mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname,
mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)
seedPeer := resource.NewPeer(mockSeedPeerID, mockTask, mockSeedHost)
blocklist := set.NewSafeSet[string]()
@ -556,18 +624,9 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) {
mock: func(peer *resource.Peer, mockTask *resource.Task, mockPeer *resource.Peer, blocklist set.SafeSet[string], stream schedulerv1.Scheduler_ReportPieceResultServer, dynconfig config.DynconfigInterface, ms *mocks.MockScheduler_ReportPieceResultServerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
peer.FSM.SetState(resource.PeerStateRunning)
mockPeer.FSM.SetState(resource.PeerStateRunning)
candidatePeer := resource.NewPeer(idgen.PeerID("127.0.0.1"), mockTask, resource.NewHost(&schedulerv1.AnnounceHostRequest{
Id: idgen.HostID(uuid.New().String(), 8003),
Ip: "127.0.0.1",
Port: 8003,
DownloadPort: 8001,
Hostname: "hostname",
Network: &schedulerv1.Network{
SecurityDomain: "security_domain",
Location: "location",
Idc: "idc",
},
}))
candidatePeer := resource.NewPeer(idgen.PeerID("127.0.0.1"), mockTask, resource.NewHost(
idgen.HostID(uuid.New().String(), 8003), mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type))
candidatePeer.FSM.SetState(resource.PeerStateRunning)
peer.Task.StorePeer(peer)
peer.Task.StorePeer(mockPeer)
@ -600,21 +659,14 @@ func TestScheduler_NotifyAndFindParent(t *testing.T) {
defer ctl.Finish()
stream := mocks.NewMockScheduler_ReportPieceResultServer(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
mockHost := resource.NewHost(mockRawHost)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit))
peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
mockPeer := resource.NewPeer(idgen.PeerID("127.0.0.1"), mockTask, resource.NewHost(&schedulerv1.AnnounceHostRequest{
Id: idgen.HostID(uuid.New().String(), 8003),
Ip: "127.0.0.1",
Port: 8003,
DownloadPort: 8001,
Hostname: "hostname",
Network: &schedulerv1.Network{
SecurityDomain: "security_domain",
Location: "location",
Idc: "idc",
},
}))
mockPeer := resource.NewPeer(idgen.PeerID("127.0.0.1"), mockTask, resource.NewHost(
idgen.HostID(uuid.New().String(), 8003), mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type))
blocklist := set.NewSafeSet[string]()
tc.mock(peer, mockTask, mockPeer, blocklist, stream, dynconfig, stream.EXPECT(), dynconfig.EXPECT())
@ -878,24 +930,17 @@ func TestScheduler_FindParent(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
mockHost := resource.NewHost(mockRawHost)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit))
peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
var mockPeers []*resource.Peer
for i := 0; i < 11; i++ {
mockHost := resource.NewHost(&schedulerv1.AnnounceHostRequest{
Id: idgen.HostID(uuid.New().String(), 8003),
Ip: "127.0.0.1",
Port: 8003,
DownloadPort: 8001,
Hostname: "hostname",
Network: &schedulerv1.Network{
SecurityDomain: "security_domain",
Location: "location",
Idc: "idc",
},
})
mockHost := resource.NewHost(
idgen.HostID(uuid.New().String(), 8003), mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
peer := resource.NewPeer(idgen.PeerID(fmt.Sprintf("127.0.0.%d", i)), mockTask, mockHost)
mockPeers = append(mockPeers, peer)
}
@ -978,7 +1023,9 @@ func TestScheduler_constructSuccessPeerPacket(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
mockHost := resource.NewHost(mockRawHost)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit))
peer := resource.NewPeer(mockPeerID, mockTask, mockHost)

View File

@ -440,32 +440,164 @@ func (v *V1) AnnounceHost(ctx context.Context, req *schedulerv1.AnnounceHostRequ
host, loaded := v.resource.HostManager().Load(req.Id)
if !loaded {
var options []resource.HostOption
options := []resource.HostOption{
resource.WithOS(req.Os),
resource.WithPlatform(req.Platform),
resource.WithPlatformFamily(req.PlatformFamily),
resource.WithPlatformVersion(req.PlatformVersion),
resource.WithKernelVersion(req.KernelVersion),
}
if concurrentUploadLimit > 0 {
options = append(options, resource.WithConcurrentUploadLimit(concurrentUploadLimit))
}
host = resource.NewHost(req, options...)
if req.Cpu != nil {
options = append(options, resource.WithCPU(resource.CPU{
LogicalCount: req.Cpu.LogicalCount,
PhysicalCount: req.Cpu.PhysicalCount,
Percent: req.Cpu.Percent,
ProcessPercent: req.Cpu.ProcessPercent,
Times: resource.CPUTimes{
User: req.Cpu.Times.User,
System: req.Cpu.Times.System,
Idle: req.Cpu.Times.Idle,
Nice: req.Cpu.Times.Nice,
Iowait: req.Cpu.Times.Iowait,
Irq: req.Cpu.Times.Irq,
Softirq: req.Cpu.Times.Softirq,
Steal: req.Cpu.Times.Steal,
Guest: req.Cpu.Times.Guest,
GuestNice: req.Cpu.Times.GuestNice,
},
}))
}
if req.Memory != nil {
options = append(options, resource.WithMemory(resource.Memory{
Total: req.Memory.Total,
Available: req.Memory.Available,
Used: req.Memory.Used,
UsedPercent: req.Memory.UsedPercent,
ProcessUsedPercent: req.Memory.ProcessUsedPercent,
Free: req.Memory.Free,
}))
}
if req.Network != nil {
options = append(options, resource.WithNetwork(resource.Network{
TCPConnectionCount: req.Network.TcpConnectionCount,
UploadTCPConnectionCount: req.Network.UploadTcpConnectionCount,
SecurityDomain: req.Network.SecurityDomain,
Location: req.Network.Location,
IDC: req.Network.Idc,
}))
}
if req.Disk != nil {
options = append(options, resource.WithDisk(resource.Disk{
Total: req.Disk.Total,
Free: req.Disk.Free,
Used: req.Disk.Used,
UsedPercent: req.Disk.UsedPercent,
InodesTotal: req.Disk.InodesTotal,
InodesUsed: req.Disk.InodesUsed,
InodesFree: req.Disk.InodesFree,
InodesUsedPercent: req.Disk.InodesUsedPercent,
}))
}
if req.Build != nil {
options = append(options, resource.WithBuild(resource.Build{
GitVersion: req.Build.GitVersion,
GitCommit: req.Build.GitCommit,
GoVersion: req.Build.GoVersion,
Platform: req.Build.Platform,
}))
}
host = resource.NewHost(
req.Id, req.Ip, req.Hostname,
req.Port, req.DownloadPort, types.ParseHostType(req.Type),
options...,
)
v.resource.HostManager().Store(host)
host.Log.Infof("announce new host: %#v", req)
return nil
}
host.Type = types.ParseHostType(req.Type)
host.IP = req.Ip
host.Hostname = req.Hostname
host.Port = req.Port
host.DownloadPort = req.DownloadPort
host.Type = types.ParseHostType(req.Type)
host.OS = req.Os
host.Platform = req.Platform
host.PlatformFamily = req.PlatformFamily
host.PlatformVersion = req.PlatformVersion
host.KernelVersion = req.KernelVersion
host.CPU = req.Cpu
host.Memory = req.Memory
host.Network = req.Network
host.Disk = req.Disk
host.Build = req.Build
if req.Cpu != nil {
host.CPU = resource.CPU{
LogicalCount: req.Cpu.LogicalCount,
PhysicalCount: req.Cpu.PhysicalCount,
Percent: req.Cpu.Percent,
ProcessPercent: req.Cpu.ProcessPercent,
Times: resource.CPUTimes{
User: req.Cpu.Times.User,
System: req.Cpu.Times.System,
Idle: req.Cpu.Times.Idle,
Nice: req.Cpu.Times.Nice,
Iowait: req.Cpu.Times.Iowait,
Irq: req.Cpu.Times.Irq,
Softirq: req.Cpu.Times.Softirq,
Steal: req.Cpu.Times.Steal,
Guest: req.Cpu.Times.Guest,
GuestNice: req.Cpu.Times.GuestNice,
},
}
}
if req.Memory != nil {
host.Memory = resource.Memory{
Total: req.Memory.Total,
Available: req.Memory.Available,
Used: req.Memory.Used,
UsedPercent: req.Memory.UsedPercent,
ProcessUsedPercent: req.Memory.ProcessUsedPercent,
Free: req.Memory.Free,
}
}
if req.Network != nil {
host.Network = resource.Network{
TCPConnectionCount: req.Network.TcpConnectionCount,
UploadTCPConnectionCount: req.Network.UploadTcpConnectionCount,
SecurityDomain: req.Network.SecurityDomain,
Location: req.Network.Location,
IDC: req.Network.Idc,
}
}
if req.Disk != nil {
host.Disk = resource.Disk{
Total: req.Disk.Total,
Free: req.Disk.Free,
Used: req.Disk.Used,
UsedPercent: req.Disk.UsedPercent,
InodesTotal: req.Disk.InodesTotal,
InodesUsed: req.Disk.InodesUsed,
InodesFree: req.Disk.InodesFree,
InodesUsedPercent: req.Disk.InodesUsedPercent,
}
}
if req.Build != nil {
host.Build = resource.Build{
GitVersion: req.Build.GitVersion,
GitCommit: req.Build.GitCommit,
GoVersion: req.Build.GoVersion,
Platform: req.Build.Platform,
}
}
if concurrentUploadLimit > 0 {
host.ConcurrentUploadLimit.Store(concurrentUploadLimit)
@ -597,24 +729,21 @@ func (v *V1) storeHost(ctx context.Context, peerHost *schedulerv1.PeerHost) *res
host, loaded := v.resource.HostManager().Load(peerHost.Id)
if !loaded {
// Get scheduler cluster client config by manager.
var options []resource.HostOption
options := []resource.HostOption{resource.WithNetwork(resource.Network{
SecurityDomain: peerHost.SecurityDomain,
Location: peerHost.Location,
IDC: peerHost.Idc,
})}
if clientConfig, err := v.dynconfig.GetSchedulerClusterClientConfig(); err == nil && clientConfig.LoadLimit > 0 {
options = append(options, resource.WithConcurrentUploadLimit(int32(clientConfig.LoadLimit)))
}
host = resource.NewHost(&schedulerv1.AnnounceHostRequest{
Id: peerHost.Id,
Type: types.HostTypeNormalName,
Ip: peerHost.Ip,
Hostname: peerHost.HostName,
Port: peerHost.RpcPort,
DownloadPort: peerHost.DownPort,
Network: &schedulerv1.Network{
SecurityDomain: peerHost.SecurityDomain,
Location: peerHost.Location,
Idc: peerHost.Idc,
},
}, options...)
host = resource.NewHost(
peerHost.Id, peerHost.Ip, peerHost.HostName,
peerHost.RpcPort, peerHost.DownPort, types.HostTypeNormal,
options...,
)
v.resource.HostManager().Store(host)
host.Log.Info("create new host")
return host
@ -1096,68 +1225,58 @@ func (v *V1) createRecord(peer *resource.Peer, parents []*resource.Peer, req *sc
},
}
if parent.Host.CPU != nil {
parentRecord.Host.CPU = storage.CPU{
LogicalCount: parent.Host.CPU.LogicalCount,
PhysicalCount: parent.Host.CPU.PhysicalCount,
Percent: parent.Host.CPU.Percent,
ProcessPercent: parent.Host.CPU.ProcessPercent,
Times: storage.CPUTimes{
User: parent.Host.CPU.Times.User,
System: parent.Host.CPU.Times.System,
Idle: parent.Host.CPU.Times.Idle,
Nice: parent.Host.CPU.Times.Nice,
Iowait: parent.Host.CPU.Times.Iowait,
Irq: parent.Host.CPU.Times.Irq,
Softirq: parent.Host.CPU.Times.Softirq,
Steal: parent.Host.CPU.Times.Steal,
Guest: parent.Host.CPU.Times.Guest,
GuestNice: parent.Host.CPU.Times.GuestNice,
},
}
parentRecord.Host.CPU = resource.CPU{
LogicalCount: parent.Host.CPU.LogicalCount,
PhysicalCount: parent.Host.CPU.PhysicalCount,
Percent: parent.Host.CPU.Percent,
ProcessPercent: parent.Host.CPU.ProcessPercent,
Times: resource.CPUTimes{
User: parent.Host.CPU.Times.User,
System: parent.Host.CPU.Times.System,
Idle: parent.Host.CPU.Times.Idle,
Nice: parent.Host.CPU.Times.Nice,
Iowait: parent.Host.CPU.Times.Iowait,
Irq: parent.Host.CPU.Times.Irq,
Softirq: parent.Host.CPU.Times.Softirq,
Steal: parent.Host.CPU.Times.Steal,
Guest: parent.Host.CPU.Times.Guest,
GuestNice: parent.Host.CPU.Times.GuestNice,
},
}
if parent.Host.Memory != nil {
parentRecord.Host.Memory = storage.Memory{
Total: parent.Host.Memory.Total,
Available: parent.Host.Memory.Available,
Used: parent.Host.Memory.Used,
UsedPercent: parent.Host.Memory.UsedPercent,
ProcessUsedPercent: parent.Host.Memory.ProcessUsedPercent,
Free: parent.Host.Memory.Free,
}
parentRecord.Host.Memory = resource.Memory{
Total: parent.Host.Memory.Total,
Available: parent.Host.Memory.Available,
Used: parent.Host.Memory.Used,
UsedPercent: parent.Host.Memory.UsedPercent,
ProcessUsedPercent: parent.Host.Memory.ProcessUsedPercent,
Free: parent.Host.Memory.Free,
}
if parent.Host.Network != nil {
parentRecord.Host.Network = storage.Network{
TCPConnectionCount: parent.Host.Network.TcpConnectionCount,
UploadTCPConnectionCount: parent.Host.Network.UploadTcpConnectionCount,
SecurityDomain: parent.Host.Network.SecurityDomain,
Location: parent.Host.Network.Location,
IDC: parent.Host.Network.Idc,
}
parentRecord.Host.Network = resource.Network{
TCPConnectionCount: parent.Host.Network.TCPConnectionCount,
UploadTCPConnectionCount: parent.Host.Network.UploadTCPConnectionCount,
SecurityDomain: parent.Host.Network.SecurityDomain,
Location: parent.Host.Network.Location,
IDC: parent.Host.Network.IDC,
}
if parent.Host.Disk != nil {
parentRecord.Host.Disk = storage.Disk{
Total: parent.Host.Disk.Total,
Free: parent.Host.Disk.Free,
Used: parent.Host.Disk.Used,
UsedPercent: parent.Host.Disk.UsedPercent,
InodesTotal: parent.Host.Disk.InodesTotal,
InodesUsed: parent.Host.Disk.InodesUsed,
InodesFree: parent.Host.Disk.InodesFree,
InodesUsedPercent: parent.Host.Disk.InodesUsedPercent,
}
parentRecord.Host.Disk = resource.Disk{
Total: parent.Host.Disk.Total,
Free: parent.Host.Disk.Free,
Used: parent.Host.Disk.Used,
UsedPercent: parent.Host.Disk.UsedPercent,
InodesTotal: parent.Host.Disk.InodesTotal,
InodesUsed: parent.Host.Disk.InodesUsed,
InodesFree: parent.Host.Disk.InodesFree,
InodesUsedPercent: parent.Host.Disk.InodesUsedPercent,
}
if parent.Host.Build != nil {
parentRecord.Host.Build = storage.Build{
GitVersion: parent.Host.Build.GitVersion,
GitCommit: parent.Host.Build.GitCommit,
GoVersion: parent.Host.Build.GoVersion,
Platform: parent.Host.Build.Platform,
}
parentRecord.Host.Build = resource.Build{
GitVersion: parent.Host.Build.GitVersion,
GitCommit: parent.Host.Build.GitCommit,
GoVersion: parent.Host.Build.GoVersion,
Platform: parent.Host.Build.Platform,
}
for _, piece := range peer.Pieces.Values() {
@ -1211,68 +1330,58 @@ func (v *V1) createRecord(peer *resource.Peer, parents []*resource.Peer, req *sc
},
}
if peer.Host.CPU != nil {
record.Host.CPU = storage.CPU{
LogicalCount: peer.Host.CPU.LogicalCount,
PhysicalCount: peer.Host.CPU.PhysicalCount,
Percent: peer.Host.CPU.Percent,
ProcessPercent: peer.Host.CPU.ProcessPercent,
Times: storage.CPUTimes{
User: peer.Host.CPU.Times.User,
System: peer.Host.CPU.Times.System,
Idle: peer.Host.CPU.Times.Idle,
Nice: peer.Host.CPU.Times.Nice,
Iowait: peer.Host.CPU.Times.Iowait,
Irq: peer.Host.CPU.Times.Irq,
Softirq: peer.Host.CPU.Times.Softirq,
Steal: peer.Host.CPU.Times.Steal,
Guest: peer.Host.CPU.Times.Guest,
GuestNice: peer.Host.CPU.Times.GuestNice,
},
}
record.Host.CPU = resource.CPU{
LogicalCount: peer.Host.CPU.LogicalCount,
PhysicalCount: peer.Host.CPU.PhysicalCount,
Percent: peer.Host.CPU.Percent,
ProcessPercent: peer.Host.CPU.ProcessPercent,
Times: resource.CPUTimes{
User: peer.Host.CPU.Times.User,
System: peer.Host.CPU.Times.System,
Idle: peer.Host.CPU.Times.Idle,
Nice: peer.Host.CPU.Times.Nice,
Iowait: peer.Host.CPU.Times.Iowait,
Irq: peer.Host.CPU.Times.Irq,
Softirq: peer.Host.CPU.Times.Softirq,
Steal: peer.Host.CPU.Times.Steal,
Guest: peer.Host.CPU.Times.Guest,
GuestNice: peer.Host.CPU.Times.GuestNice,
},
}
if peer.Host.Memory != nil {
record.Host.Memory = storage.Memory{
Total: peer.Host.Memory.Total,
Available: peer.Host.Memory.Available,
Used: peer.Host.Memory.Used,
UsedPercent: peer.Host.Memory.UsedPercent,
ProcessUsedPercent: peer.Host.Memory.ProcessUsedPercent,
Free: peer.Host.Memory.Free,
}
record.Host.Memory = resource.Memory{
Total: peer.Host.Memory.Total,
Available: peer.Host.Memory.Available,
Used: peer.Host.Memory.Used,
UsedPercent: peer.Host.Memory.UsedPercent,
ProcessUsedPercent: peer.Host.Memory.ProcessUsedPercent,
Free: peer.Host.Memory.Free,
}
if peer.Host.Network != nil {
record.Host.Network = storage.Network{
TCPConnectionCount: peer.Host.Network.TcpConnectionCount,
UploadTCPConnectionCount: peer.Host.Network.UploadTcpConnectionCount,
SecurityDomain: peer.Host.Network.SecurityDomain,
Location: peer.Host.Network.Location,
IDC: peer.Host.Network.Idc,
}
record.Host.Network = resource.Network{
TCPConnectionCount: peer.Host.Network.TCPConnectionCount,
UploadTCPConnectionCount: peer.Host.Network.UploadTCPConnectionCount,
SecurityDomain: peer.Host.Network.SecurityDomain,
Location: peer.Host.Network.Location,
IDC: peer.Host.Network.IDC,
}
if peer.Host.Disk != nil {
record.Host.Disk = storage.Disk{
Total: peer.Host.Disk.Total,
Free: peer.Host.Disk.Free,
Used: peer.Host.Disk.Used,
UsedPercent: peer.Host.Disk.UsedPercent,
InodesTotal: peer.Host.Disk.InodesTotal,
InodesUsed: peer.Host.Disk.InodesUsed,
InodesFree: peer.Host.Disk.InodesFree,
InodesUsedPercent: peer.Host.Disk.InodesUsedPercent,
}
record.Host.Disk = resource.Disk{
Total: peer.Host.Disk.Total,
Free: peer.Host.Disk.Free,
Used: peer.Host.Disk.Used,
UsedPercent: peer.Host.Disk.UsedPercent,
InodesTotal: peer.Host.Disk.InodesTotal,
InodesUsed: peer.Host.Disk.InodesUsed,
InodesFree: peer.Host.Disk.InodesFree,
InodesUsedPercent: peer.Host.Disk.InodesUsedPercent,
}
if peer.Host.Build != nil {
record.Host.Build = storage.Build{
GitVersion: peer.Host.Build.GitVersion,
GitCommit: peer.Host.Build.GitCommit,
GoVersion: peer.Host.Build.GoVersion,
Platform: peer.Host.Build.Platform,
}
record.Host.Build = resource.Build{
GitVersion: peer.Host.Build.GitVersion,
GitCommit: peer.Host.Build.GitCommit,
GoVersion: peer.Host.Build.GoVersion,
Platform: peer.Host.Build.Platform,
}
if req.Code != commonv1.Code_Success {

View File

@ -64,86 +64,96 @@ var (
BackToSourceCount: int(mockTaskBackToSourceLimit),
}
mockHostCPU = &schedulerv1.CPU{
LogicalCount: 24,
PhysicalCount: 12,
Percent: 0.8,
ProcessPercent: 0.4,
Times: &schedulerv1.CPUTimes{
User: 100,
System: 101,
Idle: 102,
Nice: 103,
Iowait: 104,
Irq: 105,
Softirq: 106,
Steal: 107,
Guest: 108,
GuestNice: 109,
mockRawHost = resource.Host{
ID: idgen.HostID("hostname", 8003),
Type: pkgtypes.HostTypeNormal,
Hostname: "hostname",
IP: "127.0.0.1",
Port: 8003,
DownloadPort: 8001,
OS: "darwin",
Platform: "darwin",
PlatformFamily: "Standalone Workstation",
PlatformVersion: "11.1",
KernelVersion: "20.2.0",
CPU: mockCPU,
Memory: mockMemory,
Network: mockNetwork,
Disk: mockDisk,
Build: mockBuild,
}
mockRawSeedHost = resource.Host{
ID: idgen.HostID("hostname_seed", 8003),
Type: pkgtypes.HostTypeSuperSeed,
Hostname: "hostname_seed",
IP: "127.0.0.1",
Port: 8003,
DownloadPort: 8001,
OS: "darwin",
Platform: "darwin",
PlatformFamily: "Standalone Workstation",
PlatformVersion: "11.1",
KernelVersion: "20.2.0",
CPU: mockCPU,
Memory: mockMemory,
Network: mockNetwork,
Disk: mockDisk,
Build: mockBuild,
}
mockCPU = resource.CPU{
LogicalCount: 4,
PhysicalCount: 2,
Percent: 1,
ProcessPercent: 0.5,
Times: resource.CPUTimes{
User: 240662.2,
System: 317950.1,
Idle: 3393691.3,
Nice: 0,
Iowait: 0,
Irq: 0,
Softirq: 0,
Steal: 0,
Guest: 0,
GuestNice: 0,
},
}
mockHostMemory = &schedulerv1.Memory{
Total: 20,
Available: 19,
Used: 16,
UsedPercent: 0.7,
ProcessUsedPercent: 0.2,
Free: 15,
mockMemory = resource.Memory{
Total: 17179869184,
Available: 5962813440,
Used: 11217055744,
UsedPercent: 65.291858,
ProcessUsedPercent: 41.525125,
Free: 2749598908,
}
mockHostNetwork = &schedulerv1.Network{
TcpConnectionCount: 400,
UploadTcpConnectionCount: 200,
SecurityDomain: "product",
Location: "china",
Idc: "e1",
mockNetwork = resource.Network{
TCPConnectionCount: 10,
UploadTCPConnectionCount: 1,
SecurityDomain: "security_domain",
Location: "location",
IDC: "idc",
}
mockHostDisk = &schedulerv1.Disk{
Total: 100,
Free: 88,
Used: 56,
UsedPercent: 0.9,
InodesTotal: 200,
InodesUsed: 180,
InodesFree: 160,
InodesUsedPercent: 0.6,
mockDisk = resource.Disk{
Total: 499963174912,
Free: 37226479616,
Used: 423809622016,
UsedPercent: 91.92547406065952,
InodesTotal: 4882452880,
InodesUsed: 7835772,
InodesFree: 4874617108,
InodesUsedPercent: 0.1604884305611568,
}
mockHostBuild = &schedulerv1.Build{
GitVersion: "3.0.0",
GitCommit: "2bf4d5e",
GoVersion: "1.19",
Platform: "linux",
}
mockRawHost = &schedulerv1.AnnounceHostRequest{
Id: idgen.HostID("hostname", 8003),
Type: pkgtypes.HostTypeNormalName,
Ip: "127.0.0.1",
Port: 8003,
DownloadPort: 8001,
Hostname: "hostname",
Cpu: mockHostCPU,
Memory: mockHostMemory,
Network: mockHostNetwork,
Disk: mockHostDisk,
Build: mockHostBuild,
}
mockRawSeedHost = &schedulerv1.AnnounceHostRequest{
Id: idgen.HostID("hostname_seed", 8003),
Type: pkgtypes.HostTypeSuperSeedName,
Ip: "127.0.0.1",
Port: 8003,
DownloadPort: 8001,
Hostname: "hostname",
Cpu: mockHostCPU,
Memory: mockHostMemory,
Network: mockHostNetwork,
Disk: mockHostDisk,
Build: mockHostBuild,
mockBuild = resource.Build{
GitVersion: "v1.0.0",
GitCommit: "221176b117c6d59366d68f2b34d38be50c935883",
GoVersion: "1.18",
Platform: "darwin",
}
mockPeerHost = &schedulerv1.PeerHost{
@ -222,7 +232,7 @@ func TestService_RegisterPeerTask(t *testing.T) {
Priority: commonv1.Priority_LEVEL0,
},
PeerHost: &schedulerv1.PeerHost{
Id: mockRawHost.Id,
Id: mockRawHost.ID,
},
},
mock: func(
@ -258,7 +268,7 @@ func TestService_RegisterPeerTask(t *testing.T) {
Priority: commonv1.Priority_LEVEL0,
},
PeerHost: &schedulerv1.PeerHost{
Id: mockRawHost.Id,
Id: mockRawHost.ID,
},
},
mock: func(
@ -303,7 +313,7 @@ func TestService_RegisterPeerTask(t *testing.T) {
Priority: commonv1.Priority_LEVEL0,
},
PeerHost: &schedulerv1.PeerHost{
Id: mockRawHost.Id,
Id: mockRawHost.ID,
},
},
mock: func(
@ -343,7 +353,7 @@ func TestService_RegisterPeerTask(t *testing.T) {
Priority: commonv1.Priority_LEVEL0,
},
PeerHost: &schedulerv1.PeerHost{
Id: mockRawHost.Id,
Id: mockRawHost.ID,
},
},
mock: func(
@ -381,7 +391,7 @@ func TestService_RegisterPeerTask(t *testing.T) {
Priority: commonv1.Priority_LEVEL0,
},
PeerHost: &schedulerv1.PeerHost{
Id: mockRawHost.Id,
Id: mockRawHost.ID,
},
},
mock: func(
@ -421,7 +431,7 @@ func TestService_RegisterPeerTask(t *testing.T) {
Priority: commonv1.Priority_LEVEL0,
},
PeerHost: &schedulerv1.PeerHost{
Id: mockRawHost.Id,
Id: mockRawHost.ID,
},
},
mock: func(
@ -463,7 +473,7 @@ func TestService_RegisterPeerTask(t *testing.T) {
Priority: commonv1.Priority_LEVEL0,
},
PeerHost: &schedulerv1.PeerHost{
Id: mockRawHost.Id,
Id: mockRawHost.ID,
},
},
mock: func(
@ -502,7 +512,7 @@ func TestService_RegisterPeerTask(t *testing.T) {
Priority: commonv1.Priority_LEVEL0,
},
PeerHost: &schedulerv1.PeerHost{
Id: mockRawHost.Id,
Id: mockRawHost.ID,
},
},
mock: func(
@ -544,7 +554,7 @@ func TestService_RegisterPeerTask(t *testing.T) {
Priority: commonv1.Priority_LEVEL0,
},
PeerHost: &schedulerv1.PeerHost{
Id: mockRawHost.Id,
Id: mockRawHost.ID,
},
},
mock: func(
@ -590,7 +600,7 @@ func TestService_RegisterPeerTask(t *testing.T) {
Priority: commonv1.Priority_LEVEL0,
},
PeerHost: &schedulerv1.PeerHost{
Id: mockRawHost.Id,
Id: mockRawHost.ID,
},
},
mock: func(
@ -638,7 +648,7 @@ func TestService_RegisterPeerTask(t *testing.T) {
Priority: commonv1.Priority_LEVEL0,
},
PeerHost: &schedulerv1.PeerHost{
Id: mockRawHost.Id,
Id: mockRawHost.ID,
},
},
mock: func(
@ -686,7 +696,7 @@ func TestService_RegisterPeerTask(t *testing.T) {
Priority: commonv1.Priority_LEVEL0,
},
PeerHost: &schedulerv1.PeerHost{
Id: mockRawHost.Id,
Id: mockRawHost.ID,
},
},
mock: func(
@ -730,7 +740,7 @@ func TestService_RegisterPeerTask(t *testing.T) {
Priority: commonv1.Priority_LEVEL0,
},
PeerHost: &schedulerv1.PeerHost{
Id: mockRawHost.Id,
Id: mockRawHost.ID,
},
},
mock: func(
@ -775,7 +785,7 @@ func TestService_RegisterPeerTask(t *testing.T) {
Priority: commonv1.Priority_LEVEL0,
},
PeerHost: &schedulerv1.PeerHost{
Id: mockRawHost.Id,
Id: mockRawHost.ID,
},
},
mock: func(
@ -817,7 +827,7 @@ func TestService_RegisterPeerTask(t *testing.T) {
Priority: commonv1.Priority_LEVEL0,
},
PeerHost: &schedulerv1.PeerHost{
Id: mockRawHost.Id,
Id: mockRawHost.ID,
},
},
mock: func(
@ -864,10 +874,14 @@ func TestService_RegisterPeerTask(t *testing.T) {
peerManager := resource.NewMockPeerManager(ctl)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduler, dynconfig, storage)
mockHost := resource.NewHost(mockRawHost)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit))
mockPeer := resource.NewPeer(mockPeerID, mockTask, mockHost)
mockSeedHost := resource.NewHost(mockRawSeedHost)
mockSeedHost := resource.NewHost(
mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname,
mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)
mockSeedPeer := resource.NewPeer(mockSeedPeerID, mockTask, mockSeedHost)
tc.mock(
tc.req, mockPeer, mockSeedPeer,
@ -1124,7 +1138,9 @@ func TestService_ReportPieceResult(t *testing.T) {
stream := schedulerv1mocks.NewMockScheduler_ReportPieceResultServer(ctl)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduler, dynconfig, storage)
mockHost := resource.NewHost(mockRawHost)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit))
mockPeer := resource.NewPeer(mockPeerID, mockTask, mockHost)
tc.mock(mockPeer, res, peerManager, res.EXPECT(), peerManager.EXPECT(), stream.EXPECT())
@ -1320,7 +1336,9 @@ func TestService_ReportPeerResult(t *testing.T) {
peerManager := resource.NewMockPeerManager(ctl)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduler, dynconfig, storage)
mockHost := resource.NewHost(mockRawHost)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit))
mockPeer := resource.NewPeer(mockPeerID, mockTask, mockHost)
tc.run(t, mockPeer, tc.req, svc, mockPeer, res, peerManager, res.EXPECT(), peerManager.EXPECT(), storage.EXPECT())
@ -1408,7 +1426,7 @@ func TestService_AnnounceTask(t *testing.T) {
Priority: commonv1.Priority_LEVEL0,
},
PeerHost: &schedulerv1.PeerHost{
Id: mockRawHost.Id,
Id: mockRawHost.ID,
},
PiecePacket: &commonv1.PiecePacket{
PieceInfos: []*commonv1.PieceInfo{{PieceNum: 1}},
@ -1631,7 +1649,9 @@ func TestService_AnnounceTask(t *testing.T) {
taskManager := resource.NewMockTaskManager(ctl)
peerManager := resource.NewMockPeerManager(ctl)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnablePeerHost: true}}, res, scheduler, dynconfig, storage)
mockHost := resource.NewHost(mockRawHost)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit))
mockPeer := resource.NewPeer(mockPeerID, mockTask, mockHost)
@ -1826,7 +1846,9 @@ func TestService_LeaveTask(t *testing.T) {
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
peerManager := resource.NewMockPeerManager(ctl)
mockHost := resource.NewHost(mockRawHost)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit))
peer := resource.NewPeer(mockSeedPeerID, mockTask, mockHost)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnablePeerHost: true}}, res, scheduler, dynconfig, storage)
@ -2039,7 +2061,9 @@ func TestService_LeaveHost(t *testing.T) {
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
hostManager := resource.NewMockHostManager(ctl)
host := resource.NewHost(mockRawHost)
host := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit))
mockPeer := resource.NewPeer(mockSeedPeerID, mockTask, host)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnablePeerHost: true}}, res, scheduler, dynconfig, storage)
@ -2504,8 +2528,12 @@ func TestService_triggerTask(t *testing.T) {
storage := storagemocks.NewMockStorage(ctl)
svc := NewV1(tc.config, res, scheduler, dynconfig, storage)
mockHost := resource.NewHost(mockRawHost)
mockSeedHost := resource.NewHost(mockRawSeedHost)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockSeedHost := resource.NewHost(
mockRawSeedHost.ID, mockRawSeedHost.IP, mockRawSeedHost.Hostname,
mockRawSeedHost.Port, mockRawSeedHost.DownloadPort, mockRawSeedHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit))
mockPeer := resource.NewPeer(mockPeerID, mockTask, mockHost)
mockSeedPeer := resource.NewPeer(mockSeedPeerID, mockTask, mockSeedHost)
@ -2613,12 +2641,12 @@ func TestService_storeHost(t *testing.T) {
mock: func(mockHost *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
gomock.InOrder(
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(mockRawHost.Id)).Return(mockHost, true).Times(1),
mh.Load(gomock.Eq(mockRawHost.ID)).Return(mockHost, true).Times(1),
)
},
expect: func(t *testing.T, host *resource.Host) {
assert := assert.New(t)
assert.Equal(host.ID, mockRawHost.Id)
assert.Equal(host.ID, mockRawHost.ID)
},
},
{
@ -2631,7 +2659,7 @@ func TestService_storeHost(t *testing.T) {
mock: func(mockHost *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
gomock.InOrder(
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(mockRawHost.Id)).Return(nil, false).Times(1),
mh.Load(gomock.Eq(mockRawHost.ID)).Return(nil, false).Times(1),
md.GetSchedulerClusterClientConfig().Return(types.SchedulerClusterClientConfig{LoadLimit: 10}, nil).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Store(gomock.Any()).Return().Times(1),
@ -2639,7 +2667,7 @@ func TestService_storeHost(t *testing.T) {
},
expect: func(t *testing.T, host *resource.Host) {
assert := assert.New(t)
assert.Equal(host.ID, mockRawHost.Id)
assert.Equal(host.ID, mockRawHost.ID)
assert.Equal(host.ConcurrentUploadLimit.Load(), int32(10))
},
},
@ -2653,7 +2681,7 @@ func TestService_storeHost(t *testing.T) {
mock: func(mockHost *resource.Host, hostManager resource.HostManager, mr *resource.MockResourceMockRecorder, mh *resource.MockHostManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
gomock.InOrder(
mr.HostManager().Return(hostManager).Times(1),
mh.Load(gomock.Eq(mockRawHost.Id)).Return(nil, false).Times(1),
mh.Load(gomock.Eq(mockRawHost.ID)).Return(nil, false).Times(1),
md.GetSchedulerClusterClientConfig().Return(types.SchedulerClusterClientConfig{}, errors.New("foo")).Times(1),
mr.HostManager().Return(hostManager).Times(1),
mh.Store(gomock.Any()).Return().Times(1),
@ -2661,7 +2689,7 @@ func TestService_storeHost(t *testing.T) {
},
expect: func(t *testing.T, host *resource.Host) {
assert := assert.New(t)
assert.Equal(host.ID, mockRawHost.Id)
assert.Equal(host.ID, mockRawHost.ID)
},
},
}
@ -2676,7 +2704,9 @@ func TestService_storeHost(t *testing.T) {
storage := storagemocks.NewMockStorage(ctl)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduler, dynconfig, storage)
hostManager := resource.NewMockHostManager(ctl)
mockHost := resource.NewHost(mockRawHost)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
tc.mock(mockHost, hostManager, res.EXPECT(), hostManager.EXPECT(), dynconfig.EXPECT())
host := svc.storeHost(context.Background(), tc.req.PeerHost)
@ -2744,7 +2774,9 @@ func TestService_storePeer(t *testing.T) {
storage := storagemocks.NewMockStorage(ctl)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduler, dynconfig, storage)
peerManager := resource.NewMockPeerManager(ctl)
mockHost := resource.NewHost(mockRawHost)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit))
mockPeer := resource.NewPeer(mockPeerID, mockTask, mockHost)
@ -2807,7 +2839,9 @@ func TestService_triggerSeedPeerTask(t *testing.T) {
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
seedPeer := resource.NewMockSeedPeer(ctl)
mockHost := resource.NewHost(mockRawHost)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
task := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit))
peer := resource.NewPeer(mockPeerID, task, mockHost)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduler, dynconfig, storage)
@ -2886,7 +2920,9 @@ func TestService_handleBeginOfPiece(t *testing.T) {
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
mockHost := resource.NewHost(mockRawHost)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit))
peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduler, dynconfig, storage)
@ -2899,7 +2935,9 @@ func TestService_handleBeginOfPiece(t *testing.T) {
}
func TestService_handlePieceSuccess(t *testing.T) {
mockHost := resource.NewHost(mockRawHost)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit))
now := time.Now()
@ -3157,7 +3195,9 @@ func TestService_handlePieceFail(t *testing.T) {
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
peerManager := resource.NewMockPeerManager(ctl)
mockHost := resource.NewHost(mockRawHost)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit))
peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
parent := resource.NewPeer(mockSeedPeerID, mockTask, mockHost)
@ -3278,9 +3318,11 @@ func TestService_handlePeerSuccess(t *testing.T) {
t.Fatal(err)
}
mockRawHost.Ip = ip
mockRawHost.IP = ip
mockRawHost.DownloadPort = int32(port)
mockHost := resource.NewHost(mockRawHost)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit))
peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnablePeerHost: true}}, res, scheduler, dynconfig, storage)
@ -3359,7 +3401,9 @@ func TestService_handlePeerFail(t *testing.T) {
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnablePeerHost: true}}, res, scheduler, dynconfig, storage)
mockHost := resource.NewHost(mockRawHost)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, resource.WithBackToSourceLimit(mockTaskBackToSourceLimit))
peer := resource.NewPeer(mockSeedPeerID, mockTask, mockHost)
child := resource.NewPeer(mockPeerID, mockTask, mockHost)

View File

@ -31,6 +31,7 @@ import (
"github.com/stretchr/testify/assert"
"d7y.io/dragonfly/v2/scheduler/config"
"d7y.io/dragonfly/v2/scheduler/resource"
)
var (
@ -63,12 +64,12 @@ var (
ConcurrentUploadCount: 40,
UploadCount: 20,
UploadFailedCount: 3,
CPU: CPU{
CPU: resource.CPU{
LogicalCount: 24,
PhysicalCount: 12,
Percent: 0.8,
ProcessPercent: 0.4,
Times: CPUTimes{
Times: resource.CPUTimes{
User: 100,
System: 101,
Idle: 102,
@ -81,7 +82,7 @@ var (
GuestNice: 109,
},
},
Memory: Memory{
Memory: resource.Memory{
Total: 20,
Available: 19,
Used: 16,
@ -89,14 +90,14 @@ var (
ProcessUsedPercent: 0.2,
Free: 15,
},
Network: Network{
Network: resource.Network{
TCPConnectionCount: 400,
UploadTCPConnectionCount: 200,
SecurityDomain: "product",
Location: "china",
IDC: "e1",
},
Disk: Disk{
Disk: resource.Disk{
Total: 100,
Free: 88,
Used: 56,
@ -106,7 +107,7 @@ var (
InodesFree: 160,
InodesUsedPercent: 0.6,
},
Build: Build{
Build: resource.Build{
GitVersion: "3.0.0",
GitCommit: "2bf4d5e",
GoVersion: "1.19",

View File

@ -16,7 +16,11 @@
package storage
import "time"
import (
"time"
"d7y.io/dragonfly/v2/scheduler/resource"
)
// Task contains content for task.
type Task struct {
@ -99,19 +103,19 @@ type Host struct {
UploadFailedCount int64 `csv:"uploadFailedCount"`
// CPU Stat.
CPU CPU `csv:"cpu"`
CPU resource.CPU `csv:"cpu"`
// Memory Stat.
Memory Memory `csv:"memory"`
Memory resource.Memory `csv:"memory"`
// Network Stat.
Network Network `csv:"network"`
Network resource.Network `csv:"network"`
// Disk Stat.
Disk Disk `csv:"disk"`
Disk resource.Disk `csv:"disk"`
// Build information.
Build Build `csv:"build"`
Build resource.Build `csv:"build"`
// CreatedAt is peer create nanosecond time.
CreatedAt int64 `csv:"createdAt"`
@ -120,138 +124,6 @@ type Host struct {
UpdatedAt int64 `csv:"updatedAt"`
}
// CPU contains content for cpu.
type CPU struct {
// Number of logical cores in the system.
LogicalCount uint32 `csv:"logicalCount"`
// Number of physical cores in the system.
PhysicalCount uint32 `csv:"physicalCount"`
// Percent calculates the percentage of cpu used.
Percent float64 `csv:"percent"`
// Calculates the percentage of cpu used by process.
ProcessPercent float64 `csv:"processPercent"`
// Times contains the amounts of time the CPU has spent performing different kinds of work.
Times CPUTimes `csv:"times"`
}
// CPUTimes contains content for cpu times.
type CPUTimes struct {
// CPU time of user.
User float64 `csv:"user"`
// CPU time of system.
System float64 `csv:"system"`
// CPU time of idle.
Idle float64 `csv:"idle"`
// CPU time of nice.
Nice float64 `csv:"nice"`
// CPU time of iowait.
Iowait float64 `csv:"iowait"`
// CPU time of irq.
Irq float64 `csv:"irq"`
// CPU time of softirq.
Softirq float64 `csv:"softirq"`
// CPU time of steal.
Steal float64 `csv:"steal"`
// CPU time of guest.
Guest float64 `csv:"guest"`
// CPU time of guest nice.
GuestNice float64 `csv:"guestNice"`
}
// Memory contains content for memory.
type Memory struct {
// Total amount of RAM on this system.
Total uint64 `csv:"total"`
// RAM available for programs to allocate.
Available uint64 `csv:"available"`
// RAM used by programs.
Used uint64 `csv:"used"`
// Percentage of RAM used by programs.
UsedPercent float64 `csv:"usedPercent"`
// Calculates the percentage of memory used by process.
ProcessUsedPercent float64 `csv:"processUsedPercent"`
// This is the kernel's notion of free memory.
Free uint64 `csv:"free"`
}
// Network contains content for network.
type Network struct {
// Return count of tcp connections opened and status is ESTABLISHED.
TCPConnectionCount uint32 `csv:"tcpConnectionCount"`
// Return count of upload tcp connections opened and status is ESTABLISHED.
UploadTCPConnectionCount uint32 `csv:"uploadTCPConnectionCount"`
// Security domain for network.
SecurityDomain string `csv:"securityDomain"`
// Location path(area|country|province|city|...).
Location string `csv:"location"`
// IDC where the peer host is located
IDC string `csv:"idc"`
}
// Build contains content for build.
type Build struct {
// Git version.
GitVersion string `csv:"gitVersion"`
// Git commit.
GitCommit string `csv:"gitCommit"`
// Golang version.
GoVersion string `csv:"goVersion"`
// Build platform.
Platform string `csv:"platform"`
}
// Disk contains content for disk.
type Disk struct {
// Total amount of disk on the data path of dragonfly.
Total uint64 `csv:"total"`
// Free amount of disk on the data path of dragonfly.
Free uint64 `csv:"free"`
// Used amount of disk on the data path of dragonfly.
Used uint64 `csv:"used"`
// Used percent of disk on the data path of dragonfly directory.
UsedPercent float64 `csv:"usedPercent"`
// Total amount of indoes on the data path of dragonfly directory.
InodesTotal uint64 `csv:"inodesTotal"`
// Used amount of indoes on the data path of dragonfly directory.
InodesUsed uint64 `csv:"inodesUsed"`
// Free amount of indoes on the data path of dragonfly directory.
InodesFree uint64 `csv:"inodesFree"`
// Used percent of indoes on the data path of dragonfly directory.
InodesUsedPercent float64 `csv:"inodesUsedPercent"`
}
// Parent contains content for parent.
type Parent struct {
// ID is peer id.