diff --git a/go.mod b/go.mod index aa3ed0b6e..5a1660b00 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2 go 1.23.0 require ( - d7y.io/api/v2 v2.0.169 + d7y.io/api/v2 v2.0.171 github.com/MysteriousPotato/go-lockable v1.0.0 github.com/RichardKnop/machinery v1.10.8 github.com/Showmax/go-fqdn v1.0.0 diff --git a/go.sum b/go.sum index f856a829d..b45bdc0a0 100644 --- a/go.sum +++ b/go.sum @@ -53,8 +53,8 @@ cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0Zeo cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk= cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= -d7y.io/api/v2 v2.0.169 h1:CKxPnhXJ0FNOtyATZ5pw5yolRhV6mhlFnEOvgBs9cRA= -d7y.io/api/v2 v2.0.169/go.mod h1:s3ovYyCQQ9RHUC+RMpAZYI075vkaz/PcLpoyTZqvvOg= +d7y.io/api/v2 v2.0.171 h1:iHMAhim/BFJ6MhZzsGMmVqF/h0Atw59g/0GuYFyhGxg= +d7y.io/api/v2 v2.0.171/go.mod h1:HLM5CjwBmy1pDGNUsUNkQeQPItblnHmeTJBEvBDbGnE= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U= diff --git a/scheduler/announcer/announcer.go b/scheduler/announcer/announcer.go index e04729d4b..de3baf1e3 100644 --- a/scheduler/announcer/announcer.go +++ b/scheduler/announcer/announcer.go @@ -26,7 +26,6 @@ import ( logger "d7y.io/dragonfly/v2/internal/dflog" managerclient "d7y.io/dragonfly/v2/pkg/rpc/manager/client" "d7y.io/dragonfly/v2/scheduler/config" - "d7y.io/dragonfly/v2/scheduler/storage" ) // Announcer is the interface used for announce service. @@ -42,7 +41,6 @@ type Announcer interface { type announcer struct { config *config.Config managerClient managerclient.V2 - storage storage.Storage done chan struct{} } @@ -50,11 +48,10 @@ type announcer struct { type Option func(s *announcer) // New returns a new Announcer interface. -func New(cfg *config.Config, managerClient managerclient.V2, storage storage.Storage, schedulerFeatures []string, options ...Option) (Announcer, error) { +func New(cfg *config.Config, managerClient managerclient.V2, schedulerFeatures []string, options ...Option) (Announcer, error) { a := &announcer{ config: cfg, managerClient: managerClient, - storage: storage, done: make(chan struct{}), } diff --git a/scheduler/announcer/announcer_test.go b/scheduler/announcer/announcer_test.go index 3294202a6..bb192808d 100644 --- a/scheduler/announcer/announcer_test.go +++ b/scheduler/announcer/announcer_test.go @@ -30,7 +30,6 @@ import ( managertypes "d7y.io/dragonfly/v2/manager/types" managerclientmocks "d7y.io/dragonfly/v2/pkg/rpc/manager/client/mocks" "d7y.io/dragonfly/v2/scheduler/config" - storagemocks "d7y.io/dragonfly/v2/scheduler/storage/mocks" ) var ( @@ -116,10 +115,9 @@ func TestAnnouncer_New(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { mockManagerClient := managerclientmocks.NewMockV2(ctl) - mockStorage := storagemocks.NewMockStorage(ctl) tc.mock(mockManagerClient.EXPECT()) - a, err := New(tc.config, mockManagerClient, mockStorage, managertypes.DefaultSchedulerFeatures) + a, err := New(tc.config, mockManagerClient, managertypes.DefaultSchedulerFeatures) tc.expect(t, a, err) }) } @@ -134,7 +132,7 @@ func TestAnnouncer_Serve(t *testing.T) { config *config.Config data []byte sleep func() - mock func(data []byte, m *managerclientmocks.MockV2MockRecorder, ms *storagemocks.MockStorageMockRecorder) + mock func(data []byte, m *managerclientmocks.MockV2MockRecorder) except func(t *testing.T, a Announcer) }{ { @@ -161,7 +159,7 @@ func TestAnnouncer_Serve(t *testing.T) { sleep: func() { time.Sleep(3 * time.Second) }, - mock: func(data []byte, m *managerclientmocks.MockV2MockRecorder, ms *storagemocks.MockStorageMockRecorder) { + mock: func(data []byte, m *managerclientmocks.MockV2MockRecorder) { gomock.InOrder( m.UpdateScheduler(gomock.Any(), gomock.Eq(&managerv2.UpdateSchedulerRequest{ SourceType: managerv2.SourceType_SCHEDULER_SOURCE, @@ -190,10 +188,9 @@ func TestAnnouncer_Serve(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { mockManagerClient := managerclientmocks.NewMockV2(ctl) - mockStorage := storagemocks.NewMockStorage(ctl) - tc.mock(tc.data, mockManagerClient.EXPECT(), mockStorage.EXPECT()) - a, err := New(tc.config, mockManagerClient, mockStorage, managertypes.DefaultSchedulerFeatures) + tc.mock(tc.data, mockManagerClient.EXPECT()) + a, err := New(tc.config, mockManagerClient, managertypes.DefaultSchedulerFeatures) if err != nil { t.Fatal(err) } @@ -267,10 +264,9 @@ func TestAnnouncer_announceToManager(t *testing.T) { ctl := gomock.NewController(t) defer ctl.Finish() mockManagerClient := managerclientmocks.NewMockV2(ctl) - mockStorage := storagemocks.NewMockStorage(ctl) tc.mock(mockManagerClient.EXPECT()) - a, err := New(tc.config, mockManagerClient, mockStorage, managertypes.DefaultSchedulerFeatures) + a, err := New(tc.config, mockManagerClient, managertypes.DefaultSchedulerFeatures) if err != nil { t.Fatal(err) } diff --git a/scheduler/resource/persistentcache/host.go b/scheduler/resource/persistentcache/host.go index 734f27fd5..c4dc71228 100644 --- a/scheduler/resource/persistentcache/host.go +++ b/scheduler/resource/persistentcache/host.go @@ -116,145 +116,151 @@ type Host struct { // CPU contains content for cpu. type CPU struct { // Number of logical cores in the system. - LogicalCount uint32 `csv:"logicalCount"` + LogicalCount uint32 // Number of physical cores in the system. - PhysicalCount uint32 `csv:"physicalCount"` + PhysicalCount uint32 // Percent calculates the percentage of cpu used. - Percent float64 `csv:"percent"` + Percent float64 // Calculates the percentage of cpu used by process. - ProcessPercent float64 `csv:"processPercent"` + ProcessPercent float64 // Times contains the amounts of time the CPU has spent performing different kinds of work. - Times CPUTimes `csv:"times"` + Times CPUTimes } // CPUTimes contains content for cpu times. type CPUTimes struct { // CPU time of user. - User float64 `csv:"user"` + User float64 // CPU time of system. - System float64 `csv:"system"` + System float64 // CPU time of idle. - Idle float64 `csv:"idle"` + Idle float64 // CPU time of nice. - Nice float64 `csv:"nice"` + Nice float64 // CPU time of iowait. - Iowait float64 `csv:"iowait"` + Iowait float64 // CPU time of irq. - Irq float64 `csv:"irq"` + Irq float64 // CPU time of softirq. - Softirq float64 `csv:"softirq"` + Softirq float64 // CPU time of steal. - Steal float64 `csv:"steal"` + Steal float64 // CPU time of guest. - Guest float64 `csv:"guest"` + Guest float64 // CPU time of guest nice. - GuestNice float64 `csv:"guestNice"` + GuestNice float64 } // Memory contains content for memory. type Memory struct { // Total amount of RAM on this system. - Total uint64 `csv:"total"` + Total uint64 // RAM available for programs to allocate. - Available uint64 `csv:"available"` + Available uint64 // RAM used by programs. - Used uint64 `csv:"used"` + Used uint64 // Percentage of RAM used by programs. - UsedPercent float64 `csv:"usedPercent"` + UsedPercent float64 // Calculates the percentage of memory used by process. - ProcessUsedPercent float64 `csv:"processUsedPercent"` + ProcessUsedPercent float64 // This is the kernel's notion of free memory. - Free uint64 `csv:"free"` + Free uint64 } // Network contains content for network. type Network struct { // Return count of tcp connections opened and status is ESTABLISHED. - TCPConnectionCount uint32 `csv:"tcpConnectionCount"` + TCPConnectionCount uint32 // Return count of upload tcp connections opened and status is ESTABLISHED. - UploadTCPConnectionCount uint32 `csv:"uploadTCPConnectionCount"` + UploadTCPConnectionCount uint32 // Location path(area|country|province|city|...). - Location string `csv:"location"` + Location string // IDC where the peer host is located - IDC string `csv:"idc"` + IDC string // Download rate of the host, unit is byte/s. - DownloadRate uint64 `csv:"downloadRate"` + DownloadRate uint64 // Download rate limit of the host, unit is byte/s. - DownloadRateLimit uint64 `csv:"downloadRateLimit"` + DownloadRateLimit uint64 // Upload rate of the host, unit is byte/s. - UploadRate uint64 `csv:"uploadRate"` + UploadRate uint64 // Upload rate limit of the host, unit is byte/s. - UploadRateLimit uint64 `csv:"uploadRateLimit"` + UploadRateLimit uint64 } // Build contains content for build. type Build struct { // Git version. - GitVersion string `csv:"gitVersion"` + GitVersion string // Git commit. - GitCommit string `csv:"gitCommit"` + GitCommit string // Golang version. - GoVersion string `csv:"goVersion"` + GoVersion string // Rust version. - RustVersion string `csv:"rustVersion"` + RustVersion string // Build platform. - Platform string `csv:"platform"` + Platform string } // Disk contains content for disk. type Disk struct { // Total amount of disk on the data path of dragonfly. - Total uint64 `csv:"total"` + Total uint64 // Free amount of disk on the data path of dragonfly. - Free uint64 `csv:"free"` + Free uint64 // Used amount of disk on the data path of dragonfly. - Used uint64 `csv:"used"` + Used uint64 // Used percent of disk on the data path of dragonfly directory. - UsedPercent float64 `csv:"usedPercent"` + UsedPercent float64 // Total amount of indoes on the data path of dragonfly directory. - InodesTotal uint64 `csv:"inodesTotal"` + InodesTotal uint64 // Used amount of indoes on the data path of dragonfly directory. - InodesUsed uint64 `csv:"inodesUsed"` + InodesUsed uint64 // Free amount of indoes on the data path of dragonfly directory. - InodesFree uint64 `csv:"inodesFree"` + InodesFree uint64 // Used percent of indoes on the data path of dragonfly directory. - InodesUsedPercent float64 `csv:"inodesUsedPercent"` + InodesUsedPercent float64 + + // Disk write bandwidth, unit is byte/s. + WriteBandwidth uint64 + + // Disk read bandwidth, unit is byte/s. + ReadBandwidth uint64 } // New host instance. diff --git a/scheduler/resource/persistentcache/host_manager.go b/scheduler/resource/persistentcache/host_manager.go index cb0a0d254..3fbccdcb5 100644 --- a/scheduler/resource/persistentcache/host_manager.go +++ b/scheduler/resource/persistentcache/host_manager.go @@ -380,6 +380,18 @@ func (h *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) { return nil, false } + diskWriteBandwidth, err := strconv.ParseUint(rawHost["disk_write_bandwidth"], 10, 64) + if err != nil { + log.Errorf("parsing disk write bandwidth failed: %v", err) + return nil, false + } + + diskReadBandwidth, err := strconv.ParseUint(rawHost["disk_read_bandwidth"], 10, 64) + if err != nil { + log.Errorf("parsing disk read bandwidth failed: %v", err) + return nil, false + } + disk := Disk{ Total: diskTotal, Free: diskFree, @@ -389,6 +401,8 @@ func (h *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) { InodesUsed: diskInodesUsed, InodesFree: diskInodesFree, InodesUsedPercent: diskInodesUsedPercent, + WriteBandwidth: diskWriteBandwidth, + ReadBandwidth: diskReadBandwidth, } build := Build{ @@ -498,6 +512,8 @@ func (h *hostManager) Store(ctx context.Context, host *Host) error { "disk_inodes_used", host.Disk.InodesUsed, "disk_inodes_free", host.Disk.InodesFree, "disk_inodes_used_percent", host.Disk.InodesUsedPercent, + "disk_write_bandwidth", host.Disk.WriteBandwidth, + "disk_read_bandwidth", host.Disk.ReadBandwidth, "build_git_version", host.Build.GitVersion, "build_git_commit", host.Build.GitCommit, "build_go_version", host.Build.GoVersion, diff --git a/scheduler/resource/standard/host.go b/scheduler/resource/standard/host.go index 26972ef18..74e915753 100644 --- a/scheduler/resource/standard/host.go +++ b/scheduler/resource/standard/host.go @@ -230,142 +230,148 @@ type Host struct { // CPU contains content for cpu. type CPU struct { // Number of logical cores in the system. - LogicalCount uint32 `csv:"logicalCount"` + LogicalCount uint32 // Number of physical cores in the system. - PhysicalCount uint32 `csv:"physicalCount"` + PhysicalCount uint32 // Percent calculates the percentage of cpu used. - Percent float64 `csv:"percent"` + Percent float64 // Calculates the percentage of cpu used by process. - ProcessPercent float64 `csv:"processPercent"` + ProcessPercent float64 // Times contains the amounts of time the CPU has spent performing different kinds of work. - Times CPUTimes `csv:"times"` + Times CPUTimes } // CPUTimes contains content for cpu times. type CPUTimes struct { // CPU time of user. - User float64 `csv:"user"` + User float64 // CPU time of system. - System float64 `csv:"system"` + System float64 // CPU time of idle. - Idle float64 `csv:"idle"` + Idle float64 // CPU time of nice. - Nice float64 `csv:"nice"` + Nice float64 // CPU time of iowait. - Iowait float64 `csv:"iowait"` + Iowait float64 // CPU time of irq. - Irq float64 `csv:"irq"` + Irq float64 // CPU time of softirq. - Softirq float64 `csv:"softirq"` + Softirq float64 // CPU time of steal. - Steal float64 `csv:"steal"` + Steal float64 // CPU time of guest. - Guest float64 `csv:"guest"` + Guest float64 // CPU time of guest nice. - GuestNice float64 `csv:"guestNice"` + GuestNice float64 } // Memory contains content for memory. type Memory struct { // Total amount of RAM on this system. - Total uint64 `csv:"total"` + Total uint64 // RAM available for programs to allocate. - Available uint64 `csv:"available"` + Available uint64 // RAM used by programs. - Used uint64 `csv:"used"` + Used uint64 // Percentage of RAM used by programs. - UsedPercent float64 `csv:"usedPercent"` + UsedPercent float64 // Calculates the percentage of memory used by process. - ProcessUsedPercent float64 `csv:"processUsedPercent"` + ProcessUsedPercent float64 // This is the kernel's notion of free memory. - Free uint64 `csv:"free"` + Free uint64 } // Network contains content for network. type Network struct { // Return count of tcp connections opened and status is ESTABLISHED. - TCPConnectionCount uint32 `csv:"tcpConnectionCount"` + TCPConnectionCount uint32 // Return count of upload tcp connections opened and status is ESTABLISHED. - UploadTCPConnectionCount uint32 `csv:"uploadTCPConnectionCount"` + UploadTCPConnectionCount uint32 // Location path(area|country|province|city|...). - Location string `csv:"location"` + Location string // IDC where the peer host is located - IDC string `csv:"idc"` + IDC string // Download rate of the host, unit is byte/s. - DownloadRate uint64 `csv:"downloadRate"` + DownloadRate uint64 // Download rate limit of the host, unit is byte/s. - DownloadRateLimit uint64 `csv:"downloadRateLimit"` + DownloadRateLimit uint64 // Upload rate of the host, unit is byte/s. - UploadRate uint64 `csv:"uploadRate"` + UploadRate uint64 // Upload rate limit of the host, unit is byte/s. - UploadRateLimit uint64 `csv:"uploadRateLimit"` + UploadRateLimit uint64 } // Build contains content for build. type Build struct { // Git version. - GitVersion string `csv:"gitVersion"` + GitVersion string // Git commit. - GitCommit string `csv:"gitCommit"` + GitCommit string // Golang version. - GoVersion string `csv:"goVersion"` + GoVersion string // Build platform. - Platform string `csv:"platform"` + Platform string } // Disk contains content for disk. type Disk struct { // Total amount of disk on the data path of dragonfly. - Total uint64 `csv:"total"` + Total uint64 // Free amount of disk on the data path of dragonfly. - Free uint64 `csv:"free"` + Free uint64 // Used amount of disk on the data path of dragonfly. - Used uint64 `csv:"used"` + Used uint64 // Used percent of disk on the data path of dragonfly directory. - UsedPercent float64 `csv:"usedPercent"` + UsedPercent float64 // Total amount of indoes on the data path of dragonfly directory. - InodesTotal uint64 `csv:"inodesTotal"` + InodesTotal uint64 // Used amount of indoes on the data path of dragonfly directory. - InodesUsed uint64 `csv:"inodesUsed"` + InodesUsed uint64 // Free amount of indoes on the data path of dragonfly directory. - InodesFree uint64 `csv:"inodesFree"` + InodesFree uint64 // Used percent of indoes on the data path of dragonfly directory. - InodesUsedPercent float64 `csv:"inodesUsedPercent"` + InodesUsedPercent float64 + + // Disk write bandwidth, unit is byte/s. + WriteBandwidth uint64 + + // Disk read bandwidth, unit is byte/s. + ReadBandwidth uint64 } // New host instance. diff --git a/scheduler/rpcserver/rpcserver.go b/scheduler/rpcserver/rpcserver.go index c2f1a3936..3957b797e 100644 --- a/scheduler/rpcserver/rpcserver.go +++ b/scheduler/rpcserver/rpcserver.go @@ -24,7 +24,6 @@ import ( "d7y.io/dragonfly/v2/scheduler/resource/persistentcache" "d7y.io/dragonfly/v2/scheduler/resource/standard" "d7y.io/dragonfly/v2/scheduler/scheduling" - "d7y.io/dragonfly/v2/scheduler/storage" ) // New returns a new scheduler server from the given options. @@ -34,11 +33,10 @@ func New( persistentCacheResource persistentcache.Resource, scheduling scheduling.Scheduling, dynconfig config.DynconfigInterface, - storage storage.Storage, opts ...grpc.ServerOption, ) *grpc.Server { return server.New( - newSchedulerServerV1(cfg, resource, scheduling, dynconfig, storage), - newSchedulerServerV2(cfg, resource, persistentCacheResource, scheduling, dynconfig, storage), + newSchedulerServerV1(cfg, resource, scheduling, dynconfig), + newSchedulerServerV2(cfg, resource, persistentCacheResource, scheduling, dynconfig), opts...) } diff --git a/scheduler/rpcserver/rpcserver_test.go b/scheduler/rpcserver/rpcserver_test.go index 2ae87d208..f9eeb47ab 100644 --- a/scheduler/rpcserver/rpcserver_test.go +++ b/scheduler/rpcserver/rpcserver_test.go @@ -29,7 +29,6 @@ import ( "d7y.io/dragonfly/v2/scheduler/resource/persistentcache" "d7y.io/dragonfly/v2/scheduler/resource/standard" "d7y.io/dragonfly/v2/scheduler/scheduling/mocks" - storagemocks "d7y.io/dragonfly/v2/scheduler/storage/mocks" ) var ( @@ -63,9 +62,8 @@ func TestRPCServer_New(t *testing.T) { resource := standard.NewMockResource(ctl) persistentCacheResource := persistentcache.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) - storage := storagemocks.NewMockStorage(ctl) - svr := New(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig, storage) + svr := New(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig) tc.expect(t, svr) }) } diff --git a/scheduler/rpcserver/scheduler_server_v1.go b/scheduler/rpcserver/scheduler_server_v1.go index 6dc92bbbd..f3d6b12b0 100644 --- a/scheduler/rpcserver/scheduler_server_v1.go +++ b/scheduler/rpcserver/scheduler_server_v1.go @@ -31,7 +31,6 @@ import ( resource "d7y.io/dragonfly/v2/scheduler/resource/standard" "d7y.io/dragonfly/v2/scheduler/scheduling" "d7y.io/dragonfly/v2/scheduler/service" - "d7y.io/dragonfly/v2/scheduler/storage" ) // schedulerServerV1 is v1 version of the scheduler grpc server. @@ -46,9 +45,8 @@ func newSchedulerServerV1( resource resource.Resource, scheduling scheduling.Scheduling, dynconfig config.DynconfigInterface, - storage storage.Storage, ) schedulerv1.SchedulerServer { - return &schedulerServerV1{service.NewV1(cfg, resource, scheduling, dynconfig, storage)} + return &schedulerServerV1{service.NewV1(cfg, resource, scheduling, dynconfig)} } // RegisterPeerTask registers peer and triggers seed peer download task. diff --git a/scheduler/rpcserver/scheduler_server_v2.go b/scheduler/rpcserver/scheduler_server_v2.go index 5d63bb493..80b2dd694 100644 --- a/scheduler/rpcserver/scheduler_server_v2.go +++ b/scheduler/rpcserver/scheduler_server_v2.go @@ -30,7 +30,6 @@ import ( "d7y.io/dragonfly/v2/scheduler/resource/standard" "d7y.io/dragonfly/v2/scheduler/scheduling" "d7y.io/dragonfly/v2/scheduler/service" - "d7y.io/dragonfly/v2/scheduler/storage" ) // schedulerServerV2 is v2 version of the scheduler grpc server. @@ -46,9 +45,8 @@ func newSchedulerServerV2( persistentCacheResource persistentcache.Resource, scheduling scheduling.Scheduling, dynconfig config.DynconfigInterface, - storage storage.Storage, ) schedulerv2.SchedulerServer { - return &schedulerServerV2{service.NewV2(cfg, resource, persistentCacheResource, scheduling, dynconfig, storage)} + return &schedulerServerV2{service.NewV2(cfg, resource, persistentCacheResource, scheduling, dynconfig)} } // AnnouncePeer announces peer to scheduler. diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 493089c1f..4651948e1 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -46,7 +46,6 @@ import ( "d7y.io/dragonfly/v2/scheduler/resource/standard" "d7y.io/dragonfly/v2/scheduler/rpcserver" "d7y.io/dragonfly/v2/scheduler/scheduling" - "d7y.io/dragonfly/v2/scheduler/storage" ) const ( @@ -81,9 +80,6 @@ type Server struct { // Async job. job job.Job - // Storage interface. - storage storage.Storage - // Announcer interface. announcer announcer.Announcer @@ -95,18 +91,6 @@ type Server struct { func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, error) { s := &Server{config: cfg} - // Initialize Storage. - storage, err := storage.New( - d.DataDir(), - cfg.Storage.MaxSize, - cfg.Storage.MaxBackups, - cfg.Storage.BufferSize, - ) - if err != nil { - return nil, err - } - s.storage = storage - // Initialize dial options of manager grpc client. managerDialOptions := []grpc.DialOption{grpc.WithStatsHandler(otelgrpc.NewClientHandler())} if cfg.Manager.TLS != nil { @@ -149,7 +133,7 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err if cfg.Job.Enable && rdb != nil { schedulerFeatures = append(schedulerFeatures, managertypes.SchedulerFeaturePreheat) } - announcer, err := announcer.New(cfg, s.managerClient, storage, schedulerFeatures) + announcer, err := announcer.New(cfg, s.managerClient, schedulerFeatures) if err != nil { return nil, err } @@ -226,7 +210,7 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err schedulerServerOptions = append(schedulerServerOptions, grpc.Creds(rpc.NewInsecureCredentials())) } - svr := rpcserver.New(cfg, resource, s.persistentCacheResource, scheduling, dynconfig, s.storage, schedulerServerOptions...) + svr := rpcserver.New(cfg, resource, s.persistentCacheResource, scheduling, dynconfig, schedulerServerOptions...) s.grpcServer = svr // Initialize metrics. @@ -316,13 +300,6 @@ func (s *Server) Stop() { logger.Info("stop resource closed") } - // Clean download storage. - if err := s.storage.ClearDownload(); err != nil { - logger.Errorf("clean download storage failed %s", err.Error()) - } else { - logger.Info("clean download storage completed") - } - // Stop GC. s.gc.Stop() logger.Info("gc closed") diff --git a/scheduler/scheduling/evaluator/evaluator_base_test.go b/scheduler/scheduling/evaluator/evaluator_base_test.go index 376be64fc..5b72b6368 100644 --- a/scheduler/scheduling/evaluator/evaluator_base_test.go +++ b/scheduler/scheduling/evaluator/evaluator_base_test.go @@ -123,6 +123,8 @@ var ( InodesUsed: 7835772, InodesFree: 4874617108, InodesUsedPercent: 0.1604884305611568, + WriteBandwidth: 1, + ReadBandwidth: 1, } mockBuild = resource.Build{ diff --git a/scheduler/scheduling/scheduling.go b/scheduler/scheduling/scheduling.go index 305a377e9..ad64d0a1d 100644 --- a/scheduler/scheduling/scheduling.go +++ b/scheduler/scheduling/scheduling.go @@ -682,6 +682,8 @@ func ConstructSuccessNormalTaskResponse(candidateParents []*resource.Peer) *sche InodesUsed: candidateParent.Host.Disk.InodesUsed, InodesFree: candidateParent.Host.Disk.InodesFree, InodesUsedPercent: candidateParent.Host.Disk.InodesUsedPercent, + WriteBandwidth: candidateParent.Host.Disk.WriteBandwidth, + ReadBandwidth: candidateParent.Host.Disk.ReadBandwidth, }, Build: &commonv2.Build{ GitVersion: candidateParent.Host.Build.GitVersion, diff --git a/scheduler/service/service_v1.go b/scheduler/service/service_v1.go index 0e51ff936..5d47d4652 100644 --- a/scheduler/service/service_v1.go +++ b/scheduler/service/service_v1.go @@ -46,7 +46,6 @@ import ( "d7y.io/dragonfly/v2/scheduler/metrics" resource "d7y.io/dragonfly/v2/scheduler/resource/standard" "d7y.io/dragonfly/v2/scheduler/scheduling" - "d7y.io/dragonfly/v2/scheduler/storage" ) // V1 is the interface for v1 version of the service. @@ -62,9 +61,6 @@ type V1 struct { // Dynamic config. dynconfig config.DynconfigInterface - - // Storage interface. - storage storage.Storage } // New v1 version of service instance. @@ -73,14 +69,12 @@ func NewV1( resource resource.Resource, scheduling scheduling.Scheduling, dynconfig config.DynconfigInterface, - storage storage.Storage, ) *V1 { return &V1{ resource: resource, scheduling: scheduling, config: cfg, dynconfig: dynconfig, - storage: storage, } } @@ -300,7 +294,6 @@ func (v *V1) ReportPeerResult(ctx context.Context, req *schedulerv1.PeerResult) metrics.DownloadPeerCount.WithLabelValues(priority.String(), peer.Task.Type.String(), peer.Host.Type.Name()).Inc() - parents := peer.Parents() if !req.GetSuccess() { peer.Log.Error("report failed peer") if peer.FSM.Is(resource.PeerStateBackToSource) { @@ -308,7 +301,6 @@ func (v *V1) ReportPeerResult(ctx context.Context, req *schedulerv1.PeerResult) metrics.DownloadPeerBackToSourceFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(), peer.Host.Type.Name()).Inc() - go v.createDownloadRecord(peer, parents, req) v.handleTaskFailure(ctx, peer.Task, req.GetSourceError(), nil) v.handlePeerFailure(ctx, peer) return nil @@ -318,21 +310,18 @@ func (v *V1) ReportPeerResult(ctx context.Context, req *schedulerv1.PeerResult) metrics.DownloadPeerFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(), peer.Host.Type.Name()).Inc() - go v.createDownloadRecord(peer, parents, req) v.handlePeerFailure(ctx, peer) return nil } peer.Log.Info("report success peer") if peer.FSM.Is(resource.PeerStateBackToSource) { - go v.createDownloadRecord(peer, parents, req) v.handleTaskSuccess(ctx, peer.Task, req) v.handlePeerSuccess(ctx, peer) metrics.DownloadPeerDuration.WithLabelValues(metrics.CalculateSizeLevel(peer.Task.ContentLength.Load()).String()).Observe(float64(req.GetCost())) return nil } - go v.createDownloadRecord(peer, parents, req) v.handlePeerSuccess(ctx, peer) metrics.DownloadPeerDuration.WithLabelValues(metrics.CalculateSizeLevel(peer.Task.ContentLength.Load()).String()).Observe(float64(req.GetCost())) return nil @@ -1338,220 +1327,3 @@ func (v *V1) handleTaskFailure(ctx context.Context, task *resource.Task, backToS return } } - -// createDownloadRecord stores peer download records. -func (v *V1) createDownloadRecord(peer *resource.Peer, parents []*resource.Peer, req *schedulerv1.PeerResult) { - var parentRecords []storage.Parent - for _, parent := range parents { - parentRecord := storage.Parent{ - ID: parent.ID, - Tag: parent.Task.Tag, - Application: parent.Task.Application, - State: parent.FSM.Current(), - Cost: parent.Cost.Load().Nanoseconds(), - UploadPieceCount: 0, - FinishedPieceCount: int32(parent.FinishedPieces.Count()), - CreatedAt: parent.CreatedAt.Load().UnixNano(), - UpdatedAt: parent.UpdatedAt.Load().UnixNano(), - Host: storage.Host{ - ID: parent.Host.ID, - Type: parent.Host.Type.Name(), - Hostname: parent.Host.Hostname, - IP: parent.Host.IP, - Port: parent.Host.Port, - DownloadPort: parent.Host.DownloadPort, - OS: parent.Host.OS, - Platform: parent.Host.Platform, - PlatformFamily: parent.Host.PlatformFamily, - PlatformVersion: parent.Host.PlatformVersion, - KernelVersion: parent.Host.KernelVersion, - ConcurrentUploadLimit: parent.Host.ConcurrentUploadLimit.Load(), - ConcurrentUploadCount: parent.Host.ConcurrentUploadCount.Load(), - UploadCount: parent.Host.UploadCount.Load(), - UploadFailedCount: parent.Host.UploadFailedCount.Load(), - CreatedAt: parent.Host.CreatedAt.Load().UnixNano(), - UpdatedAt: parent.Host.UpdatedAt.Load().UnixNano(), - }, - } - - parentRecord.Host.CPU = resource.CPU{ - LogicalCount: parent.Host.CPU.LogicalCount, - PhysicalCount: parent.Host.CPU.PhysicalCount, - Percent: parent.Host.CPU.Percent, - ProcessPercent: parent.Host.CPU.ProcessPercent, - Times: resource.CPUTimes{ - User: parent.Host.CPU.Times.User, - System: parent.Host.CPU.Times.System, - Idle: parent.Host.CPU.Times.Idle, - Nice: parent.Host.CPU.Times.Nice, - Iowait: parent.Host.CPU.Times.Iowait, - Irq: parent.Host.CPU.Times.Irq, - Softirq: parent.Host.CPU.Times.Softirq, - Steal: parent.Host.CPU.Times.Steal, - Guest: parent.Host.CPU.Times.Guest, - GuestNice: parent.Host.CPU.Times.GuestNice, - }, - } - - parentRecord.Host.Memory = resource.Memory{ - Total: parent.Host.Memory.Total, - Available: parent.Host.Memory.Available, - Used: parent.Host.Memory.Used, - UsedPercent: parent.Host.Memory.UsedPercent, - ProcessUsedPercent: parent.Host.Memory.ProcessUsedPercent, - Free: parent.Host.Memory.Free, - } - - parentRecord.Host.Network = resource.Network{ - TCPConnectionCount: parent.Host.Network.TCPConnectionCount, - UploadTCPConnectionCount: parent.Host.Network.UploadTCPConnectionCount, - Location: parent.Host.Network.Location, - IDC: parent.Host.Network.IDC, - } - - parentRecord.Host.Disk = resource.Disk{ - Total: parent.Host.Disk.Total, - Free: parent.Host.Disk.Free, - Used: parent.Host.Disk.Used, - UsedPercent: parent.Host.Disk.UsedPercent, - InodesTotal: parent.Host.Disk.InodesTotal, - InodesUsed: parent.Host.Disk.InodesUsed, - InodesFree: parent.Host.Disk.InodesFree, - InodesUsedPercent: parent.Host.Disk.InodesUsedPercent, - } - - parentRecord.Host.Build = resource.Build{ - GitVersion: parent.Host.Build.GitVersion, - GitCommit: parent.Host.Build.GitCommit, - GoVersion: parent.Host.Build.GoVersion, - Platform: parent.Host.Build.Platform, - } - - peer.Pieces.Range(func(key, value any) bool { - piece, ok := value.(*resource.Piece) - if !ok { - return true - } - - if piece.ParentID == parent.ID { - parentRecord.UploadPieceCount++ - parentRecord.Pieces = append(parentRecord.Pieces, storage.Piece{ - Length: int64(piece.Length), - Cost: piece.Cost.Nanoseconds(), - CreatedAt: piece.CreatedAt.UnixNano(), - }) - } - - return true - }) - - parentRecords = append(parentRecords, parentRecord) - } - - download := storage.Download{ - ID: peer.ID, - Tag: peer.Task.Tag, - Application: peer.Task.Application, - State: peer.FSM.Current(), - Cost: peer.Cost.Load().Nanoseconds(), - FinishedPieceCount: int32(peer.FinishedPieces.Count()), - Parents: parentRecords, - CreatedAt: peer.CreatedAt.Load().UnixNano(), - UpdatedAt: peer.UpdatedAt.Load().UnixNano(), - Task: storage.Task{ - ID: peer.Task.ID, - URL: peer.Task.URL, - Type: peer.Task.Type.String(), - ContentLength: peer.Task.ContentLength.Load(), - TotalPieceCount: peer.Task.TotalPieceCount.Load(), - BackToSourceLimit: peer.Task.BackToSourceLimit.Load(), - BackToSourcePeerCount: int32(peer.Task.BackToSourcePeers.Len()), - State: peer.Task.FSM.Current(), - CreatedAt: peer.Task.CreatedAt.Load().UnixNano(), - UpdatedAt: peer.Task.UpdatedAt.Load().UnixNano(), - }, - Host: storage.Host{ - ID: peer.Host.ID, - Type: peer.Host.Type.Name(), - Hostname: peer.Host.Hostname, - IP: peer.Host.IP, - Port: peer.Host.Port, - DownloadPort: peer.Host.DownloadPort, - OS: peer.Host.OS, - Platform: peer.Host.Platform, - PlatformFamily: peer.Host.PlatformFamily, - PlatformVersion: peer.Host.PlatformVersion, - KernelVersion: peer.Host.KernelVersion, - ConcurrentUploadLimit: peer.Host.ConcurrentUploadLimit.Load(), - ConcurrentUploadCount: peer.Host.ConcurrentUploadCount.Load(), - UploadCount: peer.Host.UploadCount.Load(), - UploadFailedCount: peer.Host.UploadFailedCount.Load(), - SchedulerClusterID: int64(peer.Host.SchedulerClusterID), - CreatedAt: peer.Host.CreatedAt.Load().UnixNano(), - UpdatedAt: peer.Host.UpdatedAt.Load().UnixNano(), - }, - } - - download.Host.CPU = resource.CPU{ - LogicalCount: peer.Host.CPU.LogicalCount, - PhysicalCount: peer.Host.CPU.PhysicalCount, - Percent: peer.Host.CPU.Percent, - ProcessPercent: peer.Host.CPU.ProcessPercent, - Times: resource.CPUTimes{ - User: peer.Host.CPU.Times.User, - System: peer.Host.CPU.Times.System, - Idle: peer.Host.CPU.Times.Idle, - Nice: peer.Host.CPU.Times.Nice, - Iowait: peer.Host.CPU.Times.Iowait, - Irq: peer.Host.CPU.Times.Irq, - Softirq: peer.Host.CPU.Times.Softirq, - Steal: peer.Host.CPU.Times.Steal, - Guest: peer.Host.CPU.Times.Guest, - GuestNice: peer.Host.CPU.Times.GuestNice, - }, - } - - download.Host.Memory = resource.Memory{ - Total: peer.Host.Memory.Total, - Available: peer.Host.Memory.Available, - Used: peer.Host.Memory.Used, - UsedPercent: peer.Host.Memory.UsedPercent, - ProcessUsedPercent: peer.Host.Memory.ProcessUsedPercent, - Free: peer.Host.Memory.Free, - } - - download.Host.Network = resource.Network{ - TCPConnectionCount: peer.Host.Network.TCPConnectionCount, - UploadTCPConnectionCount: peer.Host.Network.UploadTCPConnectionCount, - Location: peer.Host.Network.Location, - IDC: peer.Host.Network.IDC, - } - - download.Host.Disk = resource.Disk{ - Total: peer.Host.Disk.Total, - Free: peer.Host.Disk.Free, - Used: peer.Host.Disk.Used, - UsedPercent: peer.Host.Disk.UsedPercent, - InodesTotal: peer.Host.Disk.InodesTotal, - InodesUsed: peer.Host.Disk.InodesUsed, - InodesFree: peer.Host.Disk.InodesFree, - InodesUsedPercent: peer.Host.Disk.InodesUsedPercent, - } - - download.Host.Build = resource.Build{ - GitVersion: peer.Host.Build.GitVersion, - GitCommit: peer.Host.Build.GitCommit, - GoVersion: peer.Host.Build.GoVersion, - Platform: peer.Host.Build.Platform, - } - - if req.GetCode() != commonv1.Code_Success { - download.Error = storage.Error{ - Code: req.GetCode().String(), - } - } - - if err := v.storage.CreateDownload(download); err != nil { - peer.Log.Error(err) - } -} diff --git a/scheduler/service/service_v1_test.go b/scheduler/service/service_v1_test.go index 04913a650..697fdfcbf 100644 --- a/scheduler/service/service_v1_test.go +++ b/scheduler/service/service_v1_test.go @@ -57,8 +57,6 @@ import ( resource "d7y.io/dragonfly/v2/scheduler/resource/standard" "d7y.io/dragonfly/v2/scheduler/scheduling" "d7y.io/dragonfly/v2/scheduler/scheduling/mocks" - "d7y.io/dragonfly/v2/scheduler/storage" - storagemocks "d7y.io/dragonfly/v2/scheduler/storage/mocks" ) var ( @@ -138,9 +136,8 @@ func TestService_NewV1(t *testing.T) { scheduling := mocks.NewMockScheduling(ctl) resource := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) - storage := storagemocks.NewMockStorage(ctl) - tc.expect(t, NewV1(&config.Config{Scheduler: mockSchedulerConfig}, resource, scheduling, dynconfig, storage)) + tc.expect(t, NewV1(&config.Config{Scheduler: mockSchedulerConfig}, resource, scheduling, dynconfig)) }) } } @@ -801,11 +798,10 @@ func TestServiceV1_RegisterPeerTask(t *testing.T) { scheduling := mocks.NewMockScheduling(ctl) res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) - storage := storagemocks.NewMockStorage(ctl) hostManager := resource.NewMockHostManager(ctl) taskManager := resource.NewMockTaskManager(ctl) peerManager := resource.NewMockPeerManager(ctl) - svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage) + svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig) mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, @@ -1066,10 +1062,9 @@ func TestServiceV1_ReportPieceResult(t *testing.T) { scheduling := mocks.NewMockScheduling(ctl) res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) - storage := storagemocks.NewMockStorage(ctl) peerManager := resource.NewMockPeerManager(ctl) stream := schedulerv1mocks.NewMockScheduler_ReportPieceResultServer(ctl) - svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage) + svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig) mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, @@ -1087,8 +1082,7 @@ func TestServiceV1_ReportPeerResult(t *testing.T) { name string req *schedulerv1.PeerResult run func(t *testing.T, peer *resource.Peer, req *schedulerv1.PeerResult, svc *V1, mockPeer *resource.Peer, res resource.Resource, peerManager resource.PeerManager, - mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder, - md *configmocks.MockDynconfigInterfaceMockRecorder) + mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) }{ { name: "peer not found", @@ -1096,8 +1090,7 @@ func TestServiceV1_ReportPeerResult(t *testing.T) { PeerId: mockPeerID, }, run: func(t *testing.T, peer *resource.Peer, req *schedulerv1.PeerResult, svc *V1, mockPeer *resource.Peer, res resource.Resource, peerManager resource.PeerManager, - mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder, - md *configmocks.MockDynconfigInterfaceMockRecorder) { + mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { gomock.InOrder( mr.PeerManager().Return(peerManager).Times(1), mp.Load(gomock.Eq(mockPeerID)).Return(nil, false).Times(1), @@ -1117,18 +1110,12 @@ func TestServiceV1_ReportPeerResult(t *testing.T) { PeerId: mockPeerID, }, run: func(t *testing.T, peer *resource.Peer, req *schedulerv1.PeerResult, svc *V1, mockPeer *resource.Peer, res resource.Resource, peerManager resource.PeerManager, - mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder, - md *configmocks.MockDynconfigInterfaceMockRecorder) { - var wg sync.WaitGroup - wg.Add(1) - defer wg.Wait() - + mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { mockPeer.FSM.SetState(resource.PeerStateFailed) gomock.InOrder( mr.PeerManager().Return(peerManager).Times(1), mp.Load(gomock.Eq(mockPeerID)).Return(mockPeer, true).Times(1), md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1), - ms.CreateDownload(gomock.Any()).Do(func(download storage.Download) { wg.Done() }).Return(nil).Times(1), ) assert := assert.New(t) @@ -1143,18 +1130,12 @@ func TestServiceV1_ReportPeerResult(t *testing.T) { PeerId: mockPeerID, }, run: func(t *testing.T, peer *resource.Peer, req *schedulerv1.PeerResult, svc *V1, mockPeer *resource.Peer, res resource.Resource, peerManager resource.PeerManager, - mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder, - md *configmocks.MockDynconfigInterfaceMockRecorder) { - var wg sync.WaitGroup - wg.Add(1) - defer wg.Wait() - + mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { mockPeer.FSM.SetState(resource.PeerStateBackToSource) gomock.InOrder( mr.PeerManager().Return(peerManager).Times(1), mp.Load(gomock.Eq(mockPeerID)).Return(mockPeer, true).Times(1), md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1), - ms.CreateDownload(gomock.Any()).Do(func(download storage.Download) { wg.Done() }).Return(nil).Times(1), ) assert := assert.New(t) @@ -1169,18 +1150,12 @@ func TestServiceV1_ReportPeerResult(t *testing.T) { PeerId: mockPeerID, }, run: func(t *testing.T, peer *resource.Peer, req *schedulerv1.PeerResult, svc *V1, mockPeer *resource.Peer, res resource.Resource, peerManager resource.PeerManager, - mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder, - md *configmocks.MockDynconfigInterfaceMockRecorder) { - var wg sync.WaitGroup - wg.Add(1) - defer wg.Wait() - + mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { mockPeer.FSM.SetState(resource.PeerStateFailed) gomock.InOrder( mr.PeerManager().Return(peerManager).Times(1), mp.Load(gomock.Eq(mockPeerID)).Return(mockPeer, true).Times(1), md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1), - ms.CreateDownload(gomock.Any()).Do(func(download storage.Download) { wg.Done() }).Return(nil).Times(1), ) assert := assert.New(t) @@ -1195,18 +1170,12 @@ func TestServiceV1_ReportPeerResult(t *testing.T) { PeerId: mockPeerID, }, run: func(t *testing.T, peer *resource.Peer, req *schedulerv1.PeerResult, svc *V1, mockPeer *resource.Peer, res resource.Resource, peerManager resource.PeerManager, - mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder, - md *configmocks.MockDynconfigInterfaceMockRecorder) { - var wg sync.WaitGroup - wg.Add(1) - defer wg.Wait() - + mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { mockPeer.FSM.SetState(resource.PeerStateBackToSource) gomock.InOrder( mr.PeerManager().Return(peerManager).Times(1), mp.Load(gomock.Eq(mockPeerID)).Return(mockPeer, true).Times(1), md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1), - ms.CreateDownload(gomock.Any()).Do(func(download storage.Download) { wg.Done() }).Return(nil).Times(1), ) assert := assert.New(t) @@ -1221,18 +1190,12 @@ func TestServiceV1_ReportPeerResult(t *testing.T) { PeerId: mockPeerID, }, run: func(t *testing.T, peer *resource.Peer, req *schedulerv1.PeerResult, svc *V1, mockPeer *resource.Peer, res resource.Resource, peerManager resource.PeerManager, - mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder, - md *configmocks.MockDynconfigInterfaceMockRecorder) { - var wg sync.WaitGroup - wg.Add(1) - defer wg.Wait() - + mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) { mockPeer.FSM.SetState(resource.PeerStateBackToSource) gomock.InOrder( mr.PeerManager().Return(peerManager).Times(1), mp.Load(gomock.Eq(mockPeerID)).Return(mockPeer, true).Times(1), md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1), - ms.CreateDownload(gomock.Any()).Do(func(download storage.Download) { wg.Done() }).Return(nil).Times(1), ) assert := assert.New(t) @@ -1249,16 +1212,15 @@ func TestServiceV1_ReportPeerResult(t *testing.T) { scheduling := mocks.NewMockScheduling(ctl) res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) - storage := storagemocks.NewMockStorage(ctl) peerManager := resource.NewMockPeerManager(ctl) - svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage) + svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig) mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) mockPeer := resource.NewPeer(mockPeerID, mockTask, mockHost) - tc.run(t, mockPeer, tc.req, svc, mockPeer, res, peerManager, res.EXPECT(), peerManager.EXPECT(), storage.EXPECT(), dynconfig.EXPECT()) + tc.run(t, mockPeer, tc.req, svc, mockPeer, res, peerManager, res.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT()) }) } } @@ -1313,9 +1275,8 @@ func TestServiceV1_StatTask(t *testing.T) { scheduling := mocks.NewMockScheduling(ctl) res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) - storage := storagemocks.NewMockStorage(ctl) taskManager := resource.NewMockTaskManager(ctl) - svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage) + svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig) mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) tc.mock(mockTask, taskManager, res.EXPECT(), taskManager.EXPECT()) @@ -1607,11 +1568,10 @@ func TestServiceV1_AnnounceTask(t *testing.T) { scheduling := mocks.NewMockScheduling(ctl) res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) - storage := storagemocks.NewMockStorage(ctl) hostManager := resource.NewMockHostManager(ctl) taskManager := resource.NewMockTaskManager(ctl) peerManager := resource.NewMockPeerManager(ctl) - svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage) + svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig) mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) @@ -1807,14 +1767,13 @@ func TestServiceV1_LeaveTask(t *testing.T) { scheduling := mocks.NewMockScheduling(ctl) res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) - storage := storagemocks.NewMockStorage(ctl) peerManager := resource.NewMockPeerManager(ctl) mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockSeedPeerID, mockTask, mockHost) - svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage) + svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig) tc.mock(peer, peerManager, scheduling.EXPECT(), res.EXPECT(), peerManager.EXPECT()) tc.expect(t, peer, svc.LeaveTask(context.Background(), &schedulerv1.PeerTarget{})) @@ -2237,12 +2196,11 @@ func TestServiceV1_AnnounceHost(t *testing.T) { scheduling := mocks.NewMockScheduling(ctl) res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) - storage := storagemocks.NewMockStorage(ctl) hostManager := resource.NewMockHostManager(ctl) host := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) - svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage) + svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig) tc.run(t, svc, tc.req, host, hostManager, res.EXPECT(), hostManager.EXPECT(), dynconfig.EXPECT()) }) @@ -2449,14 +2407,13 @@ func TestServiceV1_LeaveHost(t *testing.T) { scheduling := mocks.NewMockScheduling(ctl) res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) - storage := storagemocks.NewMockStorage(ctl) hostManager := resource.NewMockHostManager(ctl) host := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) mockPeer := resource.NewPeer(mockSeedPeerID, mockTask, host) - svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage) + svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig) tc.mock(host, mockPeer, hostManager, scheduling.EXPECT(), res.EXPECT(), hostManager.EXPECT()) tc.expect(t, mockPeer, svc.LeaveHost(context.Background(), &schedulerv1.LeaveHostRequest{ @@ -2554,14 +2511,13 @@ func TestServiceV1_prefetchTask(t *testing.T) { scheduling := mocks.NewMockScheduling(ctl) res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) - storage := storagemocks.NewMockStorage(ctl) seedPeer := resource.NewMockSeedPeer(ctl) mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, task, mockHost) - svc := NewV1(tc.config, res, scheduling, dynconfig, storage) + svc := NewV1(tc.config, res, scheduling, dynconfig) taskManager := resource.NewMockTaskManager(ctl) tc.mock(task, peer, taskManager, seedPeer, res.EXPECT(), taskManager.EXPECT(), seedPeer.EXPECT()) @@ -3020,8 +2976,7 @@ func TestServiceV1_triggerTask(t *testing.T) { scheduling := mocks.NewMockScheduling(ctl) res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) - storage := storagemocks.NewMockStorage(ctl) - svc := NewV1(tc.config, res, scheduling, dynconfig, storage) + svc := NewV1(tc.config, res, scheduling, dynconfig) mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, @@ -3124,8 +3079,7 @@ func TestServiceV1_storeTask(t *testing.T) { scheduling := mocks.NewMockScheduling(ctl) res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) - storage := storagemocks.NewMockStorage(ctl) - svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage) + svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig) taskManager := resource.NewMockTaskManager(ctl) tc.run(t, svc, taskManager, res.EXPECT(), taskManager.EXPECT()) }) @@ -3202,8 +3156,7 @@ func TestServiceV1_storeHost(t *testing.T) { scheduling := mocks.NewMockScheduling(ctl) res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) - storage := storagemocks.NewMockStorage(ctl) - svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage) + svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig) hostManager := resource.NewMockHostManager(ctl) mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, @@ -3286,8 +3239,7 @@ func TestServiceV1_storePeer(t *testing.T) { scheduling := mocks.NewMockScheduling(ctl) res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) - storage := storagemocks.NewMockStorage(ctl) - svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage) + svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig) peerManager := resource.NewMockPeerManager(ctl) tc.run(t, svc, peerManager, res.EXPECT(), peerManager.EXPECT()) @@ -3345,14 +3297,13 @@ func TestServiceV1_triggerSeedPeerTask(t *testing.T) { scheduling := mocks.NewMockScheduling(ctl) res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) - storage := storagemocks.NewMockStorage(ctl) seedPeer := resource.NewMockSeedPeer(ctl) mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, task, mockHost) - svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, SeedPeer: mockSeedPeerConfig}, res, scheduling, dynconfig, storage) + svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, SeedPeer: mockSeedPeerConfig}, res, scheduling, dynconfig) tc.mock(task, peer, seedPeer, res.EXPECT(), seedPeer.EXPECT()) svc.triggerSeedPeerTask(context.Background(), &mockPeerRange, task) @@ -3427,13 +3378,12 @@ func TestServiceV1_handleBeginOfPiece(t *testing.T) { scheduling := mocks.NewMockScheduling(ctl) res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) - storage := storagemocks.NewMockStorage(ctl) mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, mockTask, mockHost) - svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage) + svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig) tc.mock(peer, scheduling.EXPECT()) svc.handleBeginOfPiece(context.Background(), peer) @@ -3566,9 +3516,8 @@ func TestServiceV1_handlePieceSuccess(t *testing.T) { scheduling := mocks.NewMockScheduling(ctl) res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) - storage := storagemocks.NewMockStorage(ctl) peerManager := resource.NewMockPeerManager(ctl) - svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage) + svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig) tc.mock(tc.peer, peerManager, res.EXPECT(), peerManager.EXPECT()) svc.handlePieceSuccess(context.Background(), tc.peer, tc.piece) @@ -3755,7 +3704,6 @@ func TestServiceV1_handlePieceFail(t *testing.T) { scheduling := mocks.NewMockScheduling(ctl) res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) - storage := storagemocks.NewMockStorage(ctl) peerManager := resource.NewMockPeerManager(ctl) mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, @@ -3764,7 +3712,7 @@ func TestServiceV1_handlePieceFail(t *testing.T) { peer := resource.NewPeer(mockPeerID, mockTask, mockHost) parent := resource.NewPeer(mockSeedPeerID, mockTask, mockHost) seedPeer := resource.NewMockSeedPeer(ctl) - svc := NewV1(tc.config, res, scheduling, dynconfig, storage) + svc := NewV1(tc.config, res, scheduling, dynconfig) tc.run(t, svc, peer, parent, tc.piece, peerManager, seedPeer, scheduling.EXPECT(), res.EXPECT(), peerManager.EXPECT(), seedPeer.EXPECT()) }) @@ -3863,7 +3811,6 @@ func TestServiceV1_handlePeerSuccess(t *testing.T) { scheduling := mocks.NewMockScheduling(ctl) res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) - storage := storagemocks.NewMockStorage(ctl) url, err := url.Parse(s.URL) if err != nil { @@ -3887,7 +3834,7 @@ func TestServiceV1_handlePeerSuccess(t *testing.T) { mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) peer := resource.NewPeer(mockPeerID, mockTask, mockHost) - svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage) + svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig) tc.mock(peer) svc.handlePeerSuccess(context.Background(), peer) @@ -3961,8 +3908,7 @@ func TestServiceV1_handlePeerFail(t *testing.T) { scheduling := mocks.NewMockScheduling(ctl) res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) - storage := storagemocks.NewMockStorage(ctl) - svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage) + svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig) mockHost := resource.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) @@ -4047,8 +3993,7 @@ func TestServiceV1_handleTaskSuccess(t *testing.T) { scheduling := mocks.NewMockScheduling(ctl) res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) - storage := storagemocks.NewMockStorage(ctl) - svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage) + svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig) task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) tc.mock(task) @@ -4186,8 +4131,7 @@ func TestServiceV1_handleTaskFail(t *testing.T) { scheduling := mocks.NewMockScheduling(ctl) res := resource.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) - storage := storagemocks.NewMockStorage(ctl) - svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage) + svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig) task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength)) tc.mock(task) diff --git a/scheduler/service/service_v2.go b/scheduler/service/service_v2.go index 2f78a1e35..349a08c91 100644 --- a/scheduler/service/service_v2.go +++ b/scheduler/service/service_v2.go @@ -43,7 +43,6 @@ import ( "d7y.io/dragonfly/v2/scheduler/resource/persistentcache" "d7y.io/dragonfly/v2/scheduler/resource/standard" "d7y.io/dragonfly/v2/scheduler/scheduling" - "d7y.io/dragonfly/v2/scheduler/storage" ) // V2 is the interface for v2 version of the service. @@ -62,9 +61,6 @@ type V2 struct { // Dynamic config. dynconfig config.DynconfigInterface - - // Storage interface. - storage storage.Storage } // New v2 version of service instance. @@ -74,7 +70,6 @@ func NewV2( persistentCacheResource persistentcache.Resource, scheduling scheduling.Scheduling, dynconfig config.DynconfigInterface, - storage storage.Storage, ) *V2 { return &V2{ resource: resource, @@ -82,7 +77,6 @@ func NewV2( scheduling: scheduling, config: cfg, dynconfig: dynconfig, - storage: storage, } } @@ -367,6 +361,8 @@ func (v *V2) StatPeer(ctx context.Context, req *schedulerv2.StatPeerRequest) (*c InodesUsed: peer.Host.Disk.InodesUsed, InodesFree: peer.Host.Disk.InodesFree, InodesUsedPercent: peer.Host.Disk.InodesUsedPercent, + WriteBandwidth: peer.Host.Disk.WriteBandwidth, + ReadBandwidth: peer.Host.Disk.ReadBandwidth, }, Build: &commonv2.Build{ GitVersion: peer.Host.Build.GitVersion, @@ -586,6 +582,8 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ InodesUsed: req.Host.Disk.GetInodesUsed(), InodesFree: req.Host.Disk.GetInodesFree(), InodesUsedPercent: req.Host.Disk.GetInodesUsedPercent(), + WriteBandwidth: req.Host.Disk.GetWriteBandwidth(), + ReadBandwidth: req.Host.Disk.GetReadBandwidth(), })) } @@ -686,6 +684,8 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ InodesUsed: req.Host.Disk.GetInodesUsed(), InodesFree: req.Host.Disk.GetInodesFree(), InodesUsedPercent: req.Host.Disk.GetInodesUsedPercent(), + WriteBandwidth: req.Host.Disk.GetWriteBandwidth(), + ReadBandwidth: req.Host.Disk.GetReadBandwidth(), } } @@ -759,6 +759,8 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ InodesUsed: req.Host.Disk.GetInodesUsed(), InodesFree: req.Host.Disk.GetInodesFree(), InodesUsedPercent: req.Host.Disk.GetInodesUsedPercent(), + WriteBandwidth: req.Host.Disk.GetWriteBandwidth(), + ReadBandwidth: req.Host.Disk.GetReadBandwidth(), }, persistentcache.Build{ GitVersion: req.Host.Build.GetGitVersion(), @@ -850,6 +852,8 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ InodesUsed: req.Host.Disk.GetInodesUsed(), InodesFree: req.Host.Disk.GetInodesFree(), InodesUsedPercent: req.Host.Disk.GetInodesUsedPercent(), + WriteBandwidth: req.Host.Disk.GetWriteBandwidth(), + ReadBandwidth: req.Host.Disk.GetReadBandwidth(), } } @@ -940,6 +944,8 @@ func (v *V2) ListHosts(ctx context.Context) (*schedulerv2.ListHostsResponse, err InodesUsed: host.Disk.InodesUsed, InodesFree: host.Disk.InodesFree, InodesUsedPercent: host.Disk.InodesUsedPercent, + WriteBandwidth: host.Disk.WriteBandwidth, + ReadBandwidth: host.Disk.ReadBandwidth, }, Build: &commonv2.Build{ GitVersion: host.Build.GitVersion, @@ -1670,6 +1676,8 @@ func (v *V2) StatPersistentCachePeer(ctx context.Context, req *schedulerv2.StatP InodesUsed: peer.Host.Disk.InodesUsed, InodesFree: peer.Host.Disk.InodesFree, InodesUsedPercent: peer.Host.Disk.InodesUsedPercent, + WriteBandwidth: peer.Host.Disk.WriteBandwidth, + ReadBandwidth: peer.Host.Disk.ReadBandwidth, }, Build: &commonv2.Build{ GitVersion: peer.Host.Build.GitVersion, diff --git a/scheduler/service/service_v2_test.go b/scheduler/service/service_v2_test.go index 476262949..f0043152e 100644 --- a/scheduler/service/service_v2_test.go +++ b/scheduler/service/service_v2_test.go @@ -51,7 +51,6 @@ import ( "d7y.io/dragonfly/v2/scheduler/resource/persistentcache" "d7y.io/dragonfly/v2/scheduler/resource/standard" schedulingmocks "d7y.io/dragonfly/v2/scheduler/scheduling/mocks" - storagemocks "d7y.io/dragonfly/v2/scheduler/storage/mocks" ) var ( @@ -251,9 +250,8 @@ func TestService_NewV2(t *testing.T) { resource := standard.NewMockResource(ctl) persistentCacheResource := persistentcache.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) - storage := storagemocks.NewMockStorage(ctl) - tc.expect(t, NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig, storage)) + tc.expect(t, NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig)) }) } } @@ -423,14 +421,14 @@ func TestServiceV2_StatPeer(t *testing.T) { resource := standard.NewMockResource(ctl) persistentCacheResource := persistentcache.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) - storage := storagemocks.NewMockStorage(ctl) + peerManager := standard.NewMockPeerManager(ctl) mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) peer := standard.NewPeer(mockSeedPeerID, mockTask, mockHost, standard.WithRange(mockPeerRange)) - svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, resource, persistentCacheResource, scheduling, dynconfig, storage) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, resource, persistentCacheResource, scheduling, dynconfig) tc.mock(peer, peerManager, resource.EXPECT(), peerManager.EXPECT()) resp, err := svc.StatPeer(context.Background(), &schedulerv2.StatPeerRequest{TaskId: mockTaskID, PeerId: mockPeerID}) @@ -495,14 +493,13 @@ func TestServiceV2_DeletePeer(t *testing.T) { resource := standard.NewMockResource(ctl) persistentCacheResource := persistentcache.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) - storage := storagemocks.NewMockStorage(ctl) peerManager := standard.NewMockPeerManager(ctl) mockHost := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) peer := standard.NewPeer(mockSeedPeerID, mockTask, mockHost, standard.WithRange(mockPeerRange)) - svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, resource, persistentCacheResource, scheduling, dynconfig, storage) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, resource, persistentCacheResource, scheduling, dynconfig) tc.mock(peer, peerManager, resource.EXPECT(), peerManager.EXPECT()) tc.expect(t, svc.DeletePeer(context.Background(), &schedulerv2.DeletePeerRequest{TaskId: mockTaskID, PeerId: mockPeerID})) @@ -584,10 +581,10 @@ func TestServiceV2_StatTask(t *testing.T) { resource := standard.NewMockResource(ctl) persistentCacheResource := persistentcache.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) - storage := storagemocks.NewMockStorage(ctl) + taskManager := standard.NewMockTaskManager(ctl) task := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) - svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, resource, persistentCacheResource, scheduling, dynconfig, storage) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, resource, persistentCacheResource, scheduling, dynconfig) tc.mock(task, taskManager, resource.EXPECT(), taskManager.EXPECT()) resp, err := svc.StatTask(context.Background(), &schedulerv2.StatTaskRequest{TaskId: mockTaskID}) @@ -1426,7 +1423,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) { resource := standard.NewMockResource(ctl) persistentCacheResource := persistentcache.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) - storage := storagemocks.NewMockStorage(ctl) + hostManager := standard.NewMockHostManager(ctl) persistentcacheHostManager := persistentcache.NewMockHostManager(ctl) host := standard.NewHost( @@ -1440,7 +1437,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) { mockRawPersistentCacheHost.CPU, mockRawPersistentCacheHost.Memory, mockRawPersistentCacheHost.Network, mockRawPersistentCacheHost.Disk, mockRawPersistentCacheHost.Build, mockRawPersistentCacheHost.AnnounceInterval, mockRawPersistentCacheHost.CreatedAt, mockRawPersistentCacheHost.UpdatedAt, mockRawHost.Log) - svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, resource, persistentCacheResource, scheduling, dynconfig, storage) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, resource, persistentCacheResource, scheduling, dynconfig) tc.run(t, svc, tc.req, host, persistentCacheHost, hostManager, persistentcacheHostManager, resource.EXPECT(), persistentCacheResource.EXPECT(), hostManager.EXPECT(), persistentcacheHostManager.EXPECT(), dynconfig.EXPECT()) }) @@ -1555,12 +1552,12 @@ func TestServiceV2_ListHosts(t *testing.T) { resource := standard.NewMockResource(ctl) persistentCacheResource := persistentcache.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) - storage := storagemocks.NewMockStorage(ctl) + hostManager := standard.NewMockHostManager(ctl) host := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type, standard.WithCPU(mockCPU), standard.WithMemory(mockMemory), standard.WithNetwork(mockNetwork), standard.WithDisk(mockDisk), standard.WithBuild(mockBuild)) - svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, resource, persistentCacheResource, scheduling, dynconfig, storage) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, resource, persistentCacheResource, scheduling, dynconfig) tc.mock(host, hostManager, resource.EXPECT(), hostManager.EXPECT()) resp, err := svc.ListHosts(context.Background()) @@ -1627,14 +1624,14 @@ func TestServiceV2_DeleteHost(t *testing.T) { resource := standard.NewMockResource(ctl) persistentCacheResource := persistentcache.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) - storage := storagemocks.NewMockStorage(ctl) + hostManager := standard.NewMockHostManager(ctl) host := standard.NewHost( mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) mockPeer := standard.NewPeer(mockSeedPeerID, mockTask, host) - svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, resource, persistentCacheResource, scheduling, dynconfig, storage) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, resource, persistentCacheResource, scheduling, dynconfig) tc.mock(host, mockPeer, hostManager, resource.EXPECT(), hostManager.EXPECT()) tc.expect(t, mockPeer, svc.DeleteHost(context.Background(), &schedulerv2.DeleteHostRequest{HostId: mockHostID})) @@ -1925,7 +1922,7 @@ func TestServiceV2_handleRegisterPeerRequest(t *testing.T) { resource := standard.NewMockResource(ctl) persistentCacheResource := persistentcache.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) - storage := storagemocks.NewMockStorage(ctl) + hostManager := standard.NewMockHostManager(ctl) peerManager := standard.NewMockPeerManager(ctl) taskManager := standard.NewMockTaskManager(ctl) @@ -1937,7 +1934,7 @@ func TestServiceV2_handleRegisterPeerRequest(t *testing.T) { mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) seedPeer := standard.NewPeer(mockSeedPeerID, mockTask, mockHost) - svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig, storage) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig) tc.run(t, svc, tc.req, peer, seedPeer, hostManager, taskManager, peerManager, stream, resource.EXPECT(), hostManager.EXPECT(), taskManager.EXPECT(), peerManager.EXPECT(), stream.EXPECT(), scheduling.EXPECT()) }) @@ -2022,7 +2019,7 @@ func TestServiceV2_handleDownloadPeerStartedRequest(t *testing.T) { resource := standard.NewMockResource(ctl) persistentCacheResource := persistentcache.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) - storage := storagemocks.NewMockStorage(ctl) + peerManager := standard.NewMockPeerManager(ctl) mockHost := standard.NewHost( @@ -2030,7 +2027,7 @@ func TestServiceV2_handleDownloadPeerStartedRequest(t *testing.T) { mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) - svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig, storage) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig) tc.run(t, svc, peer, peerManager, resource.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT()) }) @@ -2115,7 +2112,7 @@ func TestServiceV2_handleDownloadPeerBackToSourceStartedRequest(t *testing.T) { resource := standard.NewMockResource(ctl) persistentCacheResource := persistentcache.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) - storage := storagemocks.NewMockStorage(ctl) + peerManager := standard.NewMockPeerManager(ctl) mockHost := standard.NewHost( @@ -2123,7 +2120,7 @@ func TestServiceV2_handleDownloadPeerBackToSourceStartedRequest(t *testing.T) { mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) - svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig, storage) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig) tc.run(t, svc, peer, peerManager, resource.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT()) }) @@ -2187,7 +2184,7 @@ func TestServiceV2_handleRescheduleRequest(t *testing.T) { resource := standard.NewMockResource(ctl) persistentCacheResource := persistentcache.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) - storage := storagemocks.NewMockStorage(ctl) + peerManager := standard.NewMockPeerManager(ctl) mockHost := standard.NewHost( @@ -2195,7 +2192,7 @@ func TestServiceV2_handleRescheduleRequest(t *testing.T) { mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) - svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig, storage) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig) tc.run(t, svc, peer, peerManager, resource.EXPECT(), peerManager.EXPECT(), scheduling.EXPECT()) }) @@ -2261,7 +2258,7 @@ func TestServiceV2_handleDownloadPeerFinishedRequest(t *testing.T) { resource := standard.NewMockResource(ctl) persistentCacheResource := persistentcache.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) - storage := storagemocks.NewMockStorage(ctl) + peerManager := standard.NewMockPeerManager(ctl) mockHost := standard.NewHost( @@ -2269,7 +2266,7 @@ func TestServiceV2_handleDownloadPeerFinishedRequest(t *testing.T) { mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) - svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig, storage) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig) tc.run(t, svc, peer, peerManager, resource.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT()) }) @@ -2443,7 +2440,7 @@ func TestServiceV2_handleDownloadPeerBackToSourceFinishedRequest(t *testing.T) { resource := standard.NewMockResource(ctl) persistentCacheResource := persistentcache.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) - storage := storagemocks.NewMockStorage(ctl) + peerManager := standard.NewMockPeerManager(ctl) url, err := url.Parse(s.URL) @@ -2469,7 +2466,7 @@ func TestServiceV2_handleDownloadPeerBackToSourceFinishedRequest(t *testing.T) { mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) - svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig, storage) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig) tc.run(t, svc, tc.req, peer, peerManager, resource.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT()) }) @@ -2534,7 +2531,7 @@ func TestServiceV2_handleDownloadPeerFailedRequest(t *testing.T) { resource := standard.NewMockResource(ctl) persistentCacheResource := persistentcache.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) - storage := storagemocks.NewMockStorage(ctl) + peerManager := standard.NewMockPeerManager(ctl) mockHost := standard.NewHost( @@ -2542,7 +2539,7 @@ func TestServiceV2_handleDownloadPeerFailedRequest(t *testing.T) { mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) - svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig, storage) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig) tc.run(t, svc, peer, peerManager, resource.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT()) }) @@ -2654,7 +2651,7 @@ func TestServiceV2_handleDownloadPeerBackToSourceFailedRequest(t *testing.T) { resource := standard.NewMockResource(ctl) persistentCacheResource := persistentcache.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) - storage := storagemocks.NewMockStorage(ctl) + peerManager := standard.NewMockPeerManager(ctl) mockHost := standard.NewHost( @@ -2662,7 +2659,7 @@ func TestServiceV2_handleDownloadPeerBackToSourceFailedRequest(t *testing.T) { mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) - svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig, storage) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig) tc.run(t, svc, peer, peerManager, resource.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT()) }) @@ -2813,7 +2810,7 @@ func TestServiceV2_handleDownloadPieceFinishedRequest(t *testing.T) { resource := standard.NewMockResource(ctl) persistentCacheResource := persistentcache.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) - storage := storagemocks.NewMockStorage(ctl) + peerManager := standard.NewMockPeerManager(ctl) mockHost := standard.NewHost( @@ -2821,7 +2818,7 @@ func TestServiceV2_handleDownloadPieceFinishedRequest(t *testing.T) { mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) - svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig, storage) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig) tc.run(t, svc, tc.req, peer, peerManager, resource.EXPECT(), peerManager.EXPECT()) }) @@ -2939,7 +2936,7 @@ func TestServiceV2_handleDownloadPieceBackToSourceFinishedRequest(t *testing.T) resource := standard.NewMockResource(ctl) persistentCacheResource := persistentcache.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) - storage := storagemocks.NewMockStorage(ctl) + peerManager := standard.NewMockPeerManager(ctl) mockHost := standard.NewHost( @@ -2947,7 +2944,7 @@ func TestServiceV2_handleDownloadPieceBackToSourceFinishedRequest(t *testing.T) mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) - svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig, storage) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig) tc.run(t, svc, tc.req, peer, peerManager, resource.EXPECT(), peerManager.EXPECT()) }) @@ -3050,7 +3047,7 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) { resource := standard.NewMockResource(ctl) persistentCacheResource := persistentcache.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) - storage := storagemocks.NewMockStorage(ctl) + peerManager := standard.NewMockPeerManager(ctl) mockHost := standard.NewHost( @@ -3058,7 +3055,7 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) { mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) - svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig, storage) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig) tc.run(t, svc, tc.req, peer, peerManager, resource.EXPECT(), peerManager.EXPECT()) }) @@ -3116,7 +3113,7 @@ func TestServiceV2_handleDownloadPieceBackToSourceFailedRequest(t *testing.T) { resource := standard.NewMockResource(ctl) persistentCacheResource := persistentcache.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) - storage := storagemocks.NewMockStorage(ctl) + peerManager := standard.NewMockPeerManager(ctl) mockHost := standard.NewHost( @@ -3124,7 +3121,7 @@ func TestServiceV2_handleDownloadPieceBackToSourceFailedRequest(t *testing.T) { mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) - svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig, storage) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig) tc.run(t, svc, tc.req, peer, peerManager, resource.EXPECT(), peerManager.EXPECT()) }) @@ -3327,7 +3324,7 @@ func TestServiceV2_handleResource(t *testing.T) { resource := standard.NewMockResource(ctl) persistentCacheResource := persistentcache.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) - storage := storagemocks.NewMockStorage(ctl) + hostManager := standard.NewMockHostManager(ctl) taskManager := standard.NewMockTaskManager(ctl) peerManager := standard.NewMockPeerManager(ctl) @@ -3338,7 +3335,7 @@ func TestServiceV2_handleResource(t *testing.T) { mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) mockPeer := standard.NewPeer(mockPeerID, mockTask, mockHost) - svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig, storage) + svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig) tc.run(t, svc, tc.download, stream, mockHost, mockTask, mockPeer, hostManager, taskManager, peerManager, resource.EXPECT(), hostManager.EXPECT(), taskManager.EXPECT(), peerManager.EXPECT()) }) @@ -3607,7 +3604,7 @@ func TestServiceV2_downloadTaskBySeedPeer(t *testing.T) { resource := standard.NewMockResource(ctl) persistentCacheResource := persistentcache.NewMockResource(ctl) dynconfig := configmocks.NewMockDynconfigInterface(ctl) - storage := storagemocks.NewMockStorage(ctl) + seedPeerClient := standard.NewMockSeedPeer(ctl) mockHost := standard.NewHost( @@ -3615,7 +3612,7 @@ func TestServiceV2_downloadTaskBySeedPeer(t *testing.T) { mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type) mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength)) peer := standard.NewPeer(mockPeerID, mockTask, mockHost) - svc := NewV2(&tc.config, resource, persistentCacheResource, scheduling, dynconfig, storage) + svc := NewV2(&tc.config, resource, persistentCacheResource, scheduling, dynconfig) tc.run(t, svc, peer, seedPeerClient, resource.EXPECT(), seedPeerClient.EXPECT()) }) diff --git a/scheduler/storage/mocks/storage_mock.go b/scheduler/storage/mocks/storage_mock.go deleted file mode 100644 index dc388b26d..000000000 --- a/scheduler/storage/mocks/storage_mock.go +++ /dev/null @@ -1,114 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: storage.go -// -// Generated by this command: -// -// mockgen -destination mocks/storage_mock.go -source storage.go -package mocks -// - -// Package mocks is a generated GoMock package. -package mocks - -import ( - io "io" - reflect "reflect" - - storage "d7y.io/dragonfly/v2/scheduler/storage" - gomock "go.uber.org/mock/gomock" -) - -// MockStorage is a mock of Storage interface. -type MockStorage struct { - ctrl *gomock.Controller - recorder *MockStorageMockRecorder - isgomock struct{} -} - -// MockStorageMockRecorder is the mock recorder for MockStorage. -type MockStorageMockRecorder struct { - mock *MockStorage -} - -// NewMockStorage creates a new mock instance. -func NewMockStorage(ctrl *gomock.Controller) *MockStorage { - mock := &MockStorage{ctrl: ctrl} - mock.recorder = &MockStorageMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockStorage) EXPECT() *MockStorageMockRecorder { - return m.recorder -} - -// ClearDownload mocks base method. -func (m *MockStorage) ClearDownload() error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ClearDownload") - ret0, _ := ret[0].(error) - return ret0 -} - -// ClearDownload indicates an expected call of ClearDownload. -func (mr *MockStorageMockRecorder) ClearDownload() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClearDownload", reflect.TypeOf((*MockStorage)(nil).ClearDownload)) -} - -// CreateDownload mocks base method. -func (m *MockStorage) CreateDownload(arg0 storage.Download) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CreateDownload", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// CreateDownload indicates an expected call of CreateDownload. -func (mr *MockStorageMockRecorder) CreateDownload(arg0 any) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateDownload", reflect.TypeOf((*MockStorage)(nil).CreateDownload), arg0) -} - -// DownloadCount mocks base method. -func (m *MockStorage) DownloadCount() int64 { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DownloadCount") - ret0, _ := ret[0].(int64) - return ret0 -} - -// DownloadCount indicates an expected call of DownloadCount. -func (mr *MockStorageMockRecorder) DownloadCount() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DownloadCount", reflect.TypeOf((*MockStorage)(nil).DownloadCount)) -} - -// ListDownload mocks base method. -func (m *MockStorage) ListDownload() ([]storage.Download, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ListDownload") - ret0, _ := ret[0].([]storage.Download) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// ListDownload indicates an expected call of ListDownload. -func (mr *MockStorageMockRecorder) ListDownload() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListDownload", reflect.TypeOf((*MockStorage)(nil).ListDownload)) -} - -// OpenDownload mocks base method. -func (m *MockStorage) OpenDownload() (io.ReadCloser, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "OpenDownload") - ret0, _ := ret[0].(io.ReadCloser) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// OpenDownload indicates an expected call of OpenDownload. -func (mr *MockStorageMockRecorder) OpenDownload() *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OpenDownload", reflect.TypeOf((*MockStorage)(nil).OpenDownload)) -} diff --git a/scheduler/storage/storage.go b/scheduler/storage/storage.go deleted file mode 100644 index 2eca71be6..000000000 --- a/scheduler/storage/storage.go +++ /dev/null @@ -1,307 +0,0 @@ -/* - * Copyright 2022 The Dragonfly Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -//go:generate mockgen -destination mocks/storage_mock.go -source storage.go -package mocks - -package storage - -import ( - "errors" - "fmt" - "io" - "io/fs" - "os" - "path/filepath" - "regexp" - "sort" - "sync" - "time" - - "github.com/gocarina/gocsv" - - logger "d7y.io/dragonfly/v2/internal/dflog" - pkgio "d7y.io/dragonfly/v2/pkg/io" -) - -const ( - // DownloadFilePrefix is prefix of download file name. - DownloadFilePrefix = "download" - - // CSVFileExt is extension of file name. - CSVFileExt = "csv" -) - -const ( - // megabyte is the converted factor of MaxSize and bytes. - megabyte = 1024 * 1024 - - // backupTimeFormat is the timestamp format of backup filename. - backupTimeFormat = "2006-01-02T15-04-05.000" -) - -// Storage is the interface used for storage. -type Storage interface { - // CreateDownload inserts the download into csv file. - CreateDownload(Download) error - - // ListDownload returns all downloads in csv file. - ListDownload() ([]Download, error) - - // DownloadCount returns the count of downloads. - DownloadCount() int64 - - // OpenDownload opens download files for read, it returns io.ReadCloser of download files. - OpenDownload() (io.ReadCloser, error) - - // ClearDownload removes all download files. - ClearDownload() error -} - -// storage provides storage function. -type storage struct { - baseDir string - maxSize int64 - maxBackups int - bufferSize int - - downloadMu *sync.RWMutex - downloadFilename string - downloadBuffer []Download - downloadCount int64 -} - -// New returns a new Storage instance. -func New(baseDir string, maxSize, maxBackups, bufferSize int) (Storage, error) { - s := &storage{ - baseDir: baseDir, - maxSize: int64(maxSize * megabyte), - maxBackups: maxBackups, - bufferSize: bufferSize, - - downloadMu: &sync.RWMutex{}, - downloadFilename: filepath.Join(baseDir, fmt.Sprintf("%s.%s", DownloadFilePrefix, CSVFileExt)), - downloadBuffer: make([]Download, 0, bufferSize), - } - - downloadFile, err := os.OpenFile(s.downloadFilename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600) - if err != nil { - return nil, err - } - downloadFile.Close() - - return s, nil -} - -// CreateDownload inserts the download into csv file. -func (s *storage) CreateDownload(download Download) error { - s.downloadMu.Lock() - defer s.downloadMu.Unlock() - - // Write without buffer. - if s.bufferSize == 0 { - if err := s.createDownload(download); err != nil { - return err - } - - // Update download count. - s.downloadCount++ - return nil - } - - // Write downloads to file. - if len(s.downloadBuffer) >= s.bufferSize { - if err := s.createDownload(s.downloadBuffer...); err != nil { - return err - } - - // Update download count. - s.downloadCount += int64(s.bufferSize) - - // Keep allocated memory. - s.downloadBuffer = s.downloadBuffer[:0] - } - - // Write downloads to buffer. - s.downloadBuffer = append(s.downloadBuffer, download) - return nil -} - -// ListDownload returns all downloads in csv file. -func (s *storage) ListDownload() ([]Download, error) { - s.downloadMu.RLock() - defer s.downloadMu.RUnlock() - - fileInfos, err := s.downloadBackups() - if err != nil { - return nil, err - } - - var readers []io.Reader - var readClosers []io.ReadCloser - defer func() { - for _, readCloser := range readClosers { - if err := readCloser.Close(); err != nil { - logger.Error(err) - } - } - }() - - for _, fileInfo := range fileInfos { - file, err := os.Open(filepath.Join(s.baseDir, fileInfo.Name())) - if err != nil { - return nil, err - } - - readers = append(readers, file) - readClosers = append(readClosers, file) - } - - var downloads []Download - if err := gocsv.UnmarshalWithoutHeaders(io.MultiReader(readers...), &downloads); err != nil { - return nil, err - } - - return downloads, nil -} - -// DownloadCount returns the count of downloads. -func (s *storage) DownloadCount() int64 { - return s.downloadCount -} - -// OpenDownload opens download files for read, it returns io.ReadCloser of download files. -func (s *storage) OpenDownload() (io.ReadCloser, error) { - s.downloadMu.RLock() - defer s.downloadMu.RUnlock() - - fileInfos, err := s.downloadBackups() - if err != nil { - return nil, err - } - - var readClosers []io.ReadCloser - for _, fileInfo := range fileInfos { - file, err := os.Open(filepath.Join(s.baseDir, fileInfo.Name())) - if err != nil { - return nil, err - } - - readClosers = append(readClosers, file) - } - - return pkgio.MultiReadCloser(readClosers...), nil -} - -// ClearDownload removes all downloads. -func (s *storage) ClearDownload() error { - s.downloadMu.Lock() - defer s.downloadMu.Unlock() - - fileInfos, err := s.downloadBackups() - if err != nil { - return err - } - - for _, fileInfo := range fileInfos { - filename := filepath.Join(s.baseDir, fileInfo.Name()) - if err := os.Remove(filename); err != nil { - return err - } - } - - return nil -} - -// createDownload inserts the downloads into csv file. -func (s *storage) createDownload(downloads ...Download) (err error) { - file, err := s.openDownloadFile() - if err != nil { - return err - } - defer func() { - if cerr := file.Close(); cerr != nil { - err = errors.Join(err, cerr) - } - }() - - return gocsv.MarshalWithoutHeaders(downloads, file) -} - -// openDownloadFile opens the download file and removes download files that exceed the total size. -func (s *storage) openDownloadFile() (*os.File, error) { - fileInfo, err := os.Stat(s.downloadFilename) - if err != nil { - return nil, err - } - - if s.maxSize <= fileInfo.Size() { - if err := os.Rename(s.downloadFilename, s.downloadBackupFilename()); err != nil { - return nil, err - } - } - - fileInfos, err := s.downloadBackups() - if err != nil { - return nil, err - } - - if s.maxBackups < len(fileInfos)+1 { - filename := filepath.Join(s.baseDir, fileInfos[0].Name()) - if err := os.Remove(filename); err != nil { - return nil, err - } - } - - file, err := os.OpenFile(s.downloadFilename, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0600) - if err != nil { - return nil, err - } - - return file, nil -} - -// downloadBackupFilename generates download file name of backup files. -func (s *storage) downloadBackupFilename() string { - timestamp := time.Now().Format(backupTimeFormat) - return filepath.Join(s.baseDir, fmt.Sprintf("%s_%s.%s", DownloadFilePrefix, timestamp, CSVFileExt)) -} - -// downloadBackups returns download backup file information. -func (s *storage) downloadBackups() ([]fs.FileInfo, error) { - fileInfos, err := os.ReadDir(s.baseDir) - if err != nil { - return nil, err - } - - var backups []fs.FileInfo - regexp := regexp.MustCompile(DownloadFilePrefix) - for _, fileInfo := range fileInfos { - if !fileInfo.IsDir() && regexp.MatchString(fileInfo.Name()) { - info, _ := fileInfo.Info() - backups = append(backups, info) - } - } - - if len(backups) <= 0 { - return nil, errors.New("download files backup does not exist") - } - - sort.Slice(backups, func(i, j int) bool { - return backups[i].ModTime().Before(backups[j].ModTime()) - }) - - return backups, nil -} diff --git a/scheduler/storage/storage_test.go b/scheduler/storage/storage_test.go deleted file mode 100644 index 43e75ea41..000000000 --- a/scheduler/storage/storage_test.go +++ /dev/null @@ -1,827 +0,0 @@ -/* - * Copyright 2022 The Dragonfly Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package storage - -import ( - "fmt" - "os" - "path/filepath" - "reflect" - "regexp" - "testing" - "time" - - "github.com/gocarina/gocsv" - "github.com/stretchr/testify/assert" - - "d7y.io/dragonfly/v2/scheduler/config" - resource "d7y.io/dragonfly/v2/scheduler/resource/standard" -) - -var ( - mockTask = Task{ - ID: "1", - URL: "example.com", - Type: "normal", - ContentLength: 2048, - TotalPieceCount: 1, - BackToSourceLimit: 10, - BackToSourcePeerCount: 2, - State: "Succeeded", - CreatedAt: time.Now().UnixNano(), - UpdatedAt: time.Now().UnixNano(), - } - - mockHost = Host{ - ID: "2", - Type: "normal", - Hostname: "localhost", - IP: "127.0.0.1", - Port: 8080, - DownloadPort: 8081, - OS: "linux", - Platform: "ubuntu", - PlatformFamily: "debian", - PlatformVersion: "1.0.0", - KernelVersion: "1.0.0", - ConcurrentUploadLimit: 100, - ConcurrentUploadCount: 40, - UploadCount: 20, - UploadFailedCount: 3, - CPU: resource.CPU{ - LogicalCount: 24, - PhysicalCount: 12, - Percent: 0.8, - ProcessPercent: 0.4, - Times: resource.CPUTimes{ - User: 100, - System: 101, - Idle: 102, - Nice: 103, - Iowait: 104, - Irq: 105, - Softirq: 106, - Steal: 107, - Guest: 108, - GuestNice: 109, - }, - }, - Memory: resource.Memory{ - Total: 20, - Available: 19, - Used: 16, - UsedPercent: 0.7, - ProcessUsedPercent: 0.2, - Free: 15, - }, - Network: resource.Network{ - TCPConnectionCount: 400, - UploadTCPConnectionCount: 200, - Location: "china", - IDC: "e1", - DownloadRate: 100, - DownloadRateLimit: 200, - UploadRate: 100, - UploadRateLimit: 200, - }, - Disk: resource.Disk{ - Total: 100, - Free: 88, - Used: 56, - UsedPercent: 0.9, - InodesTotal: 200, - InodesUsed: 180, - InodesFree: 160, - InodesUsedPercent: 0.6, - }, - Build: resource.Build{ - GitVersion: "3.0.0", - GitCommit: "2bf4d5e", - GoVersion: "1.19", - Platform: "linux", - }, - SchedulerClusterID: 1, - CreatedAt: time.Now().UnixNano(), - UpdatedAt: time.Now().UnixNano(), - } - - mockPiece = Piece{ - Length: 20, - Cost: 10, - CreatedAt: time.Now().UnixNano(), - } - - mockPieces = append(make([]Piece, 9), mockPiece) - - mockParent = Parent{ - ID: "4", - Tag: "m", - Application: "db", - State: "Succeeded", - Cost: 1000, - UploadPieceCount: 10, - FinishedPieceCount: 10, - Host: mockHost, - Pieces: mockPieces, - CreatedAt: time.Now().UnixNano(), - UpdatedAt: time.Now().UnixNano(), - } - - mockParents = append(make([]Parent, 19), mockParent) - - mockDownload = Download{ - ID: "5", - Tag: "d", - Application: "mq", - State: "Succeeded", - Error: Error{ - Code: "unknow", - Message: "unknow", - }, - Cost: 1000, - FinishedPieceCount: 10, - Task: mockTask, - Host: mockHost, - Parents: mockParents, - CreatedAt: time.Now().UnixNano(), - UpdatedAt: time.Now().UnixNano(), - } -) - -func TestStorage_New(t *testing.T) { - tests := []struct { - name string - baseDir string - expect func(t *testing.T, s Storage, err error) - }{ - { - name: "new storage", - baseDir: os.TempDir(), - expect: func(t *testing.T, s Storage, err error) { - assert := assert.New(t) - assert.NoError(err) - assert.Equal(reflect.TypeOf(s).Elem().Name(), "storage") - assert.Equal(s.(*storage).maxSize, int64(config.DefaultStorageMaxSize*megabyte)) - assert.Equal(s.(*storage).maxBackups, config.DefaultStorageMaxBackups) - assert.Equal(s.(*storage).bufferSize, config.DefaultStorageBufferSize) - - assert.Equal(cap(s.(*storage).downloadBuffer), config.DefaultStorageBufferSize) - assert.Equal(len(s.(*storage).downloadBuffer), 0) - assert.Equal(s.(*storage).downloadCount, int64(0)) - - if err := s.ClearDownload(); err != nil { - t.Fatal(err) - } - }, - }, - { - name: "new storage failed", - baseDir: "/foo", - expect: func(t *testing.T, s Storage, err error) { - assert := assert.New(t) - assert.Error(err) - }, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - s, err := New(tc.baseDir, config.DefaultStorageMaxSize, config.DefaultStorageMaxBackups, config.DefaultStorageBufferSize) - tc.expect(t, s, err) - }) - } -} - -func TestStorage_CreateDownload(t *testing.T) { - tests := []struct { - name string - baseDir string - bufferSize int - mock func(s Storage) - expect func(t *testing.T, s Storage, baseDir string) - }{ - { - name: "create download", - baseDir: os.TempDir(), - bufferSize: 1, - mock: func(s Storage) {}, - expect: func(t *testing.T, s Storage, baseDir string) { - assert := assert.New(t) - assert.NoError(s.CreateDownload(Download{})) - assert.Equal(s.(*storage).downloadCount, int64(0)) - }, - }, - { - name: "create download without buffer", - baseDir: os.TempDir(), - bufferSize: 0, - mock: func(s Storage) { - }, - expect: func(t *testing.T, s Storage, baseDir string) { - assert := assert.New(t) - assert.NoError(s.CreateDownload(Download{})) - assert.Equal(s.(*storage).downloadCount, int64(1)) - - downloads, err := s.ListDownload() - assert.NoError(err) - assert.Equal(len(downloads), 1) - }, - }, - { - name: "write download to file", - baseDir: os.TempDir(), - bufferSize: 1, - mock: func(s Storage) { - }, - expect: func(t *testing.T, s Storage, baseDir string) { - assert := assert.New(t) - assert.NoError(s.CreateDownload(Download{})) - assert.NoError(s.CreateDownload(Download{})) - assert.Equal(s.(*storage).downloadCount, int64(1)) - }, - }, - { - name: "open file failed", - baseDir: os.TempDir(), - bufferSize: 0, - mock: func(s Storage) { - s.(*storage).baseDir = "foo" - }, - expect: func(t *testing.T, s Storage, baseDir string) { - assert := assert.New(t) - assert.Error(s.CreateDownload(Download{})) - s.(*storage).baseDir = baseDir - }, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - s, err := New(tc.baseDir, config.DefaultStorageMaxSize, config.DefaultStorageMaxBackups, tc.bufferSize) - if err != nil { - t.Fatal(err) - } - - tc.mock(s) - tc.expect(t, s, tc.baseDir) - if err := s.ClearDownload(); err != nil { - t.Fatal(err) - } - }) - } -} - -func TestStorage_ListDownload(t *testing.T) { - tests := []struct { - name string - baseDir string - bufferSize int - download Download - mock func(t *testing.T, s Storage, baseDir string, download Download) - expect func(t *testing.T, s Storage, baseDir string, download Download) - }{ - { - name: "empty csv file given", - baseDir: os.TempDir(), - bufferSize: config.DefaultStorageBufferSize, - mock: func(t *testing.T, s Storage, baseDir string, download Download) {}, - expect: func(t *testing.T, s Storage, baseDir string, download Download) { - assert := assert.New(t) - _, err := s.ListDownload() - assert.Error(err) - }, - }, - { - name: "get file infos failed", - baseDir: os.TempDir(), - bufferSize: config.DefaultStorageBufferSize, - mock: func(t *testing.T, s Storage, baseDir string, download Download) { - s.(*storage).baseDir = "bae" - }, - expect: func(t *testing.T, s Storage, baseDir string, download Download) { - assert := assert.New(t) - _, err := s.ListDownload() - assert.Error(err) - s.(*storage).baseDir = baseDir - }, - }, - { - name: "open file failed", - baseDir: os.TempDir(), - bufferSize: config.DefaultStorageBufferSize, - mock: func(t *testing.T, s Storage, baseDir string, download Download) { - file, err := os.OpenFile(filepath.Join(baseDir, "download_test.csv"), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0300) - if err != nil { - t.Fatal(err) - } - file.Close() - }, - expect: func(t *testing.T, s Storage, baseDir string, download Download) { - assert := assert.New(t) - _, err := s.ListDownload() - assert.Error(err) - }, - }, - { - name: "list downloads of a file", - baseDir: os.TempDir(), - bufferSize: 1, - download: mockDownload, - mock: func(t *testing.T, s Storage, baseDir string, download Download) { - if err := s.CreateDownload(download); err != nil { - t.Fatal(err) - } - }, - expect: func(t *testing.T, s Storage, baseDir string, download Download) { - assert := assert.New(t) - _, err := s.ListDownload() - assert.Error(err) - - if err := s.CreateDownload(download); err != nil { - t.Fatal(err) - } - downloads, err := s.ListDownload() - assert.NoError(err) - assert.Equal(len(downloads), 1) - assert.EqualValues(downloads[0].ID, download.ID) - assert.EqualValues(downloads[0].Tag, download.Tag) - assert.EqualValues(downloads[0].Application, download.Application) - assert.EqualValues(downloads[0].State, download.State) - assert.EqualValues(downloads[0].Error, download.Error) - assert.EqualValues(downloads[0].Cost, download.Cost) - assert.EqualValues(downloads[0].Task, download.Task) - assert.EqualValues(downloads[0].Host, download.Host) - assert.EqualValues(downloads[0].CreatedAt, download.CreatedAt) - assert.EqualValues(downloads[0].UpdatedAt, download.UpdatedAt) - }, - }, - { - name: "list downloads of multi files", - baseDir: os.TempDir(), - bufferSize: 1, - download: Download{}, - mock: func(t *testing.T, s Storage, baseDir string, download Download) { - if err := s.CreateDownload(Download{ID: "2"}); err != nil { - t.Fatal(err) - } - - if err := s.CreateDownload(Download{ID: "1"}); err != nil { - t.Fatal(err) - } - - if err := s.CreateDownload(Download{ID: "3"}); err != nil { - t.Fatal(err) - } - }, - expect: func(t *testing.T, s Storage, baseDir string, download Download) { - assert := assert.New(t) - downloads, err := s.ListDownload() - assert.NoError(err) - assert.Equal(len(downloads), 2) - assert.Equal(downloads[0].ID, "2") - assert.Equal(downloads[1].ID, "1") - }, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - s, err := New(tc.baseDir, config.DefaultStorageMaxSize, config.DefaultStorageMaxBackups, tc.bufferSize) - if err != nil { - t.Fatal(err) - } - - tc.mock(t, s, tc.baseDir, tc.download) - tc.expect(t, s, tc.baseDir, tc.download) - if err := s.ClearDownload(); err != nil { - t.Fatal(err) - } - }) - } -} - -func TestStorage_DownloadCount(t *testing.T) { - tests := []struct { - name string - baseDir string - mock func(s Storage) - expect func(t *testing.T, s Storage) - }{ - { - name: "get the count of downloads", - baseDir: os.TempDir(), - mock: func(s Storage) { - s.(*storage).downloadCount = 2 - }, - expect: func(t *testing.T, s Storage) { - assert := assert.New(t) - assert.Equal(int64(2), s.DownloadCount()) - }, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - s, err := New(tc.baseDir, config.DefaultStorageMaxSize, config.DefaultStorageMaxBackups, config.DefaultStorageBufferSize) - if err != nil { - t.Fatal(err) - } - - tc.mock(s) - tc.expect(t, s) - }) - } -} - -func TestStorage_OpenDownload(t *testing.T) { - tests := []struct { - name string - baseDir string - bufferSize int - download Download - mock func(t *testing.T, s Storage, baseDir string, download Download) - expect func(t *testing.T, s Storage, baseDir string, download Download) - }{ - { - name: "open storage withempty csv file given", - baseDir: os.TempDir(), - bufferSize: config.DefaultStorageBufferSize, - mock: func(t *testing.T, s Storage, baseDir string, download Download) {}, - expect: func(t *testing.T, s Storage, baseDir string, download Download) { - assert := assert.New(t) - _, err := s.OpenDownload() - assert.NoError(err) - }, - }, - { - name: "open file infos failed", - baseDir: os.TempDir(), - bufferSize: config.DefaultStorageBufferSize, - mock: func(t *testing.T, s Storage, baseDir string, download Download) { - s.(*storage).baseDir = "bas" - }, - expect: func(t *testing.T, s Storage, baseDir string, download Download) { - assert := assert.New(t) - _, err := s.OpenDownload() - assert.Error(err) - s.(*storage).baseDir = baseDir - }, - }, - { - name: "open storage with downloads of a file", - baseDir: os.TempDir(), - bufferSize: 1, - download: mockDownload, - mock: func(t *testing.T, s Storage, baseDir string, download Download) { - if err := s.CreateDownload(download); err != nil { - t.Fatal(err) - } - }, - expect: func(t *testing.T, s Storage, baseDir string, download Download) { - assert := assert.New(t) - _, err := s.OpenDownload() - assert.NoError(err) - - if err := s.CreateDownload(download); err != nil { - t.Fatal(err) - } - - readCloser, err := s.OpenDownload() - assert.NoError(err) - - var downloads []Download - err = gocsv.UnmarshalWithoutHeaders(readCloser, &downloads) - assert.NoError(err) - assert.Equal(len(downloads), 1) - assert.EqualValues(downloads[0].ID, download.ID) - assert.EqualValues(downloads[0].Tag, download.Tag) - assert.EqualValues(downloads[0].Application, download.Application) - assert.EqualValues(downloads[0].State, download.State) - assert.EqualValues(downloads[0].Error, download.Error) - assert.EqualValues(downloads[0].Cost, download.Cost) - assert.EqualValues(downloads[0].Task, download.Task) - assert.EqualValues(downloads[0].Host, download.Host) - assert.EqualValues(downloads[0].CreatedAt, download.CreatedAt) - assert.EqualValues(downloads[0].UpdatedAt, download.UpdatedAt) - }, - }, - { - name: "open storage with downloads of multi files", - baseDir: os.TempDir(), - bufferSize: 1, - download: Download{}, - mock: func(t *testing.T, s Storage, baseDir string, download Download) { - if err := s.CreateDownload(Download{ID: "2"}); err != nil { - t.Fatal(err) - } - - if err := s.CreateDownload(Download{ID: "1"}); err != nil { - t.Fatal(err) - } - - if err := s.CreateDownload(Download{ID: "3"}); err != nil { - t.Fatal(err) - } - }, - expect: func(t *testing.T, s Storage, baseDir string, download Download) { - assert := assert.New(t) - readCloser, err := s.OpenDownload() - assert.NoError(err) - - var downloads []Download - err = gocsv.UnmarshalWithoutHeaders(readCloser, &downloads) - assert.NoError(err) - assert.Equal(len(downloads), 2) - assert.Equal(downloads[0].ID, "2") - assert.Equal(downloads[1].ID, "1") - }, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - s, err := New(tc.baseDir, config.DefaultStorageMaxSize, config.DefaultStorageMaxBackups, tc.bufferSize) - if err != nil { - t.Fatal(err) - } - - tc.mock(t, s, tc.baseDir, tc.download) - tc.expect(t, s, tc.baseDir, tc.download) - if err := s.ClearDownload(); err != nil { - t.Fatal(err) - } - }) - } -} - -func TestStorage_ClearDownload(t *testing.T) { - tests := []struct { - name string - baseDir string - mock func(s Storage) - expect func(t *testing.T, s Storage, baseDir string) - }{ - { - name: "clear file", - baseDir: os.TempDir(), - mock: func(s Storage) {}, - expect: func(t *testing.T, s Storage, baseDir string) { - assert := assert.New(t) - assert.NoError(s.ClearDownload()) - fileInfos, err := os.ReadDir(filepath.Join(baseDir)) - assert.NoError(err) - - var backups []os.DirEntry - regexp := regexp.MustCompile(DownloadFilePrefix) - for _, fileInfo := range fileInfos { - if !fileInfo.IsDir() && regexp.MatchString(fileInfo.Name()) { - backups = append(backups, fileInfo) - } - } - assert.Equal(len(backups), 0) - }, - }, - { - name: "open file failed", - baseDir: os.TempDir(), - mock: func(s Storage) { - s.(*storage).baseDir = "baz" - }, - expect: func(t *testing.T, s Storage, baseDir string) { - assert := assert.New(t) - assert.Error(s.ClearDownload()) - - s.(*storage).baseDir = baseDir - assert.NoError(s.ClearDownload()) - }, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - s, err := New(tc.baseDir, config.DefaultStorageMaxSize, config.DefaultStorageMaxBackups, config.DefaultStorageBufferSize) - if err != nil { - t.Fatal(err) - } - - tc.mock(s) - tc.expect(t, s, tc.baseDir) - }) - } -} - -func TestStorage_createDownload(t *testing.T) { - tests := []struct { - name string - baseDir string - mock func(s Storage) - expect func(t *testing.T, s Storage, baseDir string) - }{ - { - name: "create download", - baseDir: os.TempDir(), - mock: func(s Storage) {}, - expect: func(t *testing.T, s Storage, baseDir string) { - assert := assert.New(t) - assert.NoError(s.(*storage).createDownload(Download{})) - }, - }, - { - name: "open file failed", - baseDir: os.TempDir(), - mock: func(s Storage) { - s.(*storage).baseDir = "foo" - }, - expect: func(t *testing.T, s Storage, baseDir string) { - assert := assert.New(t) - assert.Error(s.(*storage).createDownload(Download{})) - s.(*storage).baseDir = baseDir - }, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - s, err := New(tc.baseDir, config.DefaultStorageMaxSize, config.DefaultStorageMaxBackups, config.DefaultStorageBufferSize) - if err != nil { - t.Fatal(err) - } - - tc.mock(s) - tc.expect(t, s, tc.baseDir) - if err := s.ClearDownload(); err != nil { - t.Fatal(err) - } - }) - } -} - -func TestStorage_openDownloadFile(t *testing.T) { - tests := []struct { - name string - baseDir string - maxSize int - maxBackups int - - bufferSize int - mock func(t *testing.T, s Storage) - expect func(t *testing.T, s Storage, baseDir string) - }{ - { - name: "open file failed", - baseDir: os.TempDir(), - maxSize: config.DefaultStorageMaxSize, - maxBackups: config.DefaultStorageMaxBackups, - bufferSize: config.DefaultStorageBufferSize, - mock: func(t *testing.T, s Storage) { - s.(*storage).baseDir = "bat" - }, - expect: func(t *testing.T, s Storage, baseDir string) { - assert := assert.New(t) - _, err := s.(*storage).openDownloadFile() - assert.Error(err) - s.(*storage).baseDir = baseDir - }, - }, - { - name: "open new download file", - baseDir: os.TempDir(), - maxSize: 0, - maxBackups: config.DefaultStorageMaxBackups, - bufferSize: 1, - mock: func(t *testing.T, s Storage) { - if err := s.CreateDownload(Download{ID: "1"}); err != nil { - t.Fatal(err) - } - - if err := s.CreateDownload(Download{ID: "2"}); err != nil { - t.Fatal(err) - } - }, - expect: func(t *testing.T, s Storage, baseDir string) { - assert := assert.New(t) - file, err := s.(*storage).openDownloadFile() - assert.NoError(err) - assert.Equal(file.Name(), filepath.Join(baseDir, fmt.Sprintf("%s.%s", DownloadFilePrefix, CSVFileExt))) - file.Close() - }, - }, - { - name: "remove download file", - baseDir: os.TempDir(), - maxSize: 0, - maxBackups: 1, - bufferSize: 1, - mock: func(t *testing.T, s Storage) { - if err := s.CreateDownload(Download{ID: "1"}); err != nil { - t.Fatal(err) - } - }, - expect: func(t *testing.T, s Storage, baseDir string) { - assert := assert.New(t) - file, err := s.(*storage).openDownloadFile() - assert.NoError(err) - assert.Equal(file.Name(), filepath.Join(baseDir, fmt.Sprintf("%s.%s", DownloadFilePrefix, CSVFileExt))) - file.Close() - }, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - s, err := New(tc.baseDir, tc.maxSize, tc.maxBackups, tc.bufferSize) - if err != nil { - t.Fatal(err) - } - - tc.mock(t, s) - tc.expect(t, s, tc.baseDir) - if err := s.ClearDownload(); err != nil { - t.Fatal(err) - } - }) - } -} - -func TestStorage_downloadBackupFilename(t *testing.T) { - baseDir := os.TempDir() - s, err := New(baseDir, config.DefaultStorageMaxSize, config.DefaultStorageMaxBackups, config.DefaultStorageBufferSize) - if err != nil { - t.Fatal(err) - } - - filename := s.(*storage).downloadBackupFilename() - regexp := regexp.MustCompile(fmt.Sprintf("%s_.*.%s$", DownloadFilePrefix, CSVFileExt)) - assert := assert.New(t) - assert.True(regexp.MatchString(filename)) - - if err := s.ClearDownload(); err != nil { - t.Fatal(err) - } -} - -func TestStorage_downloadBackups(t *testing.T) { - tests := []struct { - name string - baseDir string - mock func(t *testing.T, s Storage) - expect func(t *testing.T, s Storage, baseDir string) - }{ - { - name: "open file failed", - baseDir: os.TempDir(), - mock: func(t *testing.T, s Storage) { - s.(*storage).baseDir = "bar" - }, - expect: func(t *testing.T, s Storage, baseDir string) { - assert := assert.New(t) - _, err := s.(*storage).downloadBackups() - assert.Error(err) - s.(*storage).baseDir = baseDir - if err := s.ClearDownload(); err != nil { - t.Fatal(err) - } - }, - }, - { - name: "not found download file", - baseDir: os.TempDir(), - mock: func(t *testing.T, s Storage) {}, - expect: func(t *testing.T, s Storage, baseDir string) { - assert := assert.New(t) - if err := s.ClearDownload(); err != nil { - t.Fatal(err) - } - - _, err := s.(*storage).downloadBackups() - assert.Error(err) - }, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - s, err := New(tc.baseDir, config.DefaultStorageMaxSize, config.DefaultStorageMaxBackups, config.DefaultStorageBufferSize) - if err != nil { - t.Fatal(err) - } - - tc.mock(t, s) - tc.expect(t, s, tc.baseDir) - }) - } -} diff --git a/scheduler/storage/types.go b/scheduler/storage/types.go deleted file mode 100644 index e9cd2c68b..000000000 --- a/scheduler/storage/types.go +++ /dev/null @@ -1,225 +0,0 @@ -/* - * Copyright 2022 The Dragonfly Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package storage - -import ( - "time" - - resource "d7y.io/dragonfly/v2/scheduler/resource/standard" -) - -// Task contains content for task. -type Task struct { - // ID is task id. - ID string `csv:"id"` - - // URL is task download url. - URL string `csv:"url"` - - // Type is task type. - Type string `csv:"type"` - - // ContentLength is task total content length. - ContentLength int64 `csv:"contentLength"` - - // TotalPieceCount is total piece count. - TotalPieceCount int32 `csv:"totalPieceCount"` - - // BackToSourceLimit is back-to-source limit. - BackToSourceLimit int32 `csv:"backToSourceLimit"` - - // BackToSourcePeerCount is back-to-source peer count. - BackToSourcePeerCount int32 `csv:"backToSourcePeerCount"` - - // State is the download state of the task. - State string `csv:"state"` - - // CreatedAt is peer create nanosecond time. - CreatedAt int64 `csv:"createdAt"` - - // UpdatedAt is peer update nanosecond time. - UpdatedAt int64 `csv:"updatedAt"` -} - -// Host contains content for host. -type Host struct { - // ID is host id. - ID string `csv:"id"` - - // Type is host type. - Type string `csv:"type"` - - // Hostname is host name. - Hostname string `csv:"hostname"` - - // IP is host ip. - IP string `csv:"ip"` - - // Port is grpc service port. - Port int32 `csv:"port"` - - // DownloadPort is piece downloading port. - DownloadPort int32 `csv:"downloadPort"` - - // Host OS. - OS string `csv:"os"` - - // Host platform. - Platform string `csv:"platform"` - - // Host platform family. - PlatformFamily string `csv:"platformFamily"` - - // Host platform version. - PlatformVersion string `csv:"platformVersion"` - - // Host kernel version. - KernelVersion string `csv:"kernelVersion"` - - // ConcurrentUploadLimit is concurrent upload limit count. - ConcurrentUploadLimit int32 `csv:"concurrentUploadLimit"` - - // ConcurrentUploadCount is concurrent upload count. - ConcurrentUploadCount int32 `csv:"concurrentUploadCount"` - - // UploadCount is total upload count. - UploadCount int64 `csv:"uploadCount"` - - // UploadFailedCount is upload failed count. - UploadFailedCount int64 `csv:"uploadFailedCount"` - - // CPU Stat. - CPU resource.CPU `csv:"cpu"` - - // Memory Stat. - Memory resource.Memory `csv:"memory"` - - // Network Stat. - Network resource.Network `csv:"network"` - - // Disk Stat. - Disk resource.Disk `csv:"disk"` - - // Build information. - Build resource.Build `csv:"build"` - - // SchedulerClusterID is scheduler cluster id. - SchedulerClusterID int64 `csv:"schedulerClusterId"` - - // CreatedAt is peer create nanosecond time. - CreatedAt int64 `csv:"createdAt"` - - // UpdatedAt is peer update nanosecond time. - UpdatedAt int64 `csv:"updatedAt"` -} - -// Piece contains content for piece. -type Piece struct { - // Length is piece length. - Length int64 `csv:"length"` - - // Cost is the cost time for downloading piece. - Cost int64 `csv:"cost"` - - // CreatedAt is piece create time. - CreatedAt int64 `csv:"createdAt"` -} - -// Parent contains content for parent. -type Parent struct { - // ID is peer id. - ID string `csv:"id"` - - // Tag is peer tag. - Tag string `csv:"tag"` - - // Application is peer application. - Application string `csv:"application"` - - // State is the download state of the peer. - State string `csv:"state"` - - // Cost is the task download duration of nanosecond. - Cost int64 `csv:"cost"` - - // UploadPieceCount is upload piece count. - UploadPieceCount int32 `csv:"uploadPieceCount"` - - // FinishedPieceCount is finished piece count. - FinishedPieceCount int32 `csv:"finishedPieceCount"` - - // Host is peer host. - Host Host `csv:"host"` - - // Pieces is downloaded pieces from parent host. - Pieces []Piece `csv:"pieces" csv[]:"10"` - - // CreatedAt is peer create nanosecond time. - CreatedAt int64 `csv:"createdAt"` - - // UpdatedAt is peer update nanosecond time. - UpdatedAt int64 `csv:"updatedAt"` -} - -// Error contains content for error. -type Error struct { - time.Duration - // Code is the code of error. - Code string `csv:"code"` - - // Message is the message of error. - Message string `csv:"message"` -} - -// Download contains content for download. -type Download struct { - // ID is peer id. - ID string `csv:"id"` - - // Tag is peer tag. - Tag string `csv:"tag"` - - // Application is peer application. - Application string `csv:"application"` - - // State is the download state of the peer. - State string `csv:"state"` - - // Error is the details of error. - Error Error `csv:"error"` - - // Cost is the task download duration of nanosecond. - Cost int64 `csv:"cost"` - - // FinishedPieceCount is finished piece count. - FinishedPieceCount int32 `csv:"finishedPieceCount"` - - // Task is peer task. - Task Task `csv:"task"` - - // Host is peer host. - Host Host `csv:"host"` - - // Parents is peer parents. - Parents []Parent `csv:"parents" csv[]:"20"` - - // CreatedAt is peer create nanosecond time. - CreatedAt int64 `csv:"createdAt"` - - // UpdatedAt is peer update nanosecond time. - UpdatedAt int64 `csv:"updatedAt"` -}