feat: add disk bandwidth information for host (#3652)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2024-11-18 20:59:45 +08:00 committed by GitHub
parent 11d6564559
commit b2c8e76e1e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
22 changed files with 216 additions and 1974 deletions

2
go.mod
View File

@ -3,7 +3,7 @@ module d7y.io/dragonfly/v2
go 1.23.0
require (
d7y.io/api/v2 v2.0.169
d7y.io/api/v2 v2.0.171
github.com/MysteriousPotato/go-lockable v1.0.0
github.com/RichardKnop/machinery v1.10.8
github.com/Showmax/go-fqdn v1.0.0

4
go.sum
View File

@ -53,8 +53,8 @@ cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0Zeo
cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk=
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
d7y.io/api/v2 v2.0.169 h1:CKxPnhXJ0FNOtyATZ5pw5yolRhV6mhlFnEOvgBs9cRA=
d7y.io/api/v2 v2.0.169/go.mod h1:s3ovYyCQQ9RHUC+RMpAZYI075vkaz/PcLpoyTZqvvOg=
d7y.io/api/v2 v2.0.171 h1:iHMAhim/BFJ6MhZzsGMmVqF/h0Atw59g/0GuYFyhGxg=
d7y.io/api/v2 v2.0.171/go.mod h1:HLM5CjwBmy1pDGNUsUNkQeQPItblnHmeTJBEvBDbGnE=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=

View File

@ -26,7 +26,6 @@ import (
logger "d7y.io/dragonfly/v2/internal/dflog"
managerclient "d7y.io/dragonfly/v2/pkg/rpc/manager/client"
"d7y.io/dragonfly/v2/scheduler/config"
"d7y.io/dragonfly/v2/scheduler/storage"
)
// Announcer is the interface used for announce service.
@ -42,7 +41,6 @@ type Announcer interface {
type announcer struct {
config *config.Config
managerClient managerclient.V2
storage storage.Storage
done chan struct{}
}
@ -50,11 +48,10 @@ type announcer struct {
type Option func(s *announcer)
// New returns a new Announcer interface.
func New(cfg *config.Config, managerClient managerclient.V2, storage storage.Storage, schedulerFeatures []string, options ...Option) (Announcer, error) {
func New(cfg *config.Config, managerClient managerclient.V2, schedulerFeatures []string, options ...Option) (Announcer, error) {
a := &announcer{
config: cfg,
managerClient: managerClient,
storage: storage,
done: make(chan struct{}),
}

View File

@ -30,7 +30,6 @@ import (
managertypes "d7y.io/dragonfly/v2/manager/types"
managerclientmocks "d7y.io/dragonfly/v2/pkg/rpc/manager/client/mocks"
"d7y.io/dragonfly/v2/scheduler/config"
storagemocks "d7y.io/dragonfly/v2/scheduler/storage/mocks"
)
var (
@ -116,10 +115,9 @@ func TestAnnouncer_New(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
mockManagerClient := managerclientmocks.NewMockV2(ctl)
mockStorage := storagemocks.NewMockStorage(ctl)
tc.mock(mockManagerClient.EXPECT())
a, err := New(tc.config, mockManagerClient, mockStorage, managertypes.DefaultSchedulerFeatures)
a, err := New(tc.config, mockManagerClient, managertypes.DefaultSchedulerFeatures)
tc.expect(t, a, err)
})
}
@ -134,7 +132,7 @@ func TestAnnouncer_Serve(t *testing.T) {
config *config.Config
data []byte
sleep func()
mock func(data []byte, m *managerclientmocks.MockV2MockRecorder, ms *storagemocks.MockStorageMockRecorder)
mock func(data []byte, m *managerclientmocks.MockV2MockRecorder)
except func(t *testing.T, a Announcer)
}{
{
@ -161,7 +159,7 @@ func TestAnnouncer_Serve(t *testing.T) {
sleep: func() {
time.Sleep(3 * time.Second)
},
mock: func(data []byte, m *managerclientmocks.MockV2MockRecorder, ms *storagemocks.MockStorageMockRecorder) {
mock: func(data []byte, m *managerclientmocks.MockV2MockRecorder) {
gomock.InOrder(
m.UpdateScheduler(gomock.Any(), gomock.Eq(&managerv2.UpdateSchedulerRequest{
SourceType: managerv2.SourceType_SCHEDULER_SOURCE,
@ -190,10 +188,9 @@ func TestAnnouncer_Serve(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
mockManagerClient := managerclientmocks.NewMockV2(ctl)
mockStorage := storagemocks.NewMockStorage(ctl)
tc.mock(tc.data, mockManagerClient.EXPECT(), mockStorage.EXPECT())
a, err := New(tc.config, mockManagerClient, mockStorage, managertypes.DefaultSchedulerFeatures)
tc.mock(tc.data, mockManagerClient.EXPECT())
a, err := New(tc.config, mockManagerClient, managertypes.DefaultSchedulerFeatures)
if err != nil {
t.Fatal(err)
}
@ -267,10 +264,9 @@ func TestAnnouncer_announceToManager(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
mockManagerClient := managerclientmocks.NewMockV2(ctl)
mockStorage := storagemocks.NewMockStorage(ctl)
tc.mock(mockManagerClient.EXPECT())
a, err := New(tc.config, mockManagerClient, mockStorage, managertypes.DefaultSchedulerFeatures)
a, err := New(tc.config, mockManagerClient, managertypes.DefaultSchedulerFeatures)
if err != nil {
t.Fatal(err)
}

View File

@ -116,145 +116,151 @@ type Host struct {
// CPU contains content for cpu.
type CPU struct {
// Number of logical cores in the system.
LogicalCount uint32 `csv:"logicalCount"`
LogicalCount uint32
// Number of physical cores in the system.
PhysicalCount uint32 `csv:"physicalCount"`
PhysicalCount uint32
// Percent calculates the percentage of cpu used.
Percent float64 `csv:"percent"`
Percent float64
// Calculates the percentage of cpu used by process.
ProcessPercent float64 `csv:"processPercent"`
ProcessPercent float64
// Times contains the amounts of time the CPU has spent performing different kinds of work.
Times CPUTimes `csv:"times"`
Times CPUTimes
}
// CPUTimes contains content for cpu times.
type CPUTimes struct {
// CPU time of user.
User float64 `csv:"user"`
User float64
// CPU time of system.
System float64 `csv:"system"`
System float64
// CPU time of idle.
Idle float64 `csv:"idle"`
Idle float64
// CPU time of nice.
Nice float64 `csv:"nice"`
Nice float64
// CPU time of iowait.
Iowait float64 `csv:"iowait"`
Iowait float64
// CPU time of irq.
Irq float64 `csv:"irq"`
Irq float64
// CPU time of softirq.
Softirq float64 `csv:"softirq"`
Softirq float64
// CPU time of steal.
Steal float64 `csv:"steal"`
Steal float64
// CPU time of guest.
Guest float64 `csv:"guest"`
Guest float64
// CPU time of guest nice.
GuestNice float64 `csv:"guestNice"`
GuestNice float64
}
// Memory contains content for memory.
type Memory struct {
// Total amount of RAM on this system.
Total uint64 `csv:"total"`
Total uint64
// RAM available for programs to allocate.
Available uint64 `csv:"available"`
Available uint64
// RAM used by programs.
Used uint64 `csv:"used"`
Used uint64
// Percentage of RAM used by programs.
UsedPercent float64 `csv:"usedPercent"`
UsedPercent float64
// Calculates the percentage of memory used by process.
ProcessUsedPercent float64 `csv:"processUsedPercent"`
ProcessUsedPercent float64
// This is the kernel's notion of free memory.
Free uint64 `csv:"free"`
Free uint64
}
// Network contains content for network.
type Network struct {
// Return count of tcp connections opened and status is ESTABLISHED.
TCPConnectionCount uint32 `csv:"tcpConnectionCount"`
TCPConnectionCount uint32
// Return count of upload tcp connections opened and status is ESTABLISHED.
UploadTCPConnectionCount uint32 `csv:"uploadTCPConnectionCount"`
UploadTCPConnectionCount uint32
// Location path(area|country|province|city|...).
Location string `csv:"location"`
Location string
// IDC where the peer host is located
IDC string `csv:"idc"`
IDC string
// Download rate of the host, unit is byte/s.
DownloadRate uint64 `csv:"downloadRate"`
DownloadRate uint64
// Download rate limit of the host, unit is byte/s.
DownloadRateLimit uint64 `csv:"downloadRateLimit"`
DownloadRateLimit uint64
// Upload rate of the host, unit is byte/s.
UploadRate uint64 `csv:"uploadRate"`
UploadRate uint64
// Upload rate limit of the host, unit is byte/s.
UploadRateLimit uint64 `csv:"uploadRateLimit"`
UploadRateLimit uint64
}
// Build contains content for build.
type Build struct {
// Git version.
GitVersion string `csv:"gitVersion"`
GitVersion string
// Git commit.
GitCommit string `csv:"gitCommit"`
GitCommit string
// Golang version.
GoVersion string `csv:"goVersion"`
GoVersion string
// Rust version.
RustVersion string `csv:"rustVersion"`
RustVersion string
// Build platform.
Platform string `csv:"platform"`
Platform string
}
// Disk contains content for disk.
type Disk struct {
// Total amount of disk on the data path of dragonfly.
Total uint64 `csv:"total"`
Total uint64
// Free amount of disk on the data path of dragonfly.
Free uint64 `csv:"free"`
Free uint64
// Used amount of disk on the data path of dragonfly.
Used uint64 `csv:"used"`
Used uint64
// Used percent of disk on the data path of dragonfly directory.
UsedPercent float64 `csv:"usedPercent"`
UsedPercent float64
// Total amount of indoes on the data path of dragonfly directory.
InodesTotal uint64 `csv:"inodesTotal"`
InodesTotal uint64
// Used amount of indoes on the data path of dragonfly directory.
InodesUsed uint64 `csv:"inodesUsed"`
InodesUsed uint64
// Free amount of indoes on the data path of dragonfly directory.
InodesFree uint64 `csv:"inodesFree"`
InodesFree uint64
// Used percent of indoes on the data path of dragonfly directory.
InodesUsedPercent float64 `csv:"inodesUsedPercent"`
InodesUsedPercent float64
// Disk write bandwidth, unit is byte/s.
WriteBandwidth uint64
// Disk read bandwidth, unit is byte/s.
ReadBandwidth uint64
}
// New host instance.

View File

@ -380,6 +380,18 @@ func (h *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) {
return nil, false
}
diskWriteBandwidth, err := strconv.ParseUint(rawHost["disk_write_bandwidth"], 10, 64)
if err != nil {
log.Errorf("parsing disk write bandwidth failed: %v", err)
return nil, false
}
diskReadBandwidth, err := strconv.ParseUint(rawHost["disk_read_bandwidth"], 10, 64)
if err != nil {
log.Errorf("parsing disk read bandwidth failed: %v", err)
return nil, false
}
disk := Disk{
Total: diskTotal,
Free: diskFree,
@ -389,6 +401,8 @@ func (h *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) {
InodesUsed: diskInodesUsed,
InodesFree: diskInodesFree,
InodesUsedPercent: diskInodesUsedPercent,
WriteBandwidth: diskWriteBandwidth,
ReadBandwidth: diskReadBandwidth,
}
build := Build{
@ -498,6 +512,8 @@ func (h *hostManager) Store(ctx context.Context, host *Host) error {
"disk_inodes_used", host.Disk.InodesUsed,
"disk_inodes_free", host.Disk.InodesFree,
"disk_inodes_used_percent", host.Disk.InodesUsedPercent,
"disk_write_bandwidth", host.Disk.WriteBandwidth,
"disk_read_bandwidth", host.Disk.ReadBandwidth,
"build_git_version", host.Build.GitVersion,
"build_git_commit", host.Build.GitCommit,
"build_go_version", host.Build.GoVersion,

View File

@ -230,142 +230,148 @@ type Host struct {
// CPU contains content for cpu.
type CPU struct {
// Number of logical cores in the system.
LogicalCount uint32 `csv:"logicalCount"`
LogicalCount uint32
// Number of physical cores in the system.
PhysicalCount uint32 `csv:"physicalCount"`
PhysicalCount uint32
// Percent calculates the percentage of cpu used.
Percent float64 `csv:"percent"`
Percent float64
// Calculates the percentage of cpu used by process.
ProcessPercent float64 `csv:"processPercent"`
ProcessPercent float64
// Times contains the amounts of time the CPU has spent performing different kinds of work.
Times CPUTimes `csv:"times"`
Times CPUTimes
}
// CPUTimes contains content for cpu times.
type CPUTimes struct {
// CPU time of user.
User float64 `csv:"user"`
User float64
// CPU time of system.
System float64 `csv:"system"`
System float64
// CPU time of idle.
Idle float64 `csv:"idle"`
Idle float64
// CPU time of nice.
Nice float64 `csv:"nice"`
Nice float64
// CPU time of iowait.
Iowait float64 `csv:"iowait"`
Iowait float64
// CPU time of irq.
Irq float64 `csv:"irq"`
Irq float64
// CPU time of softirq.
Softirq float64 `csv:"softirq"`
Softirq float64
// CPU time of steal.
Steal float64 `csv:"steal"`
Steal float64
// CPU time of guest.
Guest float64 `csv:"guest"`
Guest float64
// CPU time of guest nice.
GuestNice float64 `csv:"guestNice"`
GuestNice float64
}
// Memory contains content for memory.
type Memory struct {
// Total amount of RAM on this system.
Total uint64 `csv:"total"`
Total uint64
// RAM available for programs to allocate.
Available uint64 `csv:"available"`
Available uint64
// RAM used by programs.
Used uint64 `csv:"used"`
Used uint64
// Percentage of RAM used by programs.
UsedPercent float64 `csv:"usedPercent"`
UsedPercent float64
// Calculates the percentage of memory used by process.
ProcessUsedPercent float64 `csv:"processUsedPercent"`
ProcessUsedPercent float64
// This is the kernel's notion of free memory.
Free uint64 `csv:"free"`
Free uint64
}
// Network contains content for network.
type Network struct {
// Return count of tcp connections opened and status is ESTABLISHED.
TCPConnectionCount uint32 `csv:"tcpConnectionCount"`
TCPConnectionCount uint32
// Return count of upload tcp connections opened and status is ESTABLISHED.
UploadTCPConnectionCount uint32 `csv:"uploadTCPConnectionCount"`
UploadTCPConnectionCount uint32
// Location path(area|country|province|city|...).
Location string `csv:"location"`
Location string
// IDC where the peer host is located
IDC string `csv:"idc"`
IDC string
// Download rate of the host, unit is byte/s.
DownloadRate uint64 `csv:"downloadRate"`
DownloadRate uint64
// Download rate limit of the host, unit is byte/s.
DownloadRateLimit uint64 `csv:"downloadRateLimit"`
DownloadRateLimit uint64
// Upload rate of the host, unit is byte/s.
UploadRate uint64 `csv:"uploadRate"`
UploadRate uint64
// Upload rate limit of the host, unit is byte/s.
UploadRateLimit uint64 `csv:"uploadRateLimit"`
UploadRateLimit uint64
}
// Build contains content for build.
type Build struct {
// Git version.
GitVersion string `csv:"gitVersion"`
GitVersion string
// Git commit.
GitCommit string `csv:"gitCommit"`
GitCommit string
// Golang version.
GoVersion string `csv:"goVersion"`
GoVersion string
// Build platform.
Platform string `csv:"platform"`
Platform string
}
// Disk contains content for disk.
type Disk struct {
// Total amount of disk on the data path of dragonfly.
Total uint64 `csv:"total"`
Total uint64
// Free amount of disk on the data path of dragonfly.
Free uint64 `csv:"free"`
Free uint64
// Used amount of disk on the data path of dragonfly.
Used uint64 `csv:"used"`
Used uint64
// Used percent of disk on the data path of dragonfly directory.
UsedPercent float64 `csv:"usedPercent"`
UsedPercent float64
// Total amount of indoes on the data path of dragonfly directory.
InodesTotal uint64 `csv:"inodesTotal"`
InodesTotal uint64
// Used amount of indoes on the data path of dragonfly directory.
InodesUsed uint64 `csv:"inodesUsed"`
InodesUsed uint64
// Free amount of indoes on the data path of dragonfly directory.
InodesFree uint64 `csv:"inodesFree"`
InodesFree uint64
// Used percent of indoes on the data path of dragonfly directory.
InodesUsedPercent float64 `csv:"inodesUsedPercent"`
InodesUsedPercent float64
// Disk write bandwidth, unit is byte/s.
WriteBandwidth uint64
// Disk read bandwidth, unit is byte/s.
ReadBandwidth uint64
}
// New host instance.

View File

@ -24,7 +24,6 @@ import (
"d7y.io/dragonfly/v2/scheduler/resource/persistentcache"
"d7y.io/dragonfly/v2/scheduler/resource/standard"
"d7y.io/dragonfly/v2/scheduler/scheduling"
"d7y.io/dragonfly/v2/scheduler/storage"
)
// New returns a new scheduler server from the given options.
@ -34,11 +33,10 @@ func New(
persistentCacheResource persistentcache.Resource,
scheduling scheduling.Scheduling,
dynconfig config.DynconfigInterface,
storage storage.Storage,
opts ...grpc.ServerOption,
) *grpc.Server {
return server.New(
newSchedulerServerV1(cfg, resource, scheduling, dynconfig, storage),
newSchedulerServerV2(cfg, resource, persistentCacheResource, scheduling, dynconfig, storage),
newSchedulerServerV1(cfg, resource, scheduling, dynconfig),
newSchedulerServerV2(cfg, resource, persistentCacheResource, scheduling, dynconfig),
opts...)
}

View File

@ -29,7 +29,6 @@ import (
"d7y.io/dragonfly/v2/scheduler/resource/persistentcache"
"d7y.io/dragonfly/v2/scheduler/resource/standard"
"d7y.io/dragonfly/v2/scheduler/scheduling/mocks"
storagemocks "d7y.io/dragonfly/v2/scheduler/storage/mocks"
)
var (
@ -63,9 +62,8 @@ func TestRPCServer_New(t *testing.T) {
resource := standard.NewMockResource(ctl)
persistentCacheResource := persistentcache.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
svr := New(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig, storage)
svr := New(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig)
tc.expect(t, svr)
})
}

View File

@ -31,7 +31,6 @@ import (
resource "d7y.io/dragonfly/v2/scheduler/resource/standard"
"d7y.io/dragonfly/v2/scheduler/scheduling"
"d7y.io/dragonfly/v2/scheduler/service"
"d7y.io/dragonfly/v2/scheduler/storage"
)
// schedulerServerV1 is v1 version of the scheduler grpc server.
@ -46,9 +45,8 @@ func newSchedulerServerV1(
resource resource.Resource,
scheduling scheduling.Scheduling,
dynconfig config.DynconfigInterface,
storage storage.Storage,
) schedulerv1.SchedulerServer {
return &schedulerServerV1{service.NewV1(cfg, resource, scheduling, dynconfig, storage)}
return &schedulerServerV1{service.NewV1(cfg, resource, scheduling, dynconfig)}
}
// RegisterPeerTask registers peer and triggers seed peer download task.

View File

@ -30,7 +30,6 @@ import (
"d7y.io/dragonfly/v2/scheduler/resource/standard"
"d7y.io/dragonfly/v2/scheduler/scheduling"
"d7y.io/dragonfly/v2/scheduler/service"
"d7y.io/dragonfly/v2/scheduler/storage"
)
// schedulerServerV2 is v2 version of the scheduler grpc server.
@ -46,9 +45,8 @@ func newSchedulerServerV2(
persistentCacheResource persistentcache.Resource,
scheduling scheduling.Scheduling,
dynconfig config.DynconfigInterface,
storage storage.Storage,
) schedulerv2.SchedulerServer {
return &schedulerServerV2{service.NewV2(cfg, resource, persistentCacheResource, scheduling, dynconfig, storage)}
return &schedulerServerV2{service.NewV2(cfg, resource, persistentCacheResource, scheduling, dynconfig)}
}
// AnnouncePeer announces peer to scheduler.

View File

@ -46,7 +46,6 @@ import (
"d7y.io/dragonfly/v2/scheduler/resource/standard"
"d7y.io/dragonfly/v2/scheduler/rpcserver"
"d7y.io/dragonfly/v2/scheduler/scheduling"
"d7y.io/dragonfly/v2/scheduler/storage"
)
const (
@ -81,9 +80,6 @@ type Server struct {
// Async job.
job job.Job
// Storage interface.
storage storage.Storage
// Announcer interface.
announcer announcer.Announcer
@ -95,18 +91,6 @@ type Server struct {
func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, error) {
s := &Server{config: cfg}
// Initialize Storage.
storage, err := storage.New(
d.DataDir(),
cfg.Storage.MaxSize,
cfg.Storage.MaxBackups,
cfg.Storage.BufferSize,
)
if err != nil {
return nil, err
}
s.storage = storage
// Initialize dial options of manager grpc client.
managerDialOptions := []grpc.DialOption{grpc.WithStatsHandler(otelgrpc.NewClientHandler())}
if cfg.Manager.TLS != nil {
@ -149,7 +133,7 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err
if cfg.Job.Enable && rdb != nil {
schedulerFeatures = append(schedulerFeatures, managertypes.SchedulerFeaturePreheat)
}
announcer, err := announcer.New(cfg, s.managerClient, storage, schedulerFeatures)
announcer, err := announcer.New(cfg, s.managerClient, schedulerFeatures)
if err != nil {
return nil, err
}
@ -226,7 +210,7 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err
schedulerServerOptions = append(schedulerServerOptions, grpc.Creds(rpc.NewInsecureCredentials()))
}
svr := rpcserver.New(cfg, resource, s.persistentCacheResource, scheduling, dynconfig, s.storage, schedulerServerOptions...)
svr := rpcserver.New(cfg, resource, s.persistentCacheResource, scheduling, dynconfig, schedulerServerOptions...)
s.grpcServer = svr
// Initialize metrics.
@ -316,13 +300,6 @@ func (s *Server) Stop() {
logger.Info("stop resource closed")
}
// Clean download storage.
if err := s.storage.ClearDownload(); err != nil {
logger.Errorf("clean download storage failed %s", err.Error())
} else {
logger.Info("clean download storage completed")
}
// Stop GC.
s.gc.Stop()
logger.Info("gc closed")

View File

@ -123,6 +123,8 @@ var (
InodesUsed: 7835772,
InodesFree: 4874617108,
InodesUsedPercent: 0.1604884305611568,
WriteBandwidth: 1,
ReadBandwidth: 1,
}
mockBuild = resource.Build{

View File

@ -682,6 +682,8 @@ func ConstructSuccessNormalTaskResponse(candidateParents []*resource.Peer) *sche
InodesUsed: candidateParent.Host.Disk.InodesUsed,
InodesFree: candidateParent.Host.Disk.InodesFree,
InodesUsedPercent: candidateParent.Host.Disk.InodesUsedPercent,
WriteBandwidth: candidateParent.Host.Disk.WriteBandwidth,
ReadBandwidth: candidateParent.Host.Disk.ReadBandwidth,
},
Build: &commonv2.Build{
GitVersion: candidateParent.Host.Build.GitVersion,

View File

@ -46,7 +46,6 @@ import (
"d7y.io/dragonfly/v2/scheduler/metrics"
resource "d7y.io/dragonfly/v2/scheduler/resource/standard"
"d7y.io/dragonfly/v2/scheduler/scheduling"
"d7y.io/dragonfly/v2/scheduler/storage"
)
// V1 is the interface for v1 version of the service.
@ -62,9 +61,6 @@ type V1 struct {
// Dynamic config.
dynconfig config.DynconfigInterface
// Storage interface.
storage storage.Storage
}
// New v1 version of service instance.
@ -73,14 +69,12 @@ func NewV1(
resource resource.Resource,
scheduling scheduling.Scheduling,
dynconfig config.DynconfigInterface,
storage storage.Storage,
) *V1 {
return &V1{
resource: resource,
scheduling: scheduling,
config: cfg,
dynconfig: dynconfig,
storage: storage,
}
}
@ -300,7 +294,6 @@ func (v *V1) ReportPeerResult(ctx context.Context, req *schedulerv1.PeerResult)
metrics.DownloadPeerCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
peer.Host.Type.Name()).Inc()
parents := peer.Parents()
if !req.GetSuccess() {
peer.Log.Error("report failed peer")
if peer.FSM.Is(resource.PeerStateBackToSource) {
@ -308,7 +301,6 @@ func (v *V1) ReportPeerResult(ctx context.Context, req *schedulerv1.PeerResult)
metrics.DownloadPeerBackToSourceFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
peer.Host.Type.Name()).Inc()
go v.createDownloadRecord(peer, parents, req)
v.handleTaskFailure(ctx, peer.Task, req.GetSourceError(), nil)
v.handlePeerFailure(ctx, peer)
return nil
@ -318,21 +310,18 @@ func (v *V1) ReportPeerResult(ctx context.Context, req *schedulerv1.PeerResult)
metrics.DownloadPeerFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
peer.Host.Type.Name()).Inc()
go v.createDownloadRecord(peer, parents, req)
v.handlePeerFailure(ctx, peer)
return nil
}
peer.Log.Info("report success peer")
if peer.FSM.Is(resource.PeerStateBackToSource) {
go v.createDownloadRecord(peer, parents, req)
v.handleTaskSuccess(ctx, peer.Task, req)
v.handlePeerSuccess(ctx, peer)
metrics.DownloadPeerDuration.WithLabelValues(metrics.CalculateSizeLevel(peer.Task.ContentLength.Load()).String()).Observe(float64(req.GetCost()))
return nil
}
go v.createDownloadRecord(peer, parents, req)
v.handlePeerSuccess(ctx, peer)
metrics.DownloadPeerDuration.WithLabelValues(metrics.CalculateSizeLevel(peer.Task.ContentLength.Load()).String()).Observe(float64(req.GetCost()))
return nil
@ -1338,220 +1327,3 @@ func (v *V1) handleTaskFailure(ctx context.Context, task *resource.Task, backToS
return
}
}
// createDownloadRecord stores peer download records.
func (v *V1) createDownloadRecord(peer *resource.Peer, parents []*resource.Peer, req *schedulerv1.PeerResult) {
var parentRecords []storage.Parent
for _, parent := range parents {
parentRecord := storage.Parent{
ID: parent.ID,
Tag: parent.Task.Tag,
Application: parent.Task.Application,
State: parent.FSM.Current(),
Cost: parent.Cost.Load().Nanoseconds(),
UploadPieceCount: 0,
FinishedPieceCount: int32(parent.FinishedPieces.Count()),
CreatedAt: parent.CreatedAt.Load().UnixNano(),
UpdatedAt: parent.UpdatedAt.Load().UnixNano(),
Host: storage.Host{
ID: parent.Host.ID,
Type: parent.Host.Type.Name(),
Hostname: parent.Host.Hostname,
IP: parent.Host.IP,
Port: parent.Host.Port,
DownloadPort: parent.Host.DownloadPort,
OS: parent.Host.OS,
Platform: parent.Host.Platform,
PlatformFamily: parent.Host.PlatformFamily,
PlatformVersion: parent.Host.PlatformVersion,
KernelVersion: parent.Host.KernelVersion,
ConcurrentUploadLimit: parent.Host.ConcurrentUploadLimit.Load(),
ConcurrentUploadCount: parent.Host.ConcurrentUploadCount.Load(),
UploadCount: parent.Host.UploadCount.Load(),
UploadFailedCount: parent.Host.UploadFailedCount.Load(),
CreatedAt: parent.Host.CreatedAt.Load().UnixNano(),
UpdatedAt: parent.Host.UpdatedAt.Load().UnixNano(),
},
}
parentRecord.Host.CPU = resource.CPU{
LogicalCount: parent.Host.CPU.LogicalCount,
PhysicalCount: parent.Host.CPU.PhysicalCount,
Percent: parent.Host.CPU.Percent,
ProcessPercent: parent.Host.CPU.ProcessPercent,
Times: resource.CPUTimes{
User: parent.Host.CPU.Times.User,
System: parent.Host.CPU.Times.System,
Idle: parent.Host.CPU.Times.Idle,
Nice: parent.Host.CPU.Times.Nice,
Iowait: parent.Host.CPU.Times.Iowait,
Irq: parent.Host.CPU.Times.Irq,
Softirq: parent.Host.CPU.Times.Softirq,
Steal: parent.Host.CPU.Times.Steal,
Guest: parent.Host.CPU.Times.Guest,
GuestNice: parent.Host.CPU.Times.GuestNice,
},
}
parentRecord.Host.Memory = resource.Memory{
Total: parent.Host.Memory.Total,
Available: parent.Host.Memory.Available,
Used: parent.Host.Memory.Used,
UsedPercent: parent.Host.Memory.UsedPercent,
ProcessUsedPercent: parent.Host.Memory.ProcessUsedPercent,
Free: parent.Host.Memory.Free,
}
parentRecord.Host.Network = resource.Network{
TCPConnectionCount: parent.Host.Network.TCPConnectionCount,
UploadTCPConnectionCount: parent.Host.Network.UploadTCPConnectionCount,
Location: parent.Host.Network.Location,
IDC: parent.Host.Network.IDC,
}
parentRecord.Host.Disk = resource.Disk{
Total: parent.Host.Disk.Total,
Free: parent.Host.Disk.Free,
Used: parent.Host.Disk.Used,
UsedPercent: parent.Host.Disk.UsedPercent,
InodesTotal: parent.Host.Disk.InodesTotal,
InodesUsed: parent.Host.Disk.InodesUsed,
InodesFree: parent.Host.Disk.InodesFree,
InodesUsedPercent: parent.Host.Disk.InodesUsedPercent,
}
parentRecord.Host.Build = resource.Build{
GitVersion: parent.Host.Build.GitVersion,
GitCommit: parent.Host.Build.GitCommit,
GoVersion: parent.Host.Build.GoVersion,
Platform: parent.Host.Build.Platform,
}
peer.Pieces.Range(func(key, value any) bool {
piece, ok := value.(*resource.Piece)
if !ok {
return true
}
if piece.ParentID == parent.ID {
parentRecord.UploadPieceCount++
parentRecord.Pieces = append(parentRecord.Pieces, storage.Piece{
Length: int64(piece.Length),
Cost: piece.Cost.Nanoseconds(),
CreatedAt: piece.CreatedAt.UnixNano(),
})
}
return true
})
parentRecords = append(parentRecords, parentRecord)
}
download := storage.Download{
ID: peer.ID,
Tag: peer.Task.Tag,
Application: peer.Task.Application,
State: peer.FSM.Current(),
Cost: peer.Cost.Load().Nanoseconds(),
FinishedPieceCount: int32(peer.FinishedPieces.Count()),
Parents: parentRecords,
CreatedAt: peer.CreatedAt.Load().UnixNano(),
UpdatedAt: peer.UpdatedAt.Load().UnixNano(),
Task: storage.Task{
ID: peer.Task.ID,
URL: peer.Task.URL,
Type: peer.Task.Type.String(),
ContentLength: peer.Task.ContentLength.Load(),
TotalPieceCount: peer.Task.TotalPieceCount.Load(),
BackToSourceLimit: peer.Task.BackToSourceLimit.Load(),
BackToSourcePeerCount: int32(peer.Task.BackToSourcePeers.Len()),
State: peer.Task.FSM.Current(),
CreatedAt: peer.Task.CreatedAt.Load().UnixNano(),
UpdatedAt: peer.Task.UpdatedAt.Load().UnixNano(),
},
Host: storage.Host{
ID: peer.Host.ID,
Type: peer.Host.Type.Name(),
Hostname: peer.Host.Hostname,
IP: peer.Host.IP,
Port: peer.Host.Port,
DownloadPort: peer.Host.DownloadPort,
OS: peer.Host.OS,
Platform: peer.Host.Platform,
PlatformFamily: peer.Host.PlatformFamily,
PlatformVersion: peer.Host.PlatformVersion,
KernelVersion: peer.Host.KernelVersion,
ConcurrentUploadLimit: peer.Host.ConcurrentUploadLimit.Load(),
ConcurrentUploadCount: peer.Host.ConcurrentUploadCount.Load(),
UploadCount: peer.Host.UploadCount.Load(),
UploadFailedCount: peer.Host.UploadFailedCount.Load(),
SchedulerClusterID: int64(peer.Host.SchedulerClusterID),
CreatedAt: peer.Host.CreatedAt.Load().UnixNano(),
UpdatedAt: peer.Host.UpdatedAt.Load().UnixNano(),
},
}
download.Host.CPU = resource.CPU{
LogicalCount: peer.Host.CPU.LogicalCount,
PhysicalCount: peer.Host.CPU.PhysicalCount,
Percent: peer.Host.CPU.Percent,
ProcessPercent: peer.Host.CPU.ProcessPercent,
Times: resource.CPUTimes{
User: peer.Host.CPU.Times.User,
System: peer.Host.CPU.Times.System,
Idle: peer.Host.CPU.Times.Idle,
Nice: peer.Host.CPU.Times.Nice,
Iowait: peer.Host.CPU.Times.Iowait,
Irq: peer.Host.CPU.Times.Irq,
Softirq: peer.Host.CPU.Times.Softirq,
Steal: peer.Host.CPU.Times.Steal,
Guest: peer.Host.CPU.Times.Guest,
GuestNice: peer.Host.CPU.Times.GuestNice,
},
}
download.Host.Memory = resource.Memory{
Total: peer.Host.Memory.Total,
Available: peer.Host.Memory.Available,
Used: peer.Host.Memory.Used,
UsedPercent: peer.Host.Memory.UsedPercent,
ProcessUsedPercent: peer.Host.Memory.ProcessUsedPercent,
Free: peer.Host.Memory.Free,
}
download.Host.Network = resource.Network{
TCPConnectionCount: peer.Host.Network.TCPConnectionCount,
UploadTCPConnectionCount: peer.Host.Network.UploadTCPConnectionCount,
Location: peer.Host.Network.Location,
IDC: peer.Host.Network.IDC,
}
download.Host.Disk = resource.Disk{
Total: peer.Host.Disk.Total,
Free: peer.Host.Disk.Free,
Used: peer.Host.Disk.Used,
UsedPercent: peer.Host.Disk.UsedPercent,
InodesTotal: peer.Host.Disk.InodesTotal,
InodesUsed: peer.Host.Disk.InodesUsed,
InodesFree: peer.Host.Disk.InodesFree,
InodesUsedPercent: peer.Host.Disk.InodesUsedPercent,
}
download.Host.Build = resource.Build{
GitVersion: peer.Host.Build.GitVersion,
GitCommit: peer.Host.Build.GitCommit,
GoVersion: peer.Host.Build.GoVersion,
Platform: peer.Host.Build.Platform,
}
if req.GetCode() != commonv1.Code_Success {
download.Error = storage.Error{
Code: req.GetCode().String(),
}
}
if err := v.storage.CreateDownload(download); err != nil {
peer.Log.Error(err)
}
}

View File

@ -57,8 +57,6 @@ import (
resource "d7y.io/dragonfly/v2/scheduler/resource/standard"
"d7y.io/dragonfly/v2/scheduler/scheduling"
"d7y.io/dragonfly/v2/scheduler/scheduling/mocks"
"d7y.io/dragonfly/v2/scheduler/storage"
storagemocks "d7y.io/dragonfly/v2/scheduler/storage/mocks"
)
var (
@ -138,9 +136,8 @@ func TestService_NewV1(t *testing.T) {
scheduling := mocks.NewMockScheduling(ctl)
resource := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
tc.expect(t, NewV1(&config.Config{Scheduler: mockSchedulerConfig}, resource, scheduling, dynconfig, storage))
tc.expect(t, NewV1(&config.Config{Scheduler: mockSchedulerConfig}, resource, scheduling, dynconfig))
})
}
}
@ -801,11 +798,10 @@ func TestServiceV1_RegisterPeerTask(t *testing.T) {
scheduling := mocks.NewMockScheduling(ctl)
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
hostManager := resource.NewMockHostManager(ctl)
taskManager := resource.NewMockTaskManager(ctl)
peerManager := resource.NewMockPeerManager(ctl)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
@ -1066,10 +1062,9 @@ func TestServiceV1_ReportPieceResult(t *testing.T) {
scheduling := mocks.NewMockScheduling(ctl)
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
peerManager := resource.NewMockPeerManager(ctl)
stream := schedulerv1mocks.NewMockScheduler_ReportPieceResultServer(ctl)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
@ -1087,8 +1082,7 @@ func TestServiceV1_ReportPeerResult(t *testing.T) {
name string
req *schedulerv1.PeerResult
run func(t *testing.T, peer *resource.Peer, req *schedulerv1.PeerResult, svc *V1, mockPeer *resource.Peer, res resource.Resource, peerManager resource.PeerManager,
mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder,
md *configmocks.MockDynconfigInterfaceMockRecorder)
mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder)
}{
{
name: "peer not found",
@ -1096,8 +1090,7 @@ func TestServiceV1_ReportPeerResult(t *testing.T) {
PeerId: mockPeerID,
},
run: func(t *testing.T, peer *resource.Peer, req *schedulerv1.PeerResult, svc *V1, mockPeer *resource.Peer, res resource.Resource, peerManager resource.PeerManager,
mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder,
md *configmocks.MockDynconfigInterfaceMockRecorder) {
mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
gomock.InOrder(
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(mockPeerID)).Return(nil, false).Times(1),
@ -1117,18 +1110,12 @@ func TestServiceV1_ReportPeerResult(t *testing.T) {
PeerId: mockPeerID,
},
run: func(t *testing.T, peer *resource.Peer, req *schedulerv1.PeerResult, svc *V1, mockPeer *resource.Peer, res resource.Resource, peerManager resource.PeerManager,
mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder,
md *configmocks.MockDynconfigInterfaceMockRecorder) {
var wg sync.WaitGroup
wg.Add(1)
defer wg.Wait()
mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
mockPeer.FSM.SetState(resource.PeerStateFailed)
gomock.InOrder(
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(mockPeerID)).Return(mockPeer, true).Times(1),
md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1),
ms.CreateDownload(gomock.Any()).Do(func(download storage.Download) { wg.Done() }).Return(nil).Times(1),
)
assert := assert.New(t)
@ -1143,18 +1130,12 @@ func TestServiceV1_ReportPeerResult(t *testing.T) {
PeerId: mockPeerID,
},
run: func(t *testing.T, peer *resource.Peer, req *schedulerv1.PeerResult, svc *V1, mockPeer *resource.Peer, res resource.Resource, peerManager resource.PeerManager,
mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder,
md *configmocks.MockDynconfigInterfaceMockRecorder) {
var wg sync.WaitGroup
wg.Add(1)
defer wg.Wait()
mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
mockPeer.FSM.SetState(resource.PeerStateBackToSource)
gomock.InOrder(
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(mockPeerID)).Return(mockPeer, true).Times(1),
md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1),
ms.CreateDownload(gomock.Any()).Do(func(download storage.Download) { wg.Done() }).Return(nil).Times(1),
)
assert := assert.New(t)
@ -1169,18 +1150,12 @@ func TestServiceV1_ReportPeerResult(t *testing.T) {
PeerId: mockPeerID,
},
run: func(t *testing.T, peer *resource.Peer, req *schedulerv1.PeerResult, svc *V1, mockPeer *resource.Peer, res resource.Resource, peerManager resource.PeerManager,
mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder,
md *configmocks.MockDynconfigInterfaceMockRecorder) {
var wg sync.WaitGroup
wg.Add(1)
defer wg.Wait()
mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
mockPeer.FSM.SetState(resource.PeerStateFailed)
gomock.InOrder(
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(mockPeerID)).Return(mockPeer, true).Times(1),
md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1),
ms.CreateDownload(gomock.Any()).Do(func(download storage.Download) { wg.Done() }).Return(nil).Times(1),
)
assert := assert.New(t)
@ -1195,18 +1170,12 @@ func TestServiceV1_ReportPeerResult(t *testing.T) {
PeerId: mockPeerID,
},
run: func(t *testing.T, peer *resource.Peer, req *schedulerv1.PeerResult, svc *V1, mockPeer *resource.Peer, res resource.Resource, peerManager resource.PeerManager,
mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder,
md *configmocks.MockDynconfigInterfaceMockRecorder) {
var wg sync.WaitGroup
wg.Add(1)
defer wg.Wait()
mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
mockPeer.FSM.SetState(resource.PeerStateBackToSource)
gomock.InOrder(
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(mockPeerID)).Return(mockPeer, true).Times(1),
md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1),
ms.CreateDownload(gomock.Any()).Do(func(download storage.Download) { wg.Done() }).Return(nil).Times(1),
)
assert := assert.New(t)
@ -1221,18 +1190,12 @@ func TestServiceV1_ReportPeerResult(t *testing.T) {
PeerId: mockPeerID,
},
run: func(t *testing.T, peer *resource.Peer, req *schedulerv1.PeerResult, svc *V1, mockPeer *resource.Peer, res resource.Resource, peerManager resource.PeerManager,
mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, ms *storagemocks.MockStorageMockRecorder,
md *configmocks.MockDynconfigInterfaceMockRecorder) {
var wg sync.WaitGroup
wg.Add(1)
defer wg.Wait()
mr *resource.MockResourceMockRecorder, mp *resource.MockPeerManagerMockRecorder, md *configmocks.MockDynconfigInterfaceMockRecorder) {
mockPeer.FSM.SetState(resource.PeerStateBackToSource)
gomock.InOrder(
mr.PeerManager().Return(peerManager).Times(1),
mp.Load(gomock.Eq(mockPeerID)).Return(mockPeer, true).Times(1),
md.GetApplications().Return([]*managerv2.Application{}, nil).Times(1),
ms.CreateDownload(gomock.Any()).Do(func(download storage.Download) { wg.Done() }).Return(nil).Times(1),
)
assert := assert.New(t)
@ -1249,16 +1212,15 @@ func TestServiceV1_ReportPeerResult(t *testing.T) {
scheduling := mocks.NewMockScheduling(ctl)
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
peerManager := resource.NewMockPeerManager(ctl)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
mockPeer := resource.NewPeer(mockPeerID, mockTask, mockHost)
tc.run(t, mockPeer, tc.req, svc, mockPeer, res, peerManager, res.EXPECT(), peerManager.EXPECT(), storage.EXPECT(), dynconfig.EXPECT())
tc.run(t, mockPeer, tc.req, svc, mockPeer, res, peerManager, res.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT())
})
}
}
@ -1313,9 +1275,8 @@ func TestServiceV1_StatTask(t *testing.T) {
scheduling := mocks.NewMockScheduling(ctl)
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
taskManager := resource.NewMockTaskManager(ctl)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
tc.mock(mockTask, taskManager, res.EXPECT(), taskManager.EXPECT())
@ -1607,11 +1568,10 @@ func TestServiceV1_AnnounceTask(t *testing.T) {
scheduling := mocks.NewMockScheduling(ctl)
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
hostManager := resource.NewMockHostManager(ctl)
taskManager := resource.NewMockTaskManager(ctl)
peerManager := resource.NewMockPeerManager(ctl)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
@ -1807,14 +1767,13 @@ func TestServiceV1_LeaveTask(t *testing.T) {
scheduling := mocks.NewMockScheduling(ctl)
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
peerManager := resource.NewMockPeerManager(ctl)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
peer := resource.NewPeer(mockSeedPeerID, mockTask, mockHost)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig)
tc.mock(peer, peerManager, scheduling.EXPECT(), res.EXPECT(), peerManager.EXPECT())
tc.expect(t, peer, svc.LeaveTask(context.Background(), &schedulerv1.PeerTarget{}))
@ -2237,12 +2196,11 @@ func TestServiceV1_AnnounceHost(t *testing.T) {
scheduling := mocks.NewMockScheduling(ctl)
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
hostManager := resource.NewMockHostManager(ctl)
host := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig)
tc.run(t, svc, tc.req, host, hostManager, res.EXPECT(), hostManager.EXPECT(), dynconfig.EXPECT())
})
@ -2449,14 +2407,13 @@ func TestServiceV1_LeaveHost(t *testing.T) {
scheduling := mocks.NewMockScheduling(ctl)
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
hostManager := resource.NewMockHostManager(ctl)
host := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
mockPeer := resource.NewPeer(mockSeedPeerID, mockTask, host)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig)
tc.mock(host, mockPeer, hostManager, scheduling.EXPECT(), res.EXPECT(), hostManager.EXPECT())
tc.expect(t, mockPeer, svc.LeaveHost(context.Background(), &schedulerv1.LeaveHostRequest{
@ -2554,14 +2511,13 @@ func TestServiceV1_prefetchTask(t *testing.T) {
scheduling := mocks.NewMockScheduling(ctl)
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
seedPeer := resource.NewMockSeedPeer(ctl)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
peer := resource.NewPeer(mockPeerID, task, mockHost)
svc := NewV1(tc.config, res, scheduling, dynconfig, storage)
svc := NewV1(tc.config, res, scheduling, dynconfig)
taskManager := resource.NewMockTaskManager(ctl)
tc.mock(task, peer, taskManager, seedPeer, res.EXPECT(), taskManager.EXPECT(), seedPeer.EXPECT())
@ -3020,8 +2976,7 @@ func TestServiceV1_triggerTask(t *testing.T) {
scheduling := mocks.NewMockScheduling(ctl)
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
svc := NewV1(tc.config, res, scheduling, dynconfig, storage)
svc := NewV1(tc.config, res, scheduling, dynconfig)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
@ -3124,8 +3079,7 @@ func TestServiceV1_storeTask(t *testing.T) {
scheduling := mocks.NewMockScheduling(ctl)
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig)
taskManager := resource.NewMockTaskManager(ctl)
tc.run(t, svc, taskManager, res.EXPECT(), taskManager.EXPECT())
})
@ -3202,8 +3156,7 @@ func TestServiceV1_storeHost(t *testing.T) {
scheduling := mocks.NewMockScheduling(ctl)
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig)
hostManager := resource.NewMockHostManager(ctl)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
@ -3286,8 +3239,7 @@ func TestServiceV1_storePeer(t *testing.T) {
scheduling := mocks.NewMockScheduling(ctl)
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig)
peerManager := resource.NewMockPeerManager(ctl)
tc.run(t, svc, peerManager, res.EXPECT(), peerManager.EXPECT())
@ -3345,14 +3297,13 @@ func TestServiceV1_triggerSeedPeerTask(t *testing.T) {
scheduling := mocks.NewMockScheduling(ctl)
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
seedPeer := resource.NewMockSeedPeer(ctl)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
peer := resource.NewPeer(mockPeerID, task, mockHost)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, SeedPeer: mockSeedPeerConfig}, res, scheduling, dynconfig, storage)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, SeedPeer: mockSeedPeerConfig}, res, scheduling, dynconfig)
tc.mock(task, peer, seedPeer, res.EXPECT(), seedPeer.EXPECT())
svc.triggerSeedPeerTask(context.Background(), &mockPeerRange, task)
@ -3427,13 +3378,12 @@ func TestServiceV1_handleBeginOfPiece(t *testing.T) {
scheduling := mocks.NewMockScheduling(ctl)
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig)
tc.mock(peer, scheduling.EXPECT())
svc.handleBeginOfPiece(context.Background(), peer)
@ -3566,9 +3516,8 @@ func TestServiceV1_handlePieceSuccess(t *testing.T) {
scheduling := mocks.NewMockScheduling(ctl)
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
peerManager := resource.NewMockPeerManager(ctl)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig)
tc.mock(tc.peer, peerManager, res.EXPECT(), peerManager.EXPECT())
svc.handlePieceSuccess(context.Background(), tc.peer, tc.piece)
@ -3755,7 +3704,6 @@ func TestServiceV1_handlePieceFail(t *testing.T) {
scheduling := mocks.NewMockScheduling(ctl)
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
peerManager := resource.NewMockPeerManager(ctl)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
@ -3764,7 +3712,7 @@ func TestServiceV1_handlePieceFail(t *testing.T) {
peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
parent := resource.NewPeer(mockSeedPeerID, mockTask, mockHost)
seedPeer := resource.NewMockSeedPeer(ctl)
svc := NewV1(tc.config, res, scheduling, dynconfig, storage)
svc := NewV1(tc.config, res, scheduling, dynconfig)
tc.run(t, svc, peer, parent, tc.piece, peerManager, seedPeer, scheduling.EXPECT(), res.EXPECT(), peerManager.EXPECT(), seedPeer.EXPECT())
})
@ -3863,7 +3811,6 @@ func TestServiceV1_handlePeerSuccess(t *testing.T) {
scheduling := mocks.NewMockScheduling(ctl)
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
url, err := url.Parse(s.URL)
if err != nil {
@ -3887,7 +3834,7 @@ func TestServiceV1_handlePeerSuccess(t *testing.T) {
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
peer := resource.NewPeer(mockPeerID, mockTask, mockHost)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig)
tc.mock(peer)
svc.handlePeerSuccess(context.Background(), peer)
@ -3961,8 +3908,7 @@ func TestServiceV1_handlePeerFail(t *testing.T) {
scheduling := mocks.NewMockScheduling(ctl)
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig)
mockHost := resource.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
@ -4047,8 +3993,7 @@ func TestServiceV1_handleTaskSuccess(t *testing.T) {
scheduling := mocks.NewMockScheduling(ctl)
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig)
task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
tc.mock(task)
@ -4186,8 +4131,7 @@ func TestServiceV1_handleTaskFail(t *testing.T) {
scheduling := mocks.NewMockScheduling(ctl)
res := resource.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig, storage)
svc := NewV1(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, res, scheduling, dynconfig)
task := resource.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, resource.WithDigest(mockTaskDigest), resource.WithPieceLength(mockTaskPieceLength))
tc.mock(task)

View File

@ -43,7 +43,6 @@ import (
"d7y.io/dragonfly/v2/scheduler/resource/persistentcache"
"d7y.io/dragonfly/v2/scheduler/resource/standard"
"d7y.io/dragonfly/v2/scheduler/scheduling"
"d7y.io/dragonfly/v2/scheduler/storage"
)
// V2 is the interface for v2 version of the service.
@ -62,9 +61,6 @@ type V2 struct {
// Dynamic config.
dynconfig config.DynconfigInterface
// Storage interface.
storage storage.Storage
}
// New v2 version of service instance.
@ -74,7 +70,6 @@ func NewV2(
persistentCacheResource persistentcache.Resource,
scheduling scheduling.Scheduling,
dynconfig config.DynconfigInterface,
storage storage.Storage,
) *V2 {
return &V2{
resource: resource,
@ -82,7 +77,6 @@ func NewV2(
scheduling: scheduling,
config: cfg,
dynconfig: dynconfig,
storage: storage,
}
}
@ -367,6 +361,8 @@ func (v *V2) StatPeer(ctx context.Context, req *schedulerv2.StatPeerRequest) (*c
InodesUsed: peer.Host.Disk.InodesUsed,
InodesFree: peer.Host.Disk.InodesFree,
InodesUsedPercent: peer.Host.Disk.InodesUsedPercent,
WriteBandwidth: peer.Host.Disk.WriteBandwidth,
ReadBandwidth: peer.Host.Disk.ReadBandwidth,
},
Build: &commonv2.Build{
GitVersion: peer.Host.Build.GitVersion,
@ -586,6 +582,8 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ
InodesUsed: req.Host.Disk.GetInodesUsed(),
InodesFree: req.Host.Disk.GetInodesFree(),
InodesUsedPercent: req.Host.Disk.GetInodesUsedPercent(),
WriteBandwidth: req.Host.Disk.GetWriteBandwidth(),
ReadBandwidth: req.Host.Disk.GetReadBandwidth(),
}))
}
@ -686,6 +684,8 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ
InodesUsed: req.Host.Disk.GetInodesUsed(),
InodesFree: req.Host.Disk.GetInodesFree(),
InodesUsedPercent: req.Host.Disk.GetInodesUsedPercent(),
WriteBandwidth: req.Host.Disk.GetWriteBandwidth(),
ReadBandwidth: req.Host.Disk.GetReadBandwidth(),
}
}
@ -759,6 +759,8 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ
InodesUsed: req.Host.Disk.GetInodesUsed(),
InodesFree: req.Host.Disk.GetInodesFree(),
InodesUsedPercent: req.Host.Disk.GetInodesUsedPercent(),
WriteBandwidth: req.Host.Disk.GetWriteBandwidth(),
ReadBandwidth: req.Host.Disk.GetReadBandwidth(),
},
persistentcache.Build{
GitVersion: req.Host.Build.GetGitVersion(),
@ -850,6 +852,8 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ
InodesUsed: req.Host.Disk.GetInodesUsed(),
InodesFree: req.Host.Disk.GetInodesFree(),
InodesUsedPercent: req.Host.Disk.GetInodesUsedPercent(),
WriteBandwidth: req.Host.Disk.GetWriteBandwidth(),
ReadBandwidth: req.Host.Disk.GetReadBandwidth(),
}
}
@ -940,6 +944,8 @@ func (v *V2) ListHosts(ctx context.Context) (*schedulerv2.ListHostsResponse, err
InodesUsed: host.Disk.InodesUsed,
InodesFree: host.Disk.InodesFree,
InodesUsedPercent: host.Disk.InodesUsedPercent,
WriteBandwidth: host.Disk.WriteBandwidth,
ReadBandwidth: host.Disk.ReadBandwidth,
},
Build: &commonv2.Build{
GitVersion: host.Build.GitVersion,
@ -1670,6 +1676,8 @@ func (v *V2) StatPersistentCachePeer(ctx context.Context, req *schedulerv2.StatP
InodesUsed: peer.Host.Disk.InodesUsed,
InodesFree: peer.Host.Disk.InodesFree,
InodesUsedPercent: peer.Host.Disk.InodesUsedPercent,
WriteBandwidth: peer.Host.Disk.WriteBandwidth,
ReadBandwidth: peer.Host.Disk.ReadBandwidth,
},
Build: &commonv2.Build{
GitVersion: peer.Host.Build.GitVersion,

View File

@ -51,7 +51,6 @@ import (
"d7y.io/dragonfly/v2/scheduler/resource/persistentcache"
"d7y.io/dragonfly/v2/scheduler/resource/standard"
schedulingmocks "d7y.io/dragonfly/v2/scheduler/scheduling/mocks"
storagemocks "d7y.io/dragonfly/v2/scheduler/storage/mocks"
)
var (
@ -251,9 +250,8 @@ func TestService_NewV2(t *testing.T) {
resource := standard.NewMockResource(ctl)
persistentCacheResource := persistentcache.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
tc.expect(t, NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig, storage))
tc.expect(t, NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig))
})
}
}
@ -423,14 +421,14 @@ func TestServiceV2_StatPeer(t *testing.T) {
resource := standard.NewMockResource(ctl)
persistentCacheResource := persistentcache.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
peerManager := standard.NewMockPeerManager(ctl)
mockHost := standard.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength))
peer := standard.NewPeer(mockSeedPeerID, mockTask, mockHost, standard.WithRange(mockPeerRange))
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, resource, persistentCacheResource, scheduling, dynconfig, storage)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, resource, persistentCacheResource, scheduling, dynconfig)
tc.mock(peer, peerManager, resource.EXPECT(), peerManager.EXPECT())
resp, err := svc.StatPeer(context.Background(), &schedulerv2.StatPeerRequest{TaskId: mockTaskID, PeerId: mockPeerID})
@ -495,14 +493,13 @@ func TestServiceV2_DeletePeer(t *testing.T) {
resource := standard.NewMockResource(ctl)
persistentCacheResource := persistentcache.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
peerManager := standard.NewMockPeerManager(ctl)
mockHost := standard.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength))
peer := standard.NewPeer(mockSeedPeerID, mockTask, mockHost, standard.WithRange(mockPeerRange))
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, resource, persistentCacheResource, scheduling, dynconfig, storage)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, resource, persistentCacheResource, scheduling, dynconfig)
tc.mock(peer, peerManager, resource.EXPECT(), peerManager.EXPECT())
tc.expect(t, svc.DeletePeer(context.Background(), &schedulerv2.DeletePeerRequest{TaskId: mockTaskID, PeerId: mockPeerID}))
@ -584,10 +581,10 @@ func TestServiceV2_StatTask(t *testing.T) {
resource := standard.NewMockResource(ctl)
persistentCacheResource := persistentcache.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
taskManager := standard.NewMockTaskManager(ctl)
task := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength))
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, resource, persistentCacheResource, scheduling, dynconfig, storage)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, resource, persistentCacheResource, scheduling, dynconfig)
tc.mock(task, taskManager, resource.EXPECT(), taskManager.EXPECT())
resp, err := svc.StatTask(context.Background(), &schedulerv2.StatTaskRequest{TaskId: mockTaskID})
@ -1426,7 +1423,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) {
resource := standard.NewMockResource(ctl)
persistentCacheResource := persistentcache.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
hostManager := standard.NewMockHostManager(ctl)
persistentcacheHostManager := persistentcache.NewMockHostManager(ctl)
host := standard.NewHost(
@ -1440,7 +1437,7 @@ func TestServiceV2_AnnounceHost(t *testing.T) {
mockRawPersistentCacheHost.CPU, mockRawPersistentCacheHost.Memory, mockRawPersistentCacheHost.Network, mockRawPersistentCacheHost.Disk,
mockRawPersistentCacheHost.Build, mockRawPersistentCacheHost.AnnounceInterval, mockRawPersistentCacheHost.CreatedAt, mockRawPersistentCacheHost.UpdatedAt, mockRawHost.Log)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, resource, persistentCacheResource, scheduling, dynconfig, storage)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, resource, persistentCacheResource, scheduling, dynconfig)
tc.run(t, svc, tc.req, host, persistentCacheHost, hostManager, persistentcacheHostManager, resource.EXPECT(), persistentCacheResource.EXPECT(), hostManager.EXPECT(), persistentcacheHostManager.EXPECT(), dynconfig.EXPECT())
})
@ -1555,12 +1552,12 @@ func TestServiceV2_ListHosts(t *testing.T) {
resource := standard.NewMockResource(ctl)
persistentCacheResource := persistentcache.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
hostManager := standard.NewMockHostManager(ctl)
host := standard.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname, mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type,
standard.WithCPU(mockCPU), standard.WithMemory(mockMemory), standard.WithNetwork(mockNetwork), standard.WithDisk(mockDisk), standard.WithBuild(mockBuild))
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, resource, persistentCacheResource, scheduling, dynconfig, storage)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, resource, persistentCacheResource, scheduling, dynconfig)
tc.mock(host, hostManager, resource.EXPECT(), hostManager.EXPECT())
resp, err := svc.ListHosts(context.Background())
@ -1627,14 +1624,14 @@ func TestServiceV2_DeleteHost(t *testing.T) {
resource := standard.NewMockResource(ctl)
persistentCacheResource := persistentcache.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
hostManager := standard.NewMockHostManager(ctl)
host := standard.NewHost(
mockRawHost.ID, mockRawHost.IP, mockRawHost.Hostname,
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength))
mockPeer := standard.NewPeer(mockSeedPeerID, mockTask, host)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, resource, persistentCacheResource, scheduling, dynconfig, storage)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig, Metrics: config.MetricsConfig{EnableHost: true}}, resource, persistentCacheResource, scheduling, dynconfig)
tc.mock(host, mockPeer, hostManager, resource.EXPECT(), hostManager.EXPECT())
tc.expect(t, mockPeer, svc.DeleteHost(context.Background(), &schedulerv2.DeleteHostRequest{HostId: mockHostID}))
@ -1925,7 +1922,7 @@ func TestServiceV2_handleRegisterPeerRequest(t *testing.T) {
resource := standard.NewMockResource(ctl)
persistentCacheResource := persistentcache.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
hostManager := standard.NewMockHostManager(ctl)
peerManager := standard.NewMockPeerManager(ctl)
taskManager := standard.NewMockTaskManager(ctl)
@ -1937,7 +1934,7 @@ func TestServiceV2_handleRegisterPeerRequest(t *testing.T) {
mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength))
peer := standard.NewPeer(mockPeerID, mockTask, mockHost)
seedPeer := standard.NewPeer(mockSeedPeerID, mockTask, mockHost)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig, storage)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig)
tc.run(t, svc, tc.req, peer, seedPeer, hostManager, taskManager, peerManager, stream, resource.EXPECT(), hostManager.EXPECT(), taskManager.EXPECT(), peerManager.EXPECT(), stream.EXPECT(), scheduling.EXPECT())
})
@ -2022,7 +2019,7 @@ func TestServiceV2_handleDownloadPeerStartedRequest(t *testing.T) {
resource := standard.NewMockResource(ctl)
persistentCacheResource := persistentcache.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
peerManager := standard.NewMockPeerManager(ctl)
mockHost := standard.NewHost(
@ -2030,7 +2027,7 @@ func TestServiceV2_handleDownloadPeerStartedRequest(t *testing.T) {
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength))
peer := standard.NewPeer(mockPeerID, mockTask, mockHost)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig, storage)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig)
tc.run(t, svc, peer, peerManager, resource.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT())
})
@ -2115,7 +2112,7 @@ func TestServiceV2_handleDownloadPeerBackToSourceStartedRequest(t *testing.T) {
resource := standard.NewMockResource(ctl)
persistentCacheResource := persistentcache.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
peerManager := standard.NewMockPeerManager(ctl)
mockHost := standard.NewHost(
@ -2123,7 +2120,7 @@ func TestServiceV2_handleDownloadPeerBackToSourceStartedRequest(t *testing.T) {
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength))
peer := standard.NewPeer(mockPeerID, mockTask, mockHost)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig, storage)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig)
tc.run(t, svc, peer, peerManager, resource.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT())
})
@ -2187,7 +2184,7 @@ func TestServiceV2_handleRescheduleRequest(t *testing.T) {
resource := standard.NewMockResource(ctl)
persistentCacheResource := persistentcache.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
peerManager := standard.NewMockPeerManager(ctl)
mockHost := standard.NewHost(
@ -2195,7 +2192,7 @@ func TestServiceV2_handleRescheduleRequest(t *testing.T) {
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength))
peer := standard.NewPeer(mockPeerID, mockTask, mockHost)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig, storage)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig)
tc.run(t, svc, peer, peerManager, resource.EXPECT(), peerManager.EXPECT(), scheduling.EXPECT())
})
@ -2261,7 +2258,7 @@ func TestServiceV2_handleDownloadPeerFinishedRequest(t *testing.T) {
resource := standard.NewMockResource(ctl)
persistentCacheResource := persistentcache.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
peerManager := standard.NewMockPeerManager(ctl)
mockHost := standard.NewHost(
@ -2269,7 +2266,7 @@ func TestServiceV2_handleDownloadPeerFinishedRequest(t *testing.T) {
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength))
peer := standard.NewPeer(mockPeerID, mockTask, mockHost)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig, storage)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig)
tc.run(t, svc, peer, peerManager, resource.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT())
})
@ -2443,7 +2440,7 @@ func TestServiceV2_handleDownloadPeerBackToSourceFinishedRequest(t *testing.T) {
resource := standard.NewMockResource(ctl)
persistentCacheResource := persistentcache.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
peerManager := standard.NewMockPeerManager(ctl)
url, err := url.Parse(s.URL)
@ -2469,7 +2466,7 @@ func TestServiceV2_handleDownloadPeerBackToSourceFinishedRequest(t *testing.T) {
mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength))
peer := standard.NewPeer(mockPeerID, mockTask, mockHost)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig, storage)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig)
tc.run(t, svc, tc.req, peer, peerManager, resource.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT())
})
@ -2534,7 +2531,7 @@ func TestServiceV2_handleDownloadPeerFailedRequest(t *testing.T) {
resource := standard.NewMockResource(ctl)
persistentCacheResource := persistentcache.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
peerManager := standard.NewMockPeerManager(ctl)
mockHost := standard.NewHost(
@ -2542,7 +2539,7 @@ func TestServiceV2_handleDownloadPeerFailedRequest(t *testing.T) {
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength))
peer := standard.NewPeer(mockPeerID, mockTask, mockHost)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig, storage)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig)
tc.run(t, svc, peer, peerManager, resource.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT())
})
@ -2654,7 +2651,7 @@ func TestServiceV2_handleDownloadPeerBackToSourceFailedRequest(t *testing.T) {
resource := standard.NewMockResource(ctl)
persistentCacheResource := persistentcache.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
peerManager := standard.NewMockPeerManager(ctl)
mockHost := standard.NewHost(
@ -2662,7 +2659,7 @@ func TestServiceV2_handleDownloadPeerBackToSourceFailedRequest(t *testing.T) {
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength))
peer := standard.NewPeer(mockPeerID, mockTask, mockHost)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig, storage)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig)
tc.run(t, svc, peer, peerManager, resource.EXPECT(), peerManager.EXPECT(), dynconfig.EXPECT())
})
@ -2813,7 +2810,7 @@ func TestServiceV2_handleDownloadPieceFinishedRequest(t *testing.T) {
resource := standard.NewMockResource(ctl)
persistentCacheResource := persistentcache.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
peerManager := standard.NewMockPeerManager(ctl)
mockHost := standard.NewHost(
@ -2821,7 +2818,7 @@ func TestServiceV2_handleDownloadPieceFinishedRequest(t *testing.T) {
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength))
peer := standard.NewPeer(mockPeerID, mockTask, mockHost)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig, storage)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig)
tc.run(t, svc, tc.req, peer, peerManager, resource.EXPECT(), peerManager.EXPECT())
})
@ -2939,7 +2936,7 @@ func TestServiceV2_handleDownloadPieceBackToSourceFinishedRequest(t *testing.T)
resource := standard.NewMockResource(ctl)
persistentCacheResource := persistentcache.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
peerManager := standard.NewMockPeerManager(ctl)
mockHost := standard.NewHost(
@ -2947,7 +2944,7 @@ func TestServiceV2_handleDownloadPieceBackToSourceFinishedRequest(t *testing.T)
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength))
peer := standard.NewPeer(mockPeerID, mockTask, mockHost)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig, storage)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig)
tc.run(t, svc, tc.req, peer, peerManager, resource.EXPECT(), peerManager.EXPECT())
})
@ -3050,7 +3047,7 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) {
resource := standard.NewMockResource(ctl)
persistentCacheResource := persistentcache.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
peerManager := standard.NewMockPeerManager(ctl)
mockHost := standard.NewHost(
@ -3058,7 +3055,7 @@ func TestServiceV2_handleDownloadPieceFailedRequest(t *testing.T) {
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength))
peer := standard.NewPeer(mockPeerID, mockTask, mockHost)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig, storage)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig)
tc.run(t, svc, tc.req, peer, peerManager, resource.EXPECT(), peerManager.EXPECT())
})
@ -3116,7 +3113,7 @@ func TestServiceV2_handleDownloadPieceBackToSourceFailedRequest(t *testing.T) {
resource := standard.NewMockResource(ctl)
persistentCacheResource := persistentcache.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
peerManager := standard.NewMockPeerManager(ctl)
mockHost := standard.NewHost(
@ -3124,7 +3121,7 @@ func TestServiceV2_handleDownloadPieceBackToSourceFailedRequest(t *testing.T) {
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength))
peer := standard.NewPeer(mockPeerID, mockTask, mockHost)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig, storage)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig)
tc.run(t, svc, tc.req, peer, peerManager, resource.EXPECT(), peerManager.EXPECT())
})
@ -3327,7 +3324,7 @@ func TestServiceV2_handleResource(t *testing.T) {
resource := standard.NewMockResource(ctl)
persistentCacheResource := persistentcache.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
hostManager := standard.NewMockHostManager(ctl)
taskManager := standard.NewMockTaskManager(ctl)
peerManager := standard.NewMockPeerManager(ctl)
@ -3338,7 +3335,7 @@ func TestServiceV2_handleResource(t *testing.T) {
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength))
mockPeer := standard.NewPeer(mockPeerID, mockTask, mockHost)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig, storage)
svc := NewV2(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig)
tc.run(t, svc, tc.download, stream, mockHost, mockTask, mockPeer, hostManager, taskManager, peerManager, resource.EXPECT(), hostManager.EXPECT(), taskManager.EXPECT(), peerManager.EXPECT())
})
@ -3607,7 +3604,7 @@ func TestServiceV2_downloadTaskBySeedPeer(t *testing.T) {
resource := standard.NewMockResource(ctl)
persistentCacheResource := persistentcache.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
seedPeerClient := standard.NewMockSeedPeer(ctl)
mockHost := standard.NewHost(
@ -3615,7 +3612,7 @@ func TestServiceV2_downloadTaskBySeedPeer(t *testing.T) {
mockRawHost.Port, mockRawHost.DownloadPort, mockRawHost.Type)
mockTask := standard.NewTask(mockTaskID, mockTaskURL, mockTaskTag, mockTaskApplication, commonv2.TaskType_STANDARD, mockTaskFilteredQueryParams, mockTaskHeader, mockTaskBackToSourceLimit, standard.WithDigest(mockTaskDigest), standard.WithPieceLength(mockTaskPieceLength))
peer := standard.NewPeer(mockPeerID, mockTask, mockHost)
svc := NewV2(&tc.config, resource, persistentCacheResource, scheduling, dynconfig, storage)
svc := NewV2(&tc.config, resource, persistentCacheResource, scheduling, dynconfig)
tc.run(t, svc, peer, seedPeerClient, resource.EXPECT(), seedPeerClient.EXPECT())
})

