feat: implement StatPersistentCachePeerRequest and StatPersistentCacheTaskRequest for persistent cache (#3603)

Gaius authored 2024-10-23 10:19:39 +08:00, committed by GitHub
parent eb4e101859
commit 770d6c9c52
GPG Key ID: B5690EEEBB952194
10 changed files with 598 additions and 443 deletions

View File

@@ -212,6 +212,9 @@ type Build struct {
// Golang version.
GoVersion string `csv:"goVersion"`
+// Rust version.
+RustVersion string `csv:"rustVersion"`
// Build platform.
Platform string `csv:"platform"`
}
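The new RustVersion column is picked up automatically wherever the Build record is serialized through its csv tags. A minimal sketch of that behaviour, assuming a csv-tag-aware marshaler such as github.com/gocarina/gocsv (the actual storage wiring is outside this hunk, and the struct below mirrors only the fields visible above):

package main

import (
	"fmt"

	"github.com/gocarina/gocsv"
)

// Build mirrors only the fields visible in the hunk above.
type Build struct {
	GoVersion   string `csv:"goVersion"`
	RustVersion string `csv:"rustVersion"`
	Platform    string `csv:"platform"`
}

func main() {
	records := []*Build{{GoVersion: "go1.23.0", RustVersion: "1.80.0", Platform: "linux"}}

	// The header row is derived from the csv tags, so the new rustVersion
	// column appears without any further changes to the writer.
	out, err := gocsv.MarshalString(&records)
	if err != nil {
		panic(err)
	}
	fmt.Print(out)
}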

View File

@@ -62,6 +62,9 @@ type Peer struct {
// ID is persistent cache peer id.
ID string
+// Persistent is whether the peer is persistent.
+Persistent bool
// Pieces is finished pieces bitset.
FinishedPieces *bitset.BitSet
@@ -91,7 +94,7 @@ type Peer struct {
}
// New persistent cache peer instance.
-func NewPeer(id, state string, finishedPieces *bitset.BitSet, blockParents []string, task *Task, host *Host,
+func NewPeer(id, state string, persistent bool, finishedPieces *bitset.BitSet, blockParents []string, task *Task, host *Host,
cost time.Duration, createdAt, updatedAt time.Time, log *logger.SugaredLoggerOnWith) *Peer {
p := &Peer{
ID: id,
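A caller-side sketch of the widened constructor, assuming this hunk lives in the persistentcache package and that the bitset and dflog import paths below match the rest of the repository; the state name, sample values, and the helper itself are illustrative, not taken from the commit:

package example

import (
	"time"

	"github.com/bits-and-blooms/bitset"

	logger "d7y.io/dragonfly/v2/internal/dflog"
	"d7y.io/dragonfly/v2/scheduler/resource/persistentcache"
)

// buildPersistentPeer is a hypothetical helper that shows the new call shape;
// persistent is the flag introduced by this commit.
func buildPersistentPeer(task *persistentcache.Task, host *persistentcache.Host, log *logger.SugaredLoggerOnWith) *persistentcache.Peer {
	finishedPieces := bitset.New(64)

	return persistentcache.NewPeer(
		"peer-id",
		"Succeeded", // state; the exact FSM state names are not shown in this hunk
		true,        // persistent
		finishedPieces,
		[]string{}, // blockParents
		task, host,
		30*time.Second, // cost
		time.Now(), time.Now(),
		log,
	)
}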

View File

@@ -77,6 +77,12 @@ func (p *peerManager) Load(ctx context.Context, peerID string) (*Peer, bool) {
return nil, false
}
+persistent, err := strconv.ParseBool(rawPeer["persistent"])
+if err != nil {
+log.Errorf("parsing persistent failed: %v", err)
+return nil, false
+}
finishedPieces := &bitset.BitSet{}
if err := finishedPieces.UnmarshalBinary([]byte(rawPeer["finished_pieces"])); err != nil {
log.Errorf("unmarshal finished pieces failed: %v", err)
@@ -123,6 +129,7 @@ func (p *peerManager) Load(ctx context.Context, peerID string) (*Peer, bool) {
return NewPeer(
rawPeer["id"],
rawPeer["state"],
+persistent,
finishedPieces,
blockParents,
task,
@@ -153,6 +160,7 @@ func (p *peerManager) Store(ctx context.Context, peer *Peer) error {
pipe.HSet(ctx,
pkgredis.MakePersistentCachePeerKeyInScheduler(p.config.Manager.SchedulerClusterID, peer.ID),
"id", peer.ID,
+"persistent", peer.Persistent,
"finished_pieces", finishedPieces,
"state", peer.FSM.Current(),
"block_parents", blockParents,

View File

@@ -21,7 +21,8 @@ import (
"d7y.io/dragonfly/v2/pkg/rpc/scheduler/server"
"d7y.io/dragonfly/v2/scheduler/config"
-resource "d7y.io/dragonfly/v2/scheduler/resource/standard"
+"d7y.io/dragonfly/v2/scheduler/resource/persistentcache"
+"d7y.io/dragonfly/v2/scheduler/resource/standard"
"d7y.io/dragonfly/v2/scheduler/scheduling"
"d7y.io/dragonfly/v2/scheduler/storage"
)
@@ -29,7 +30,8 @@ import (
// New returns a new scheduler server from the given options.
func New(
cfg *config.Config,
-resource resource.Resource,
+resource standard.Resource,
+persistentCacheResource persistentcache.Resource,
scheduling scheduling.Scheduling,
dynconfig config.DynconfigInterface,
storage storage.Storage,
@@ -37,6 +39,6 @@ func New(
) *grpc.Server {
return server.New(
newSchedulerServerV1(cfg, resource, scheduling, dynconfig, storage),
-newSchedulerServerV2(cfg, resource, scheduling, dynconfig, storage),
+newSchedulerServerV2(cfg, resource, persistentCacheResource, scheduling, dynconfig, storage),
opts...)
}

View File

@@ -26,7 +26,8 @@ import (
"d7y.io/dragonfly/v2/scheduler/config"
configmocks "d7y.io/dragonfly/v2/scheduler/config/mocks"
-resource "d7y.io/dragonfly/v2/scheduler/resource/standard"
+"d7y.io/dragonfly/v2/scheduler/resource/persistentcache"
+"d7y.io/dragonfly/v2/scheduler/resource/standard"
"d7y.io/dragonfly/v2/scheduler/scheduling/mocks"
storagemocks "d7y.io/dragonfly/v2/scheduler/storage/mocks"
)
@@ -59,11 +60,12 @@ func TestRPCServer_New(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
scheduling := mocks.NewMockScheduling(ctl)
-res := resource.NewMockResource(ctl)
+resource := standard.NewMockResource(ctl)
+persistentCacheResource := persistentcache.NewMockResource(ctl)
dynconfig := configmocks.NewMockDynconfigInterface(ctl)
storage := storagemocks.NewMockStorage(ctl)
-svr := New(&config.Config{Scheduler: mockSchedulerConfig}, res, scheduling, dynconfig, storage)
+svr := New(&config.Config{Scheduler: mockSchedulerConfig}, resource, persistentCacheResource, scheduling, dynconfig, storage)
tc.expect(t, svr)
})
}

View File

@@ -26,13 +26,13 @@ import (
"d7y.io/dragonfly/v2/scheduler/config"
"d7y.io/dragonfly/v2/scheduler/metrics"
-resource "d7y.io/dragonfly/v2/scheduler/resource/standard"
+"d7y.io/dragonfly/v2/scheduler/resource/persistentcache"
+"d7y.io/dragonfly/v2/scheduler/resource/standard"
"d7y.io/dragonfly/v2/scheduler/scheduling"
"d7y.io/dragonfly/v2/scheduler/service"
"d7y.io/dragonfly/v2/scheduler/storage"
)
-// TODO Implement v2 version of the rpc server apis.
// schedulerServerV2 is v2 version of the scheduler grpc server.
type schedulerServerV2 struct {
// Service interface.
@@ -42,12 +42,13 @@ type schedulerServerV2 struct {
// newSchedulerServerV2 returns v2 version of the scheduler server.
func newSchedulerServerV2(
cfg *config.Config,
-resource resource.Resource,
+resource standard.Resource,
+persistentCacheResource persistentcache.Resource,
scheduling scheduling.Scheduling,
dynconfig config.DynconfigInterface,
storage storage.Storage,
) schedulerv2.SchedulerServer {
-return &schedulerServerV2{service.NewV2(cfg, resource, scheduling, dynconfig, storage)}
+return &schedulerServerV2{service.NewV2(cfg, resource, persistentCacheResource, scheduling, dynconfig, storage)}
}
// AnnouncePeer announces peer to scheduler.

View File

@@ -42,8 +42,8 @@ import (
"d7y.io/dragonfly/v2/scheduler/config"
"d7y.io/dragonfly/v2/scheduler/job"
"d7y.io/dragonfly/v2/scheduler/metrics"
-persistentcache "d7y.io/dragonfly/v2/scheduler/resource/persistentcache"
-standard "d7y.io/dragonfly/v2/scheduler/resource/standard"
+"d7y.io/dragonfly/v2/scheduler/resource/persistentcache"
+"d7y.io/dragonfly/v2/scheduler/resource/standard"
"d7y.io/dragonfly/v2/scheduler/rpcserver"
"d7y.io/dragonfly/v2/scheduler/scheduling"
"d7y.io/dragonfly/v2/scheduler/storage"
@@ -212,7 +212,7 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err
schedulerServerOptions = append(schedulerServerOptions, grpc.Creds(rpc.NewInsecureCredentials()))
}
-svr := rpcserver.New(cfg, resource, scheduling, dynconfig, s.storage, schedulerServerOptions...)
+svr := rpcserver.New(cfg, resource, s.persistentCacheResource, scheduling, dynconfig, s.storage, schedulerServerOptions...)
s.grpcServer = svr
// Initialize metrics.

View File

@@ -807,7 +807,7 @@ func (v *V1) triggerSeedPeerTask(ctx context.Context, rg *http.Range, task *reso
}
// storeTask stores a new task or reuses a previous task.
-func (v *V1) storeTask(ctx context.Context, req *schedulerv1.PeerTaskRequest, typ commonv2.TaskType) *resource.Task {
+func (v *V1) storeTask(_ context.Context, req *schedulerv1.PeerTaskRequest, typ commonv2.TaskType) *resource.Task {
filteredQueryParams := strings.Split(req.UrlMeta.GetFilter(), idgen.FilteredQueryParamsSeparator)
task, loaded := v.resource.TaskManager().Load(req.GetTaskId())
@@ -834,7 +834,7 @@ func (v *V1) storeTask(ctx context.Context, req *schedulerv1.PeerTaskRequest, ty
}
// storeHost stores a new host or reuses a previous host.
-func (v *V1) storeHost(ctx context.Context, peerHost *schedulerv1.PeerHost) *resource.Host {
+func (v *V1) storeHost(_ context.Context, peerHost *schedulerv1.PeerHost) *resource.Host {
host, loaded := v.resource.HostManager().Load(peerHost.Id)
if !loaded {
options := []resource.HostOption{resource.WithNetwork(resource.Network{
@@ -866,7 +866,7 @@ func (v *V1) storeHost(ctx context.Context, peerHost *schedulerv1.PeerHost) *res
}
// storePeer stores a new peer or reuses a previous peer.
-func (v *V1) storePeer(ctx context.Context, id string, priority commonv1.Priority, rg string, task *resource.Task, host *resource.Host) *resource.Peer {
+func (v *V1) storePeer(_ context.Context, id string, priority commonv1.Priority, rg string, task *resource.Task, host *resource.Host) *resource.Peer {
peer, loaded := v.resource.PeerManager().Load(id)
if !loaded {
options := []resource.PeerOption{}
@@ -1057,7 +1057,7 @@ func (v *V1) handleBeginOfPiece(ctx context.Context, peer *resource.Peer) {
func (v *V1) handleEndOfPiece(ctx context.Context, peer *resource.Peer) {}
// handlePieceSuccess handles successful piece.
-func (v *V1) handlePieceSuccess(ctx context.Context, peer *resource.Peer, pieceResult *schedulerv1.PieceResult) {
+func (v *V1) handlePieceSuccess(_ context.Context, peer *resource.Peer, pieceResult *schedulerv1.PieceResult) {
// Distinguish traffic type.
trafficType := commonv2.TrafficType_REMOTE_PEER
if resource.IsPieceBackToSource(pieceResult.DstPid) {

View File

@@ -39,7 +39,8 @@ import (
"d7y.io/dragonfly/v2/pkg/types"
"d7y.io/dragonfly/v2/scheduler/config"
"d7y.io/dragonfly/v2/scheduler/metrics"
-resource "d7y.io/dragonfly/v2/scheduler/resource/standard"
+"d7y.io/dragonfly/v2/scheduler/resource/persistentcache"
+"d7y.io/dragonfly/v2/scheduler/resource/standard"
"d7y.io/dragonfly/v2/scheduler/scheduling"
"d7y.io/dragonfly/v2/scheduler/storage"
)
@@ -47,7 +48,10 @@ import (
// V2 is the interface for v2 version of the service.
type V2 struct {
// Resource interface.
-resource resource.Resource
+resource standard.Resource
+// Persistent cache resource interface.
+persistentCacheResource persistentcache.Resource
// Scheduling interface.
scheduling scheduling.Scheduling
@@ -65,17 +69,19 @@ type V2 struct {
// New v2 version of service instance.
func NewV2(
cfg *config.Config,
-resource resource.Resource,
+resource standard.Resource,
+persistentCacheResource persistentcache.Resource,
scheduling scheduling.Scheduling,
dynconfig config.DynconfigInterface,
storage storage.Storage,
) *V2 {
return &V2{
resource: resource,
+persistentCacheResource: persistentCacheResource,
scheduling: scheduling,
config: cfg,
dynconfig: dynconfig,
storage: storage,
}
}
@@ -227,7 +233,7 @@ func (v *V2) StatPeer(ctx context.Context, req *schedulerv2.StatPeerRequest) (*c
// Set pieces to response.
peer.Pieces.Range(func(key, value any) bool {
-piece, ok := value.(*resource.Piece)
+piece, ok := value.(*standard.Piece)
if !ok {
peer.Log.Errorf("invalid piece %s %#v", key, value)
return true
@@ -278,7 +284,7 @@ func (v *V2) StatPeer(ctx context.Context, req *schedulerv2.StatPeerRequest) (*c
// Set pieces to task response.
peer.Task.Pieces.Range(func(key, value any) bool {
-piece, ok := value.(*resource.Piece)
+piece, ok := value.(*standard.Piece)
if !ok {
peer.Task.Log.Errorf("invalid piece %s %#v", key, value)
return true
@@ -384,7 +390,7 @@ func (v *V2) DeletePeer(ctx context.Context, req *schedulerv2.DeletePeerRequest)
return status.Error(codes.NotFound, msg)
}
-if err := peer.FSM.Event(ctx, resource.PeerEventLeave); err != nil {
+if err := peer.FSM.Event(ctx, standard.PeerEventLeave); err != nil {
msg := fmt.Sprintf("peer fsm event failed: %s", err.Error())
peer.Log.Error(msg)
return status.Error(codes.FailedPrecondition, msg)
@@ -431,7 +437,7 @@ func (v *V2) StatTask(ctx context.Context, req *schedulerv2.StatTaskRequest) (*c
// Set pieces to response.
task.Pieces.Range(func(key, value any) bool {
-piece, ok := value.(*resource.Piece)
+piece, ok := value.(*standard.Piece)
if !ok {
task.Log.Errorf("invalid piece %s %#v", key, value)
return true
@@ -471,14 +477,14 @@ func (v *V2) DeleteTask(ctx context.Context, req *schedulerv2.DeleteTaskRequest)
}
host.Peers.Range(func(key, value any) bool {
-peer, ok := value.(*resource.Peer)
+peer, ok := value.(*standard.Peer)
if !ok {
host.Log.Errorf("invalid peer %s %#v", key, value)
return true
}
if peer.Task.ID == req.GetTaskId() {
-if err := peer.FSM.Event(ctx, resource.PeerEventLeave); err != nil {
+if err := peer.FSM.Event(ctx, standard.PeerEventLeave); err != nil {
msg := fmt.Sprintf("peer fsm event failed: %s", err.Error())
peer.Log.Error(msg)
return true
@@ -510,26 +516,26 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ
host, loaded := v.resource.HostManager().Load(req.Host.GetId())
if !loaded {
-options := []resource.HostOption{
-resource.WithDisableShared(req.Host.GetDisableShared()),
-resource.WithOS(req.Host.GetOs()),
-resource.WithPlatform(req.Host.GetPlatform()),
-resource.WithPlatformFamily(req.Host.GetPlatformFamily()),
-resource.WithPlatformVersion(req.Host.GetPlatformVersion()),
-resource.WithKernelVersion(req.Host.GetKernelVersion()),
+options := []standard.HostOption{
+standard.WithDisableShared(req.Host.GetDisableShared()),
+standard.WithOS(req.Host.GetOs()),
+standard.WithPlatform(req.Host.GetPlatform()),
+standard.WithPlatformFamily(req.Host.GetPlatformFamily()),
+standard.WithPlatformVersion(req.Host.GetPlatformVersion()),
+standard.WithKernelVersion(req.Host.GetKernelVersion()),
}
if concurrentUploadLimit > 0 {
-options = append(options, resource.WithConcurrentUploadLimit(concurrentUploadLimit))
+options = append(options, standard.WithConcurrentUploadLimit(concurrentUploadLimit))
}
if req.Host.GetCpu() != nil {
-options = append(options, resource.WithCPU(resource.CPU{
+options = append(options, standard.WithCPU(standard.CPU{
LogicalCount: req.Host.Cpu.GetLogicalCount(),
PhysicalCount: req.Host.Cpu.GetPhysicalCount(),
Percent: req.Host.Cpu.GetPercent(),
ProcessPercent: req.Host.Cpu.GetProcessPercent(),
-Times: resource.CPUTimes{
+Times: standard.CPUTimes{
User: req.Host.Cpu.Times.GetUser(),
System: req.Host.Cpu.Times.GetSystem(),
Idle: req.Host.Cpu.Times.GetIdle(),
@@ -545,7 +551,7 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ
}
if req.Host.GetMemory() != nil {
-options = append(options, resource.WithMemory(resource.Memory{
+options = append(options, standard.WithMemory(standard.Memory{
Total: req.Host.Memory.GetTotal(),
Available: req.Host.Memory.GetAvailable(),
Used: req.Host.Memory.GetUsed(),
@@ -556,7 +562,7 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ
}
if req.Host.GetNetwork() != nil {
-options = append(options, resource.WithNetwork(resource.Network{
+options = append(options, standard.WithNetwork(standard.Network{
TCPConnectionCount: req.Host.Network.GetTcpConnectionCount(),
UploadTCPConnectionCount: req.Host.Network.GetUploadTcpConnectionCount(),
Location: req.Host.Network.GetLocation(),
@@ -569,7 +575,7 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ
}
if req.Host.GetDisk() != nil {
-options = append(options, resource.WithDisk(resource.Disk{
+options = append(options, standard.WithDisk(standard.Disk{
Total: req.Host.Disk.GetTotal(),
Free: req.Host.Disk.GetFree(),
Used: req.Host.Disk.GetUsed(),
@@ -582,7 +588,7 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ
}
if req.Host.GetBuild() != nil {
-options = append(options, resource.WithBuild(resource.Build{
+options = append(options, standard.WithBuild(standard.Build{
GitVersion: req.Host.Build.GetGitVersion(),
GitCommit: req.Host.Build.GetGitCommit(),
GoVersion: req.Host.Build.GetGoVersion(),
@@ -591,14 +597,14 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ
}
if req.Host.GetSchedulerClusterId() != 0 {
-options = append(options, resource.WithSchedulerClusterID(uint64(v.config.Manager.SchedulerClusterID)))
+options = append(options, standard.WithSchedulerClusterID(uint64(v.config.Manager.SchedulerClusterID)))
}
if req.GetInterval() != nil {
-options = append(options, resource.WithAnnounceInterval(req.GetInterval().AsDuration()))
+options = append(options, standard.WithAnnounceInterval(req.GetInterval().AsDuration()))
}
-host = resource.NewHost(
+host = standard.NewHost(
req.Host.GetId(), req.Host.GetIp(), req.Host.GetHostname(),
req.Host.GetPort(), req.Host.GetDownloadPort(), types.HostType(req.Host.GetType()),
options...,
@@ -626,12 +632,12 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ
}
if req.Host.GetCpu() != nil {
-host.CPU = resource.CPU{
+host.CPU = standard.CPU{
LogicalCount: req.Host.Cpu.GetLogicalCount(),
PhysicalCount: req.Host.Cpu.GetPhysicalCount(),
Percent: req.Host.Cpu.GetPercent(),
ProcessPercent: req.Host.Cpu.GetProcessPercent(),
-Times: resource.CPUTimes{
+Times: standard.CPUTimes{
User: req.Host.Cpu.Times.GetUser(),
System: req.Host.Cpu.Times.GetSystem(),
Idle: req.Host.Cpu.Times.GetIdle(),
@@ -647,7 +653,7 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ
}
if req.Host.GetMemory() != nil {
-host.Memory = resource.Memory{
+host.Memory = standard.Memory{
Total: req.Host.Memory.GetTotal(),
Available: req.Host.Memory.GetAvailable(),
Used: req.Host.Memory.GetUsed(),
@@ -658,7 +664,7 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ
}
if req.Host.GetNetwork() != nil {
-host.Network = resource.Network{
+host.Network = standard.Network{
TCPConnectionCount: req.Host.Network.GetTcpConnectionCount(),
UploadTCPConnectionCount: req.Host.Network.GetUploadTcpConnectionCount(),
Location: req.Host.Network.GetLocation(),
@@ -671,7 +677,7 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ
}
if req.Host.GetDisk() != nil {
-host.Disk = resource.Disk{
+host.Disk = standard.Disk{
Total: req.Host.Disk.GetTotal(),
Free: req.Host.Disk.GetFree(),
Used: req.Host.Disk.GetUsed(),
@@ -684,7 +690,7 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ
}
if req.Host.GetBuild() != nil {
-host.Build = resource.Build{
+host.Build = standard.Build{
GitVersion: req.Host.Build.GetGitVersion(),
GitCommit: req.Host.Build.GetGitCommit(),
GoVersion: req.Host.Build.GetGoVersion(),
@@ -703,7 +709,7 @@ func (v *V2) AnnounceHost(ctx context.Context, req *schedulerv2.AnnounceHostRequ
func (v *V2) ListHosts(ctx context.Context) (*schedulerv2.ListHostsResponse, error) {
hosts := []*commonv2.Host{}
v.resource.HostManager().Range(func(_ any, value any) bool {
-host, ok := value.(*resource.Host)
+host, ok := value.(*standard.Host)
if !ok {
// Continue to next host.
logger.Warnf("invalid host %#v", value)
@@ -823,10 +829,10 @@ func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.S
peer.NeedBackToSource.Store(true)
// If task is pending, failed, leave, or succeeded and has no available peer,
// scheduler trigger seed peer download back-to-source.
-case task.FSM.Is(resource.TaskStatePending) ||
-task.FSM.Is(resource.TaskStateFailed) ||
-task.FSM.Is(resource.TaskStateLeave) ||
-task.FSM.Is(resource.TaskStateSucceeded) &&
+case task.FSM.Is(standard.TaskStatePending) ||
+task.FSM.Is(standard.TaskStateFailed) ||
+task.FSM.Is(standard.TaskStateLeave) ||
+task.FSM.Is(standard.TaskStateSucceeded) &&
!task.HasAvailablePeer(blocklist):
// If HostType is normal, trigger seed peer download back-to-source.
@@ -853,8 +859,8 @@
}
// Handle task with peer register request.
-if !peer.Task.FSM.Is(resource.TaskStateRunning) {
-if err := peer.Task.FSM.Event(ctx, resource.TaskEventDownload); err != nil {
+if !peer.Task.FSM.Is(standard.TaskStateRunning) {
+if err := peer.Task.FSM.Event(ctx, standard.TaskEventDownload); err != nil {
// Collect RegisterPeerFailureCount metrics.
metrics.RegisterPeerFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
peer.Host.Type.Name()).Inc()
@@ -875,7 +881,7 @@ func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.S
return status.Error(codes.NotFound, "AnnouncePeerStream not found")
}
-if err := peer.FSM.Event(ctx, resource.PeerEventRegisterEmpty); err != nil {
+if err := peer.FSM.Event(ctx, standard.PeerEventRegisterEmpty); err != nil {
return status.Errorf(codes.Internal, err.Error())
}
@@ -891,7 +897,7 @@ func (v *V2) handleRegisterPeerRequest(ctx context.Context, stream schedulerv2.S
return nil
case commonv2.SizeScope_NORMAL, commonv2.SizeScope_TINY, commonv2.SizeScope_SMALL, commonv2.SizeScope_UNKNOW:
peer.Log.Info("scheduling as SizeScope_NORMAL")
-if err := peer.FSM.Event(ctx, resource.PeerEventRegisterNormal); err != nil {
+if err := peer.FSM.Event(ctx, standard.PeerEventRegisterNormal); err != nil {
return status.Error(codes.Internal, err.Error())
}
@@ -928,8 +934,8 @@
peer.Host.Type.Name()).Inc()
// Handle peer with peer started request.
-if !peer.FSM.Is(resource.PeerStateRunning) {
-if err := peer.FSM.Event(ctx, resource.PeerEventDownload); err != nil {
+if !peer.FSM.Is(standard.PeerStateRunning) {
+if err := peer.FSM.Event(ctx, standard.PeerEventDownload); err != nil {
// Collect DownloadPeerStartedFailureCount metrics.
metrics.DownloadPeerStartedFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
peer.Host.Type.Name()).Inc()
@@ -953,8 +959,8 @@ func (v *V2) handleDownloadPeerBackToSourceStartedRequest(ctx context.Context, p
peer.Host.Type.Name()).Inc()
// Handle peer with peer back-to-source started request.
-if !peer.FSM.Is(resource.PeerStateRunning) {
-if err := peer.FSM.Event(ctx, resource.PeerEventDownloadBackToSource); err != nil {
+if !peer.FSM.Is(standard.PeerStateRunning) {
+if err := peer.FSM.Event(ctx, standard.PeerEventDownloadBackToSource); err != nil {
// Collect DownloadPeerBackToSourceStartedFailureCount metrics.
metrics.DownloadPeerBackToSourceStartedFailureCount.WithLabelValues(priority.String(), peer.Task.Type.String(),
peer.Host.Type.Name()).Inc()
@@ -966,7 +972,7 @@
}
// handleRescheduleRequest handles RescheduleRequest of AnnouncePeerRequest.
-func (v *V2) handleRescheduleRequest(ctx context.Context, peerID string, candidateParents []*commonv2.Peer) error {
+func (v *V2) handleRescheduleRequest(_ context.Context, peerID string, candidateParents []*commonv2.Peer) error {
peer, loaded := v.resource.PeerManager().Load(peerID)
if !loaded {
return status.Errorf(codes.NotFound, "peer %s not found", peerID)
@@ -997,7 +1003,7 @@
// Handle peer with peer finished request.
peer.Cost.Store(time.Since(peer.CreatedAt.Load()))
-if err := peer.FSM.Event(ctx, resource.PeerEventDownloadSucceeded); err != nil {
+if err := peer.FSM.Event(ctx, standard.PeerEventDownloadSucceeded); err != nil {
return status.Error(codes.Internal, err.Error())
}
@@ -1020,16 +1026,16 @@
// Handle peer with peer back-to-source finished request.
peer.Cost.Store(time.Since(peer.CreatedAt.Load()))
-if err := peer.FSM.Event(ctx, resource.PeerEventDownloadSucceeded); err != nil {
+if err := peer.FSM.Event(ctx, standard.PeerEventDownloadSucceeded); err != nil {
return status.Error(codes.Internal, err.Error())
}
// Handle task with peer back-to-source finished request, peer can only represent
// a successful task after downloading the complete task.
-if peer.Range == nil && !peer.Task.FSM.Is(resource.TaskStateSucceeded) {
+if peer.Range == nil && !peer.Task.FSM.Is(standard.TaskStateSucceeded) {
peer.Task.ContentLength.Store(int64(req.GetContentLength()))
peer.Task.TotalPieceCount.Store(int32(req.GetPieceCount()))
-if err := peer.Task.FSM.Event(ctx, resource.TaskEventDownloadSucceeded); err != nil {
+if err := peer.Task.FSM.Event(ctx, standard.TaskEventDownloadSucceeded); err != nil {
return status.Error(codes.Internal, err.Error())
}
}
@@ -1052,7 +1058,7 @@ func (v *V2) handleDownloadPeerFailedRequest(ctx context.Context, peerID string)
}
// Handle peer with peer failed request.
-if err := peer.FSM.Event(ctx, resource.PeerEventDownloadFailed); err != nil {
+if err := peer.FSM.Event(ctx, standard.PeerEventDownloadFailed); err != nil {
return status.Error(codes.Internal, err.Error())
}
@@ -1077,7 +1083,7 @@ func (v *V2) handleDownloadPeerBackToSourceFailedRequest(ctx context.Context, pe
}
// Handle peer with peer back-to-source failed request.
-if err := peer.FSM.Event(ctx, resource.PeerEventDownloadFailed); err != nil {
+if err := peer.FSM.Event(ctx, standard.PeerEventDownloadFailed); err != nil {
return status.Error(codes.Internal, err.Error())
}
@@ -1085,7 +1091,7 @@ func (v *V2) handleDownloadPeerBackToSourceFailedRequest(ctx context.Context, pe
peer.Task.ContentLength.Store(-1)
peer.Task.TotalPieceCount.Store(0)
peer.Task.DirectPiece = []byte{}
-if err := peer.Task.FSM.Event(ctx, resource.TaskEventDownloadFailed); err != nil {
+if err := peer.Task.FSM.Event(ctx, standard.TaskEventDownloadFailed); err != nil {
return status.Error(codes.Internal, err.Error())
}
@@ -1102,7 +1108,7 @@ func (v *V2) handleDownloadPeerBackToSourceFailedRequest(ctx context.Context, pe
// handleDownloadPieceFinishedRequest handles DownloadPieceFinishedRequest of AnnouncePeerRequest.
func (v *V2) handleDownloadPieceFinishedRequest(peerID string, req *schedulerv2.DownloadPieceFinishedRequest) error {
// Construct piece.
-piece := &resource.Piece{
+piece := &standard.Piece{
Number: int32(req.Piece.GetNumber()),
ParentID: req.Piece.GetParentId(),
Offset: req.Piece.GetOffset(),
@@ -1163,9 +1169,9 @@ func (v *V2) handleDownloadPieceFinishedRequest(peerID string, req *schedulerv2.
}
// handleDownloadPieceBackToSourceFinishedRequest handles DownloadPieceBackToSourceFinishedRequest of AnnouncePeerRequest.
-func (v *V2) handleDownloadPieceBackToSourceFinishedRequest(ctx context.Context, peerID string, req *schedulerv2.DownloadPieceBackToSourceFinishedRequest) error {
+func (v *V2) handleDownloadPieceBackToSourceFinishedRequest(_ context.Context, peerID string, req *schedulerv2.DownloadPieceBackToSourceFinishedRequest) error {
// Construct piece.
-piece := &resource.Piece{
+piece := &standard.Piece{
Number: int32(req.Piece.GetNumber()),
ParentID: req.Piece.GetParentId(),
Offset: req.Piece.GetOffset(),
@@ -1215,7 +1221,7 @@ func (v *V2) handleDownloadPieceBackToSourceFinishedRequest(ctx context.Context,
}
// handleDownloadPieceFailedRequest handles DownloadPieceFailedRequest of AnnouncePeerRequest.
-func (v *V2) handleDownloadPieceFailedRequest(ctx context.Context, peerID string, req *schedulerv2.DownloadPieceFailedRequest) error {
+func (v *V2) handleDownloadPieceFailedRequest(_ context.Context, peerID string, req *schedulerv2.DownloadPieceFailedRequest) error {
peer, loaded := v.resource.PeerManager().Load(peerID)
if !loaded {
return status.Errorf(codes.NotFound, "peer %s not found", peerID)
@@ -1244,7 +1250,7 @@ func (v *V2) handleDownloadPieceFailedRequest(ctx context.Context, peerID string
}
// handleDownloadPieceBackToSourceFailedRequest handles DownloadPieceBackToSourceFailedRequest of AnnouncePeerRequest.
-func (v *V2) handleDownloadPieceBackToSourceFailedRequest(ctx context.Context, peerID string, req *schedulerv2.DownloadPieceBackToSourceFailedRequest) error {
+func (v *V2) handleDownloadPieceBackToSourceFailedRequest(_ context.Context, peerID string, _ *schedulerv2.DownloadPieceBackToSourceFailedRequest) error {
peer, loaded := v.resource.PeerManager().Load(peerID)
if !loaded {
return status.Errorf(codes.NotFound, "peer %s not found", peerID)
@@ -1266,7 +1272,7 @@ func (v *V2) handleDownloadPieceBackToSourceFailedRequest(ctx context.Context, p
}
// handleResource handles resource included host, task, and peer.
-func (v *V2) handleResource(ctx context.Context, stream schedulerv2.Scheduler_AnnouncePeerServer, hostID, taskID, peerID string, download *commonv2.Download) (*resource.Host, *resource.Task, *resource.Peer, error) {
+func (v *V2) handleResource(_ context.Context, stream schedulerv2.Scheduler_AnnouncePeerServer, hostID, taskID, peerID string, download *commonv2.Download) (*standard.Host, *standard.Task, *standard.Peer, error) {
// If the host does not exist and the host address cannot be found,
// it may cause an exception.
host, loaded := v.resource.HostManager().Load(hostID)
@@ -1277,7 +1283,7 @@ func (v *V2) handleResource(ctx context.Context, stream schedulerv2.Scheduler_An
// Store new task or update task.
task, loaded := v.resource.TaskManager().Load(taskID)
if !loaded {
-options := []resource.TaskOption{resource.WithPieceLength(int32(download.GetPieceLength()))}
+options := []standard.TaskOption{standard.WithPieceLength(int32(download.GetPieceLength()))}
if download.GetDigest() != "" {
d, err := digest.Parse(download.GetDigest())
if err != nil {
@@ -1285,10 +1291,10 @@ func (v *V2) handleResource(ctx context.Context, stream schedulerv2.Scheduler_An
}
// If request has invalid digest, then new task with the nil digest.
-options = append(options, resource.WithDigest(d))
+options = append(options, standard.WithDigest(d))
}
-task = resource.NewTask(taskID, download.GetUrl(), download.GetTag(), download.GetApplication(), download.GetType(),
+task = standard.NewTask(taskID, download.GetUrl(), download.GetTag(), download.GetApplication(), download.GetType(),
download.GetFilteredQueryParams(), download.GetRequestHeader(), int32(v.config.Scheduler.BackToSourceCount), options...)
v.resource.TaskManager().Store(task)
} else {
@@ -1300,12 +1306,12 @@ func (v *V2) handleResource(ctx context.Context, stream schedulerv2.Scheduler_An
// Store new peer or load peer.
peer, loaded := v.resource.PeerManager().Load(peerID)
if !loaded {
-options := []resource.PeerOption{resource.WithPriority(download.GetPriority()), resource.WithAnnouncePeerStream(stream)}
+options := []standard.PeerOption{standard.WithPriority(download.GetPriority()), standard.WithAnnouncePeerStream(stream)}
if download.GetRange() != nil {
-options = append(options, resource.WithRange(http.Range{Start: int64(download.Range.GetStart()), Length: int64(download.Range.GetLength())}))
+options = append(options, standard.WithRange(http.Range{Start: int64(download.Range.GetStart()), Length: int64(download.Range.GetLength())}))
}
-peer = resource.NewPeer(peerID, task, host, options...)
+peer = standard.NewPeer(peerID, task, host, options...)
v.resource.PeerManager().Store(peer)
}
@@ -1313,7 +1319,7 @@ func (v *V2) handleResource(ctx context.Context, stream schedulerv2.Scheduler_An
}
// downloadTaskBySeedPeer downloads task by seed peer.
-func (v *V2) downloadTaskBySeedPeer(ctx context.Context, taskID string, download *commonv2.Download, peer *resource.Peer) error {
+func (v *V2) downloadTaskBySeedPeer(ctx context.Context, taskID string, download *commonv2.Download, peer *standard.Peer) error {
// Trigger the first download task based on different priority levels,
// refer to https://github.com/dragonflyoss/api/blob/main/pkg/apis/common/v2/common.proto#L74.
priority := peer.CalculatePriority(v.dynconfig)
@@ -1393,10 +1399,103 @@ func (v *V2) AnnouncePersistentCachePeer(stream schedulerv2.Scheduler_AnnouncePe
return nil
}
-// TODO Implement the following methods.
// StatPersistentCachePeer checks information of persistent cache peer.
func (v *V2) StatPersistentCachePeer(ctx context.Context, req *schedulerv2.StatPersistentCachePeerRequest) (*commonv2.PersistentCachePeer, error) {
-return nil, nil
+peer, loaded := v.persistentCacheResource.PeerManager().Load(ctx, req.GetPeerId())
if !loaded {
return nil, status.Errorf(codes.NotFound, "persistent cache peer %s not found", req.GetPeerId())
}
return &commonv2.PersistentCachePeer{
Id: peer.ID,
Persistent: peer.Persistent,
State: peer.FSM.Current(),
Cost: durationpb.New(peer.Cost),
CreatedAt: timestamppb.New(peer.CreatedAt),
UpdatedAt: timestamppb.New(peer.UpdatedAt),
Task: &commonv2.PersistentCacheTask{
Id: peer.Task.ID,
PersistentReplicaCount: peer.Task.PersistentReplicaCount,
ReplicaCount: peer.Task.ReplicaCount,
Digest: peer.Task.Digest.String(),
Tag: &peer.Task.Tag,
Application: &peer.Task.Application,
PieceLength: uint64(peer.Task.PieceLength),
ContentLength: uint64(peer.Task.ContentLength),
PieceCount: uint32(peer.Task.TotalPieceCount),
State: peer.Task.FSM.Current(),
CreatedAt: timestamppb.New(peer.Task.CreatedAt),
UpdatedAt: timestamppb.New(peer.Task.UpdatedAt),
},
Host: &commonv2.Host{
Id: peer.Host.ID,
Type: uint32(peer.Host.Type),
Hostname: peer.Host.Hostname,
Ip: peer.Host.IP,
Port: peer.Host.Port,
DownloadPort: peer.Host.DownloadPort,
Os: peer.Host.OS,
Platform: peer.Host.Platform,
PlatformFamily: peer.Host.PlatformFamily,
PlatformVersion: peer.Host.PlatformVersion,
KernelVersion: peer.Host.KernelVersion,
Cpu: &commonv2.CPU{
LogicalCount: peer.Host.CPU.LogicalCount,
PhysicalCount: peer.Host.CPU.PhysicalCount,
Percent: peer.Host.CPU.Percent,
ProcessPercent: peer.Host.CPU.ProcessPercent,
Times: &commonv2.CPUTimes{
User: peer.Host.CPU.Times.User,
System: peer.Host.CPU.Times.System,
Idle: peer.Host.CPU.Times.Idle,
Nice: peer.Host.CPU.Times.Nice,
Iowait: peer.Host.CPU.Times.Iowait,
Irq: peer.Host.CPU.Times.Irq,
Softirq: peer.Host.CPU.Times.Softirq,
Steal: peer.Host.CPU.Times.Steal,
Guest: peer.Host.CPU.Times.Guest,
GuestNice: peer.Host.CPU.Times.GuestNice,
},
},
Memory: &commonv2.Memory{
Total: peer.Host.Memory.Total,
Available: peer.Host.Memory.Available,
Used: peer.Host.Memory.Used,
UsedPercent: peer.Host.Memory.UsedPercent,
ProcessUsedPercent: peer.Host.Memory.ProcessUsedPercent,
Free: peer.Host.Memory.Free,
},
Network: &commonv2.Network{
TcpConnectionCount: peer.Host.Network.TCPConnectionCount,
UploadTcpConnectionCount: peer.Host.Network.UploadTCPConnectionCount,
Location: &peer.Host.Network.Location,
Idc: &peer.Host.Network.IDC,
DownloadRate: peer.Host.Network.DownloadRate,
DownloadRateLimit: peer.Host.Network.DownloadRateLimit,
UploadRate: peer.Host.Network.UploadRate,
UploadRateLimit: peer.Host.Network.UploadRateLimit,
},
Disk: &commonv2.Disk{
Total: peer.Host.Disk.Total,
Free: peer.Host.Disk.Free,
Used: peer.Host.Disk.Used,
UsedPercent: peer.Host.Disk.UsedPercent,
InodesTotal: peer.Host.Disk.InodesTotal,
InodesUsed: peer.Host.Disk.InodesUsed,
InodesFree: peer.Host.Disk.InodesFree,
InodesUsedPercent: peer.Host.Disk.InodesUsedPercent,
},
Build: &commonv2.Build{
GitVersion: peer.Host.Build.GitVersion,
GitCommit: &peer.Host.Build.GitCommit,
GoVersion: &peer.Host.Build.GoVersion,
RustVersion: &peer.Host.Build.RustVersion,
Platform: &peer.Host.Build.Platform,
},
SchedulerClusterId: uint64(v.config.Manager.SchedulerClusterID),
DisableShared: peer.Host.DisableShared,
},
}, nil
}
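A client-side sketch of calling the newly implemented RPC, assuming the generated stubs live in d7y.io/api/v2/pkg/apis/scheduler/v2; the scheduler address is illustrative, and PeerId is the only request field the handler above is shown to read, so any other fields the proto may define are omitted:

package main

import (
	"context"
	"fmt"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"

	schedulerv2 "d7y.io/api/v2/pkg/apis/scheduler/v2"
)

func main() {
	conn, err := grpc.NewClient("scheduler.example.com:8002", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		panic(err)
	}
	defer conn.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	client := schedulerv2.NewSchedulerClient(conn)

	// The handler looks the peer up by PeerId and returns codes.NotFound when
	// it is not tracked by the persistent cache peer manager.
	peer, err := client.StatPersistentCachePeer(ctx, &schedulerv2.StatPersistentCachePeerRequest{
		PeerId: "example-peer-id",
	})
	if err != nil {
		panic(err)
	}

	fmt.Println(peer.GetId(), peer.GetPersistent(), peer.GetState())
}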
// TODO Implement the following methods.
@@ -1423,10 +1522,27 @@ func (v *V2) UploadPersistentCacheTaskFailed(ctx context.Context, req *scheduler
return nil
}
-// TODO Implement the following methods.
// StatPersistentCacheTask checks information of persistent cache task.
func (v *V2) StatPersistentCacheTask(ctx context.Context, req *schedulerv2.StatPersistentCacheTaskRequest) (*commonv2.PersistentCacheTask, error) {
-return nil, nil
+task, loaded := v.persistentCacheResource.TaskManager().Load(ctx, req.GetTaskId())
if !loaded {
return nil, status.Errorf(codes.NotFound, "persistent cache task %s not found", req.GetTaskId())
}
return &commonv2.PersistentCacheTask{
Id: task.ID,
PersistentReplicaCount: task.PersistentReplicaCount,
ReplicaCount: task.ReplicaCount,
Digest: task.Digest.String(),
Tag: &task.Tag,
Application: &task.Application,
PieceLength: uint64(task.PieceLength),
ContentLength: uint64(task.ContentLength),
PieceCount: uint32(task.TotalPieceCount),
State: task.FSM.Current(),
CreatedAt: timestamppb.New(task.CreatedAt),
UpdatedAt: timestamppb.New(task.UpdatedAt),
}, nil
}
// TODO Implement the following methods.
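A matching sketch for the task variant, under the same assumptions as the previous example (generated schedulerv2 stubs); TaskId mirrors the req.GetTaskId() lookup in the handler, and the helper itself is hypothetical:

package example

import (
	"context"
	"fmt"

	schedulerv2 "d7y.io/api/v2/pkg/apis/scheduler/v2"
)

// statPersistentCacheTask prints a summary of a persistent cache task.
func statPersistentCacheTask(ctx context.Context, client schedulerv2.SchedulerClient, taskID string) error {
	task, err := client.StatPersistentCacheTask(ctx, &schedulerv2.StatPersistentCacheTaskRequest{
		TaskId: taskID,
	})
	if err != nil {
		// codes.NotFound is returned when the task is not tracked by the
		// persistent cache task manager.
		return err
	}

	fmt.Printf("task %s: state=%s replicas=%d/%d content_length=%d\n",
		task.GetId(), task.GetState(),
		task.GetPersistentReplicaCount(), task.GetReplicaCount(),
		task.GetContentLength())
	return nil
}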

File diff suppressed because it is too large