View File

@ -1,114 +0,0 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: storage.go
//
// Generated by this command:
//
// mockgen -destination mocks/storage_mock.go -source storage.go -package mocks
//
// Package mocks is a generated GoMock package.
package mocks
import (
io "io"
reflect "reflect"
storage "d7y.io/dragonfly/v2/scheduler/storage"
gomock "go.uber.org/mock/gomock"
)
// MockStorage is a mock of Storage interface.
type MockStorage struct {
ctrl *gomock.Controller
recorder *MockStorageMockRecorder
isgomock struct{}
}
// MockStorageMockRecorder is the mock recorder for MockStorage.
type MockStorageMockRecorder struct {
mock *MockStorage
}
// NewMockStorage creates a new mock instance.
func NewMockStorage(ctrl *gomock.Controller) *MockStorage {
mock := &MockStorage{ctrl: ctrl}
mock.recorder = &MockStorageMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use.
func (m *MockStorage) EXPECT() *MockStorageMockRecorder {
return m.recorder
}
// ClearDownload mocks base method.
func (m *MockStorage) ClearDownload() error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ClearDownload")
ret0, _ := ret[0].(error)
return ret0
}
// ClearDownload indicates an expected call of ClearDownload.
func (mr *MockStorageMockRecorder) ClearDownload() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ClearDownload", reflect.TypeOf((*MockStorage)(nil).ClearDownload))
}
// CreateDownload mocks base method.
func (m *MockStorage) CreateDownload(arg0 storage.Download) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "CreateDownload", arg0)
ret0, _ := ret[0].(error)
return ret0
}
// CreateDownload indicates an expected call of CreateDownload.
func (mr *MockStorageMockRecorder) CreateDownload(arg0 any) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateDownload", reflect.TypeOf((*MockStorage)(nil).CreateDownload), arg0)
}
// DownloadCount mocks base method.
func (m *MockStorage) DownloadCount() int64 {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "DownloadCount")
ret0, _ := ret[0].(int64)
return ret0
}
// DownloadCount indicates an expected call of DownloadCount.
func (mr *MockStorageMockRecorder) DownloadCount() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DownloadCount", reflect.TypeOf((*MockStorage)(nil).DownloadCount))
}
// ListDownload mocks base method.
func (m *MockStorage) ListDownload() ([]storage.Download, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ListDownload")
ret0, _ := ret[0].([]storage.Download)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// ListDownload indicates an expected call of ListDownload.
func (mr *MockStorageMockRecorder) ListDownload() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListDownload", reflect.TypeOf((*MockStorage)(nil).ListDownload))
}
// OpenDownload mocks base method.
func (m *MockStorage) OpenDownload() (io.ReadCloser, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "OpenDownload")
ret0, _ := ret[0].(io.ReadCloser)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// OpenDownload indicates an expected call of OpenDownload.
func (mr *MockStorageMockRecorder) OpenDownload() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OpenDownload", reflect.TypeOf((*MockStorage)(nil).OpenDownload))
}

View File

@ -1,307 +0,0 @@
/*
* Copyright 2022 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
//go:generate mockgen -destination mocks/storage_mock.go -source storage.go -package mocks
package storage
import (
"errors"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
"regexp"
"sort"
"sync"
"time"
"github.com/gocarina/gocsv"
logger "d7y.io/dragonfly/v2/internal/dflog"
pkgio "d7y.io/dragonfly/v2/pkg/io"
)
const (
// DownloadFilePrefix is prefix of download file name.
DownloadFilePrefix = "download"
// CSVFileExt is extension of file name.
CSVFileExt = "csv"
)
const (
// megabyte is the converted factor of MaxSize and bytes.
megabyte = 1024 * 1024
// backupTimeFormat is the timestamp format of backup filename.
backupTimeFormat = "2006-01-02T15-04-05.000"
)
// Storage is the interface used for storage.
type Storage interface {
// CreateDownload inserts the download into csv file.
CreateDownload(Download) error
// ListDownload returns all downloads in csv file.
ListDownload() ([]Download, error)
// DownloadCount returns the count of downloads.
DownloadCount() int64
// OpenDownload opens download files for read, it returns io.ReadCloser of download files.
OpenDownload() (io.ReadCloser, error)
// ClearDownload removes all download files.
ClearDownload() error
}
// storage provides storage function.
type storage struct {
baseDir string
maxSize int64
maxBackups int
bufferSize int
downloadMu *sync.RWMutex
downloadFilename string
downloadBuffer []Download
downloadCount int64
}
// New returns a new Storage instance.
func New(baseDir string, maxSize, maxBackups, bufferSize int) (Storage, error) {
s := &storage{
baseDir: baseDir,
maxSize: int64(maxSize * megabyte),
maxBackups: maxBackups,
bufferSize: bufferSize,
downloadMu: &sync.RWMutex{},
downloadFilename: filepath.Join(baseDir, fmt.Sprintf("%s.%s", DownloadFilePrefix, CSVFileExt)),
downloadBuffer: make([]Download, 0, bufferSize),
}
downloadFile, err := os.OpenFile(s.downloadFilename, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
if err != nil {
return nil, err
}
downloadFile.Close()
return s, nil
}
// CreateDownload inserts the download into csv file.
func (s *storage) CreateDownload(download Download) error {
s.downloadMu.Lock()
defer s.downloadMu.Unlock()
// Write without buffer.
if s.bufferSize == 0 {
if err := s.createDownload(download); err != nil {
return err
}
// Update download count.
s.downloadCount++
return nil
}
// Write downloads to file.
if len(s.downloadBuffer) >= s.bufferSize {
if err := s.createDownload(s.downloadBuffer...); err != nil {
return err
}
// Update download count.
s.downloadCount += int64(s.bufferSize)
// Keep allocated memory.
s.downloadBuffer = s.downloadBuffer[:0]
}
// Write downloads to buffer.
s.downloadBuffer = append(s.downloadBuffer, download)
return nil
}
// ListDownload returns all downloads in csv file.
func (s *storage) ListDownload() ([]Download, error) {
s.downloadMu.RLock()
defer s.downloadMu.RUnlock()
fileInfos, err := s.downloadBackups()
if err != nil {
return nil, err
}
var readers []io.Reader
var readClosers []io.ReadCloser
defer func() {
for _, readCloser := range readClosers {
if err := readCloser.Close(); err != nil {
logger.Error(err)
}
}
}()
for _, fileInfo := range fileInfos {
file, err := os.Open(filepath.Join(s.baseDir, fileInfo.Name()))
if err != nil {
return nil, err
}
readers = append(readers, file)
readClosers = append(readClosers, file)
}
var downloads []Download
if err := gocsv.UnmarshalWithoutHeaders(io.MultiReader(readers...), &downloads); err != nil {
return nil, err
}
return downloads, nil
}
// DownloadCount returns the count of downloads.
func (s *storage) DownloadCount() int64 {
return s.downloadCount
}
// OpenDownload opens download files for read, it returns io.ReadCloser of download files.
func (s *storage) OpenDownload() (io.ReadCloser, error) {
s.downloadMu.RLock()
defer s.downloadMu.RUnlock()
fileInfos, err := s.downloadBackups()
if err != nil {
return nil, err
}
var readClosers []io.ReadCloser
for _, fileInfo := range fileInfos {
file, err := os.Open(filepath.Join(s.baseDir, fileInfo.Name()))
if err != nil {
return nil, err
}
readClosers = append(readClosers, file)
}
return pkgio.MultiReadCloser(readClosers...), nil
}
// ClearDownload removes all downloads.
func (s *storage) ClearDownload() error {
s.downloadMu.Lock()
defer s.downloadMu.Unlock()
fileInfos, err := s.downloadBackups()
if err != nil {
return err
}
for _, fileInfo := range fileInfos {
filename := filepath.Join(s.baseDir, fileInfo.Name())
if err := os.Remove(filename); err != nil {
return err
}
}
return nil
}
// createDownload inserts the downloads into csv file.
func (s *storage) createDownload(downloads ...Download) (err error) {
file, err := s.openDownloadFile()
if err != nil {
return err
}
defer func() {
if cerr := file.Close(); cerr != nil {
err = errors.Join(err, cerr)
}
}()
return gocsv.MarshalWithoutHeaders(downloads, file)
}
// openDownloadFile opens the download file and removes download files that exceed the total size.
func (s *storage) openDownloadFile() (*os.File, error) {
fileInfo, err := os.Stat(s.downloadFilename)
if err != nil {
return nil, err
}
if s.maxSize <= fileInfo.Size() {
if err := os.Rename(s.downloadFilename, s.downloadBackupFilename()); err != nil {
return nil, err
}
}
fileInfos, err := s.downloadBackups()
if err != nil {
return nil, err
}
if s.maxBackups < len(fileInfos)+1 {
filename := filepath.Join(s.baseDir, fileInfos[0].Name())
if err := os.Remove(filename); err != nil {
return nil, err
}
}
file, err := os.OpenFile(s.downloadFilename, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0600)
if err != nil {
return nil, err
}
return file, nil
}
// downloadBackupFilename generates download file name of backup files.
func (s *storage) downloadBackupFilename() string {
timestamp := time.Now().Format(backupTimeFormat)
return filepath.Join(s.baseDir, fmt.Sprintf("%s_%s.%s", DownloadFilePrefix, timestamp, CSVFileExt))
}
// downloadBackups returns download backup file information.
func (s *storage) downloadBackups() ([]fs.FileInfo, error) {
fileInfos, err := os.ReadDir(s.baseDir)
if err != nil {
return nil, err
}
var backups []fs.FileInfo
regexp := regexp.MustCompile(DownloadFilePrefix)
for _, fileInfo := range fileInfos {
if !fileInfo.IsDir() && regexp.MatchString(fileInfo.Name()) {
info, _ := fileInfo.Info()
backups = append(backups, info)
}
}
if len(backups) <= 0 {
return nil, errors.New("download files backup does not exist")
}
sort.Slice(backups, func(i, j int) bool {
return backups[i].ModTime().Before(backups[j].ModTime())
})
return backups, nil
}

View File

@ -1,827 +0,0 @@
/*
* Copyright 2022 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package storage
import (
"fmt"
"os"
"path/filepath"
"reflect"
"regexp"
"testing"
"time"
"github.com/gocarina/gocsv"
"github.com/stretchr/testify/assert"
"d7y.io/dragonfly/v2/scheduler/config"
resource "d7y.io/dragonfly/v2/scheduler/resource/standard"
)
var (
mockTask = Task{
ID: "1",
URL: "example.com",
Type: "normal",
ContentLength: 2048,
TotalPieceCount: 1,
BackToSourceLimit: 10,
BackToSourcePeerCount: 2,
State: "Succeeded",
CreatedAt: time.Now().UnixNano(),
UpdatedAt: time.Now().UnixNano(),
}
mockHost = Host{
ID: "2",
Type: "normal",
Hostname: "localhost",
IP: "127.0.0.1",
Port: 8080,
DownloadPort: 8081,
OS: "linux",
Platform: "ubuntu",
PlatformFamily: "debian",
PlatformVersion: "1.0.0",
KernelVersion: "1.0.0",
ConcurrentUploadLimit: 100,
ConcurrentUploadCount: 40,
UploadCount: 20,
UploadFailedCount: 3,
CPU: resource.CPU{
LogicalCount: 24,
PhysicalCount: 12,
Percent: 0.8,
ProcessPercent: 0.4,
Times: resource.CPUTimes{
User: 100,
System: 101,
Idle: 102,
Nice: 103,
Iowait: 104,
Irq: 105,
Softirq: 106,
Steal: 107,
Guest: 108,
GuestNice: 109,
},
},
Memory: resource.Memory{
Total: 20,
Available: 19,
Used: 16,
UsedPercent: 0.7,
ProcessUsedPercent: 0.2,
Free: 15,
},
Network: resource.Network{
TCPConnectionCount: 400,
UploadTCPConnectionCount: 200,
Location: "china",
IDC: "e1",
DownloadRate: 100,
DownloadRateLimit: 200,
UploadRate: 100,
UploadRateLimit: 200,
},
Disk: resource.Disk{
Total: 100,
Free: 88,
Used: 56,
UsedPercent: 0.9,
InodesTotal: 200,
InodesUsed: 180,
InodesFree: 160,
InodesUsedPercent: 0.6,
},
Build: resource.Build{
GitVersion: "3.0.0",
GitCommit: "2bf4d5e",
GoVersion: "1.19",
Platform: "linux",
},
SchedulerClusterID: 1,
CreatedAt: time.Now().UnixNano(),
UpdatedAt: time.Now().UnixNano(),
}
mockPiece = Piece{
Length: 20,
Cost: 10,
CreatedAt: time.Now().UnixNano(),
}
mockPieces = append(make([]Piece, 9), mockPiece)
mockParent = Parent{
ID: "4",
Tag: "m",
Application: "db",
State: "Succeeded",
Cost: 1000,
UploadPieceCount: 10,
FinishedPieceCount: 10,
Host: mockHost,
Pieces: mockPieces,
CreatedAt: time.Now().UnixNano(),
UpdatedAt: time.Now().UnixNano(),
}
mockParents = append(make([]Parent, 19), mockParent)
mockDownload = Download{
ID: "5",
Tag: "d",
Application: "mq",
State: "Succeeded",
Error: Error{
Code: "unknow",
Message: "unknow",
},
Cost: 1000,
FinishedPieceCount: 10,
Task: mockTask,
Host: mockHost,
Parents: mockParents,
CreatedAt: time.Now().UnixNano(),
UpdatedAt: time.Now().UnixNano(),
}
)
func TestStorage_New(t *testing.T) {
tests := []struct {
name string
baseDir string
expect func(t *testing.T, s Storage, err error)
}{
{
name: "new storage",
baseDir: os.TempDir(),
expect: func(t *testing.T, s Storage, err error) {
assert := assert.New(t)
assert.NoError(err)
assert.Equal(reflect.TypeOf(s).Elem().Name(), "storage")
assert.Equal(s.(*storage).maxSize, int64(config.DefaultStorageMaxSize*megabyte))
assert.Equal(s.(*storage).maxBackups, config.DefaultStorageMaxBackups)
assert.Equal(s.(*storage).bufferSize, config.DefaultStorageBufferSize)
assert.Equal(cap(s.(*storage).downloadBuffer), config.DefaultStorageBufferSize)
assert.Equal(len(s.(*storage).downloadBuffer), 0)
assert.Equal(s.(*storage).downloadCount, int64(0))
if err := s.ClearDownload(); err != nil {
t.Fatal(err)
}
},
},
{
name: "new storage failed",
baseDir: "/foo",
expect: func(t *testing.T, s Storage, err error) {
assert := assert.New(t)
assert.Error(err)
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
s, err := New(tc.baseDir, config.DefaultStorageMaxSize, config.DefaultStorageMaxBackups, config.DefaultStorageBufferSize)
tc.expect(t, s, err)
})
}
}
func TestStorage_CreateDownload(t *testing.T) {
tests := []struct {
name string
baseDir string
bufferSize int
mock func(s Storage)
expect func(t *testing.T, s Storage, baseDir string)
}{
{
name: "create download",
baseDir: os.TempDir(),
bufferSize: 1,
mock: func(s Storage) {},
expect: func(t *testing.T, s Storage, baseDir string) {
assert := assert.New(t)
assert.NoError(s.CreateDownload(Download{}))
assert.Equal(s.(*storage).downloadCount, int64(0))
},
},
{
name: "create download without buffer",
baseDir: os.TempDir(),
bufferSize: 0,
mock: func(s Storage) {
},
expect: func(t *testing.T, s Storage, baseDir string) {
assert := assert.New(t)
assert.NoError(s.CreateDownload(Download{}))
assert.Equal(s.(*storage).downloadCount, int64(1))
downloads, err := s.ListDownload()
assert.NoError(err)
assert.Equal(len(downloads), 1)
},
},
{
name: "write download to file",
baseDir: os.TempDir(),
bufferSize: 1,
mock: func(s Storage) {
},
expect: func(t *testing.T, s Storage, baseDir string) {
assert := assert.New(t)
assert.NoError(s.CreateDownload(Download{}))
assert.NoError(s.CreateDownload(Download{}))
assert.Equal(s.(*storage).downloadCount, int64(1))
},
},
{
name: "open file failed",
baseDir: os.TempDir(),
bufferSize: 0,
mock: func(s Storage) {
s.(*storage).baseDir = "foo"
},
expect: func(t *testing.T, s Storage, baseDir string) {
assert := assert.New(t)
assert.Error(s.CreateDownload(Download{}))
s.(*storage).baseDir = baseDir
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
s, err := New(tc.baseDir, config.DefaultStorageMaxSize, config.DefaultStorageMaxBackups, tc.bufferSize)
if err != nil {
t.Fatal(err)
}
tc.mock(s)
tc.expect(t, s, tc.baseDir)
if err := s.ClearDownload(); err != nil {
t.Fatal(err)
}
})
}
}
func TestStorage_ListDownload(t *testing.T) {
tests := []struct {
name string
baseDir string
bufferSize int
download Download
mock func(t *testing.T, s Storage, baseDir string, download Download)
expect func(t *testing.T, s Storage, baseDir string, download Download)
}{
{
name: "empty csv file given",
baseDir: os.TempDir(),
bufferSize: config.DefaultStorageBufferSize,
mock: func(t *testing.T, s Storage, baseDir string, download Download) {},
expect: func(t *testing.T, s Storage, baseDir string, download Download) {
assert := assert.New(t)
_, err := s.ListDownload()
assert.Error(err)
},
},
{
name: "get file infos failed",
baseDir: os.TempDir(),
bufferSize: config.DefaultStorageBufferSize,
mock: func(t *testing.T, s Storage, baseDir string, download Download) {
s.(*storage).baseDir = "bae"
},
expect: func(t *testing.T, s Storage, baseDir string, download Download) {
assert := assert.New(t)
_, err := s.ListDownload()
assert.Error(err)
s.(*storage).baseDir = baseDir
},
},
{
name: "open file failed",
baseDir: os.TempDir(),
bufferSize: config.DefaultStorageBufferSize,
mock: func(t *testing.T, s Storage, baseDir string, download Download) {
file, err := os.OpenFile(filepath.Join(baseDir, "download_test.csv"), os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0300)
if err != nil {
t.Fatal(err)
}
file.Close()
},
expect: func(t *testing.T, s Storage, baseDir string, download Download) {
assert := assert.New(t)
_, err := s.ListDownload()
assert.Error(err)
},
},
{
name: "list downloads of a file",
baseDir: os.TempDir(),
bufferSize: 1,
download: mockDownload,
mock: func(t *testing.T, s Storage, baseDir string, download Download) {
if err := s.CreateDownload(download); err != nil {
t.Fatal(err)
}
},
expect: func(t *testing.T, s Storage, baseDir string, download Download) {
assert := assert.New(t)
_, err := s.ListDownload()
assert.Error(err)
if err := s.CreateDownload(download); err != nil {
t.Fatal(err)
}
downloads, err := s.ListDownload()
assert.NoError(err)
assert.Equal(len(downloads), 1)
assert.EqualValues(downloads[0].ID, download.ID)
assert.EqualValues(downloads[0].Tag, download.Tag)
assert.EqualValues(downloads[0].Application, download.Application)
assert.EqualValues(downloads[0].State, download.State)
assert.EqualValues(downloads[0].Error, download.Error)
assert.EqualValues(downloads[0].Cost, download.Cost)
assert.EqualValues(downloads[0].Task, download.Task)
assert.EqualValues(downloads[0].Host, download.Host)
assert.EqualValues(downloads[0].CreatedAt, download.CreatedAt)
assert.EqualValues(downloads[0].UpdatedAt, download.UpdatedAt)
},
},
{
name: "list downloads of multi files",
baseDir: os.TempDir(),
bufferSize: 1,
download: Download{},
mock: func(t *testing.T, s Storage, baseDir string, download Download) {
if err := s.CreateDownload(Download{ID: "2"}); err != nil {
t.Fatal(err)
}
if err := s.CreateDownload(Download{ID: "1"}); err != nil {
t.Fatal(err)
}
if err := s.CreateDownload(Download{ID: "3"}); err != nil {
t.Fatal(err)
}
},
expect: func(t *testing.T, s Storage, baseDir string, download Download) {
assert := assert.New(t)
downloads, err := s.ListDownload()
assert.NoError(err)
assert.Equal(len(downloads), 2)
assert.Equal(downloads[0].ID, "2")
assert.Equal(downloads[1].ID, "1")
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
s, err := New(tc.baseDir, config.DefaultStorageMaxSize, config.DefaultStorageMaxBackups, tc.bufferSize)
if err != nil {
t.Fatal(err)
}
tc.mock(t, s, tc.baseDir, tc.download)
tc.expect(t, s, tc.baseDir, tc.download)
if err := s.ClearDownload(); err != nil {
t.Fatal(err)
}
})
}
}
func TestStorage_DownloadCount(t *testing.T) {
tests := []struct {
name string
baseDir string
mock func(s Storage)
expect func(t *testing.T, s Storage)
}{
{
name: "get the count of downloads",
baseDir: os.TempDir(),
mock: func(s Storage) {
s.(*storage).downloadCount = 2
},
expect: func(t *testing.T, s Storage) {
assert := assert.New(t)
assert.Equal(int64(2), s.DownloadCount())
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
s, err := New(tc.baseDir, config.DefaultStorageMaxSize, config.DefaultStorageMaxBackups, config.DefaultStorageBufferSize)
if err != nil {
t.Fatal(err)
}
tc.mock(s)
tc.expect(t, s)
})
}
}
func TestStorage_OpenDownload(t *testing.T) {
tests := []struct {
name string
baseDir string
bufferSize int
download Download
mock func(t *testing.T, s Storage, baseDir string, download Download)
expect func(t *testing.T, s Storage, baseDir string, download Download)
}{
{
name: "open storage withempty csv file given",
baseDir: os.TempDir(),
bufferSize: config.DefaultStorageBufferSize,
mock: func(t *testing.T, s Storage, baseDir string, download Download) {},
expect: func(t *testing.T, s Storage, baseDir string, download Download) {
assert := assert.New(t)
_, err := s.OpenDownload()
assert.NoError(err)
},
},
{
name: "open file infos failed",
baseDir: os.TempDir(),
bufferSize: config.DefaultStorageBufferSize,
mock: func(t *testing.T, s Storage, baseDir string, download Download) {
s.(*storage).baseDir = "bas"
},
expect: func(t *testing.T, s Storage, baseDir string, download Download) {
assert := assert.New(t)
_, err := s.OpenDownload()
assert.Error(err)
s.(*storage).baseDir = baseDir
},
},
{
name: "open storage with downloads of a file",
baseDir: os.TempDir(),
bufferSize: 1,
download: mockDownload,
mock: func(t *testing.T, s Storage, baseDir string, download Download) {
if err := s.CreateDownload(download); err != nil {
t.Fatal(err)
}
},
expect: func(t *testing.T, s Storage, baseDir string, download Download) {
assert := assert.New(t)
_, err := s.OpenDownload()
assert.NoError(err)
if err := s.CreateDownload(download); err != nil {
t.Fatal(err)
}
readCloser, err := s.OpenDownload()
assert.NoError(err)
var downloads []Download
err = gocsv.UnmarshalWithoutHeaders(readCloser, &downloads)
assert.NoError(err)
assert.Equal(len(downloads), 1)
assert.EqualValues(downloads[0].ID, download.ID)
assert.EqualValues(downloads[0].Tag, download.Tag)
assert.EqualValues(downloads[0].Application, download.Application)
assert.EqualValues(downloads[0].State, download.State)
assert.EqualValues(downloads[0].Error, download.Error)
assert.EqualValues(downloads[0].Cost, download.Cost)
assert.EqualValues(downloads[0].Task, download.Task)
assert.EqualValues(downloads[0].Host, download.Host)
assert.EqualValues(downloads[0].CreatedAt, download.CreatedAt)
assert.EqualValues(downloads[0].UpdatedAt, download.UpdatedAt)
},
},
{
name: "open storage with downloads of multi files",
baseDir: os.TempDir(),
bufferSize: 1,
download: Download{},
mock: func(t *testing.T, s Storage, baseDir string, download Download) {
if err := s.CreateDownload(Download{ID: "2"}); err != nil {
t.Fatal(err)
}
if err := s.CreateDownload(Download{ID: "1"}); err != nil {
t.Fatal(err)
}
if err := s.CreateDownload(Download{ID: "3"}); err != nil {
t.Fatal(err)
}
},
expect: func(t *testing.T, s Storage, baseDir string, download Download) {
assert := assert.New(t)
readCloser, err := s.OpenDownload()
assert.NoError(err)
var downloads []Download
err = gocsv.UnmarshalWithoutHeaders(readCloser, &downloads)
assert.NoError(err)
assert.Equal(len(downloads), 2)
assert.Equal(downloads[0].ID, "2")
assert.Equal(downloads[1].ID, "1")
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
s, err := New(tc.baseDir, config.DefaultStorageMaxSize, config.DefaultStorageMaxBackups, tc.bufferSize)
if err != nil {
t.Fatal(err)
}
tc.mock(t, s, tc.baseDir, tc.download)
tc.expect(t, s, tc.baseDir, tc.download)
if err := s.ClearDownload(); err != nil {
t.Fatal(err)
}
})
}
}
func TestStorage_ClearDownload(t *testing.T) {
tests := []struct {
name string
baseDir string
mock func(s Storage)
expect func(t *testing.T, s Storage, baseDir string)
}{
{
name: "clear file",
baseDir: os.TempDir(),
mock: func(s Storage) {},
expect: func(t *testing.T, s Storage, baseDir string) {
assert := assert.New(t)
assert.NoError(s.ClearDownload())
fileInfos, err := os.ReadDir(filepath.Join(baseDir))
assert.NoError(err)
var backups []os.DirEntry
regexp := regexp.MustCompile(DownloadFilePrefix)
for _, fileInfo := range fileInfos {
if !fileInfo.IsDir() && regexp.MatchString(fileInfo.Name()) {
backups = append(backups, fileInfo)
}
}
assert.Equal(len(backups), 0)
},
},
{
name: "open file failed",
baseDir: os.TempDir(),
mock: func(s Storage) {
s.(*storage).baseDir = "baz"
},
expect: func(t *testing.T, s Storage, baseDir string) {
assert := assert.New(t)
assert.Error(s.ClearDownload())
s.(*storage).baseDir = baseDir
assert.NoError(s.ClearDownload())
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
s, err := New(tc.baseDir, config.DefaultStorageMaxSize, config.DefaultStorageMaxBackups, config.DefaultStorageBufferSize)
if err != nil {
t.Fatal(err)
}
tc.mock(s)
tc.expect(t, s, tc.baseDir)
})
}
}
func TestStorage_createDownload(t *testing.T) {
tests := []struct {
name string
baseDir string
mock func(s Storage)
expect func(t *testing.T, s Storage, baseDir string)
}{
{
name: "create download",
baseDir: os.TempDir(),
mock: func(s Storage) {},
expect: func(t *testing.T, s Storage, baseDir string) {
assert := assert.New(t)
assert.NoError(s.(*storage).createDownload(Download{}))
},
},
{
name: "open file failed",
baseDir: os.TempDir(),
mock: func(s Storage) {
s.(*storage).baseDir = "foo"
},
expect: func(t *testing.T, s Storage, baseDir string) {
assert := assert.New(t)
assert.Error(s.(*storage).createDownload(Download{}))
s.(*storage).baseDir = baseDir
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
s, err := New(tc.baseDir, config.DefaultStorageMaxSize, config.DefaultStorageMaxBackups, config.DefaultStorageBufferSize)
if err != nil {
t.Fatal(err)
}
tc.mock(s)
tc.expect(t, s, tc.baseDir)
if err := s.ClearDownload(); err != nil {
t.Fatal(err)
}
})
}
}
func TestStorage_openDownloadFile(t *testing.T) {
tests := []struct {
name string
baseDir string
maxSize int
maxBackups int
bufferSize int
mock func(t *testing.T, s Storage)
expect func(t *testing.T, s Storage, baseDir string)
}{
{
name: "open file failed",
baseDir: os.TempDir(),
maxSize: config.DefaultStorageMaxSize,
maxBackups: config.DefaultStorageMaxBackups,
bufferSize: config.DefaultStorageBufferSize,
mock: func(t *testing.T, s Storage) {
s.(*storage).baseDir = "bat"
},
expect: func(t *testing.T, s Storage, baseDir string) {
assert := assert.New(t)
_, err := s.(*storage).openDownloadFile()
assert.Error(err)
s.(*storage).baseDir = baseDir
},
},
{
name: "open new download file",
baseDir: os.TempDir(),
maxSize: 0,
maxBackups: config.DefaultStorageMaxBackups,
bufferSize: 1,
mock: func(t *testing.T, s Storage) {
if err := s.CreateDownload(Download{ID: "1"}); err != nil {
t.Fatal(err)
}
if err := s.CreateDownload(Download{ID: "2"}); err != nil {
t.Fatal(err)
}
},
expect: func(t *testing.T, s Storage, baseDir string) {
assert := assert.New(t)
file, err := s.(*storage).openDownloadFile()
assert.NoError(err)
assert.Equal(file.Name(), filepath.Join(baseDir, fmt.Sprintf("%s.%s", DownloadFilePrefix, CSVFileExt)))
file.Close()
},
},
{
name: "remove download file",
baseDir: os.TempDir(),
maxSize: 0,
maxBackups: 1,
bufferSize: 1,
mock: func(t *testing.T, s Storage) {
if err := s.CreateDownload(Download{ID: "1"}); err != nil {
t.Fatal(err)
}
},
expect: func(t *testing.T, s Storage, baseDir string) {
assert := assert.New(t)
file, err := s.(*storage).openDownloadFile()
assert.NoError(err)
assert.Equal(file.Name(), filepath.Join(baseDir, fmt.Sprintf("%s.%s", DownloadFilePrefix, CSVFileExt)))
file.Close()
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
s, err := New(tc.baseDir, tc.maxSize, tc.maxBackups, tc.bufferSize)
if err != nil {
t.Fatal(err)
}
tc.mock(t, s)
tc.expect(t, s, tc.baseDir)
if err := s.ClearDownload(); err != nil {
t.Fatal(err)
}
})
}
}
func TestStorage_downloadBackupFilename(t *testing.T) {
baseDir := os.TempDir()
s, err := New(baseDir, config.DefaultStorageMaxSize, config.DefaultStorageMaxBackups, config.DefaultStorageBufferSize)
if err != nil {
t.Fatal(err)
}
filename := s.(*storage).downloadBackupFilename()
regexp := regexp.MustCompile(fmt.Sprintf("%s_.*.%s$", DownloadFilePrefix, CSVFileExt))
assert := assert.New(t)
assert.True(regexp.MatchString(filename))
if err := s.ClearDownload(); err != nil {
t.Fatal(err)
}
}
func TestStorage_downloadBackups(t *testing.T) {
tests := []struct {
name string
baseDir string
mock func(t *testing.T, s Storage)
expect func(t *testing.T, s Storage, baseDir string)
}{
{
name: "open file failed",
baseDir: os.TempDir(),
mock: func(t *testing.T, s Storage) {
s.(*storage).baseDir = "bar"
},
expect: func(t *testing.T, s Storage, baseDir string) {
assert := assert.New(t)
_, err := s.(*storage).downloadBackups()
assert.Error(err)
s.(*storage).baseDir = baseDir
if err := s.ClearDownload(); err != nil {
t.Fatal(err)
}
},
},
{
name: "not found download file",
baseDir: os.TempDir(),
mock: func(t *testing.T, s Storage) {},
expect: func(t *testing.T, s Storage, baseDir string) {
assert := assert.New(t)
if err := s.ClearDownload(); err != nil {
t.Fatal(err)
}
_, err := s.(*storage).downloadBackups()
assert.Error(err)
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
s, err := New(tc.baseDir, config.DefaultStorageMaxSize, config.DefaultStorageMaxBackups, config.DefaultStorageBufferSize)
if err != nil {
t.Fatal(err)
}
tc.mock(t, s)
tc.expect(t, s, tc.baseDir)
})
}
}

View File

@ -1,225 +0,0 @@
/*
* Copyright 2022 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package storage
import (
"time"
resource "d7y.io/dragonfly/v2/scheduler/resource/standard"
)
// Task contains content for task.
type Task struct {
// ID is task id.
ID string `csv:"id"`
// URL is task download url.
URL string `csv:"url"`
// Type is task type.
Type string `csv:"type"`
// ContentLength is task total content length.
ContentLength int64 `csv:"contentLength"`
// TotalPieceCount is total piece count.
TotalPieceCount int32 `csv:"totalPieceCount"`
// BackToSourceLimit is back-to-source limit.
BackToSourceLimit int32 `csv:"backToSourceLimit"`
// BackToSourcePeerCount is back-to-source peer count.
BackToSourcePeerCount int32 `csv:"backToSourcePeerCount"`
// State is the download state of the task.
State string `csv:"state"`
// CreatedAt is peer create nanosecond time.
CreatedAt int64 `csv:"createdAt"`
// UpdatedAt is peer update nanosecond time.
UpdatedAt int64 `csv:"updatedAt"`
}
// Host contains content for host.
type Host struct {
// ID is host id.
ID string `csv:"id"`
// Type is host type.
Type string `csv:"type"`
// Hostname is host name.
Hostname string `csv:"hostname"`
// IP is host ip.
IP string `csv:"ip"`
// Port is grpc service port.
Port int32 `csv:"port"`
// DownloadPort is piece downloading port.
DownloadPort int32 `csv:"downloadPort"`
// Host OS.
OS string `csv:"os"`
// Host platform.
Platform string `csv:"platform"`
// Host platform family.
PlatformFamily string `csv:"platformFamily"`
// Host platform version.
PlatformVersion string `csv:"platformVersion"`
// Host kernel version.
KernelVersion string `csv:"kernelVersion"`
// ConcurrentUploadLimit is concurrent upload limit count.
ConcurrentUploadLimit int32 `csv:"concurrentUploadLimit"`
// ConcurrentUploadCount is concurrent upload count.
ConcurrentUploadCount int32 `csv:"concurrentUploadCount"`
// UploadCount is total upload count.
UploadCount int64 `csv:"uploadCount"`
// UploadFailedCount is upload failed count.
UploadFailedCount int64 `csv:"uploadFailedCount"`
// CPU Stat.
CPU resource.CPU `csv:"cpu"`
// Memory Stat.
Memory resource.Memory `csv:"memory"`
// Network Stat.
Network resource.Network `csv:"network"`
// Disk Stat.
Disk resource.Disk `csv:"disk"`
// Build information.
Build resource.Build `csv:"build"`
// SchedulerClusterID is scheduler cluster id.
SchedulerClusterID int64 `csv:"schedulerClusterId"`
// CreatedAt is peer create nanosecond time.
CreatedAt int64 `csv:"createdAt"`
// UpdatedAt is peer update nanosecond time.
UpdatedAt int64 `csv:"updatedAt"`
}
// Piece contains content for piece.
type Piece struct {
// Length is piece length.
Length int64 `csv:"length"`
// Cost is the cost time for downloading piece.
Cost int64 `csv:"cost"`
// CreatedAt is piece create time.
CreatedAt int64 `csv:"createdAt"`
}
// Parent contains content for parent.
type Parent struct {
// ID is peer id.
ID string `csv:"id"`
// Tag is peer tag.
Tag string `csv:"tag"`
// Application is peer application.
Application string `csv:"application"`
// State is the download state of the peer.
State string `csv:"state"`
// Cost is the task download duration of nanosecond.
Cost int64 `csv:"cost"`
// UploadPieceCount is upload piece count.
UploadPieceCount int32 `csv:"uploadPieceCount"`
// FinishedPieceCount is finished piece count.
FinishedPieceCount int32 `csv:"finishedPieceCount"`
// Host is peer host.
Host Host `csv:"host"`
// Pieces is downloaded pieces from parent host.
Pieces []Piece `csv:"pieces" csv[]:"10"`
// CreatedAt is peer create nanosecond time.
CreatedAt int64 `csv:"createdAt"`
// UpdatedAt is peer update nanosecond time.
UpdatedAt int64 `csv:"updatedAt"`
}
// Error contains content for error.
type Error struct {
time.Duration
// Code is the code of error.
Code string `csv:"code"`
// Message is the message of error.
Message string `csv:"message"`
}
// Download contains content for download.
type Download struct {
// ID is peer id.
ID string `csv:"id"`
// Tag is peer tag.
Tag string `csv:"tag"`
// Application is peer application.
Application string `csv:"application"`
// State is the download state of the peer.
State string `csv:"state"`
// Error is the details of error.
Error Error `csv:"error"`
// Cost is the task download duration of nanosecond.
Cost int64 `csv:"cost"`
// FinishedPieceCount is finished piece count.
FinishedPieceCount int32 `csv:"finishedPieceCount"`
// Task is peer task.
Task Task `csv:"task"`
// Host is peer host.
Host Host `csv:"host"`
// Parents is peer parents.
Parents []Parent `csv:"parents" csv[]:"20"`
// CreatedAt is peer create nanosecond time.
CreatedAt int64 `csv:"createdAt"`
// UpdatedAt is peer update nanosecond time.
UpdatedAt int64 `csv:"updatedAt"`
}