diff --git a/scheduler/config/config.go b/scheduler/config/config.go index 66957bde7..06e8c31a6 100644 --- a/scheduler/config/config.go +++ b/scheduler/config/config.go @@ -140,16 +140,19 @@ type TrainingConfig struct { } type GCConfig struct { - // Peer gc interval. + // PieceDownloadTimeout is timout of downloading piece. + PieceDownloadTimeout time.Duration `yaml:"pieceDownloadTimeout" mapstructure:"pieceDownloadTimeout"` + + // PeerGCInterval is interval of peer gc. PeerGCInterval time.Duration `yaml:"peerGCInterval" mapstructure:"peerGCInterval"` - // Peer time to live. + // PeerTTL is time to live of peer. PeerTTL time.Duration `yaml:"peerTTL" mapstructure:"peerTTL"` - // Task gc interval. + // TaskGCInterval is interval of task gc. TaskGCInterval time.Duration `yaml:"taskGCInterval" mapstructure:"taskGCInterval"` - // Host gc interval. + // HostGCInterval is interval of host gc. HostGCInterval time.Duration `yaml:"hostGCInterval" mapstructure:"hostGCInterval"` } @@ -298,10 +301,11 @@ func New() *Config { RetryLimit: DefaultSchedulerRetryLimit, RetryInterval: DefaultSchedulerRetryInterval, GC: GCConfig{ - PeerGCInterval: DefaultSchedulerPeerGCInterval, - PeerTTL: DefaultSchedulerPeerTTL, - TaskGCInterval: DefaultSchedulerTaskGCInterval, - HostGCInterval: DefaultSchedulerHostGCInterval, + PieceDownloadTimeout: DefaultSchedulerPieceDownloadTimeout, + PeerGCInterval: DefaultSchedulerPeerGCInterval, + PeerTTL: DefaultSchedulerPeerTTL, + TaskGCInterval: DefaultSchedulerTaskGCInterval, + HostGCInterval: DefaultSchedulerHostGCInterval, }, Training: TrainingConfig{ Enable: false, @@ -387,6 +391,10 @@ func (cfg *Config) Validate() error { return errors.New("scheduler requires parameter retryInterval") } + if cfg.Scheduler.GC.PieceDownloadTimeout <= 0 { + return errors.New("scheduler requires parameter pieceDownloadTimeout") + } + if cfg.Scheduler.GC.PeerTTL <= 0 { return errors.New("scheduler requires parameter peerTTL") } diff --git a/scheduler/config/config_test.go b/scheduler/config/config_test.go index 0fe0a5049..e76a88114 100644 --- a/scheduler/config/config_test.go +++ b/scheduler/config/config_test.go @@ -37,10 +37,11 @@ func TestConfig_Load(t *testing.T) { RetryLimit: 10, RetryInterval: 1 * time.Second, GC: GCConfig{ - PeerGCInterval: 1 * time.Minute, - PeerTTL: 5 * time.Minute, - TaskGCInterval: 1 * time.Minute, - HostGCInterval: 1 * time.Minute, + PieceDownloadTimeout: 1 * time.Minute, + PeerGCInterval: 1 * time.Minute, + PeerTTL: 5 * time.Minute, + TaskGCInterval: 1 * time.Minute, + HostGCInterval: 1 * time.Minute, }, Training: TrainingConfig{ Enable: true, diff --git a/scheduler/config/constants.go b/scheduler/config/constants.go index 2d4de0dd2..bff5ccc6a 100644 --- a/scheduler/config/constants.go +++ b/scheduler/config/constants.go @@ -58,6 +58,9 @@ const ( // DefaultSchedulerRetryInterval is default retry interval for scheduler. DefaultSchedulerRetryInterval = 50 * time.Millisecond + // DefaultSchedulerPieceDownloadTimeout is default timeout of downloading piece. + DefaultSchedulerPieceDownloadTimeout = 30 * time.Minute + // DefaultSchedulerPeerGCInterval is default interval for peer gc. DefaultSchedulerPeerGCInterval = 10 * time.Second diff --git a/scheduler/config/testdata/scheduler.yaml b/scheduler/config/testdata/scheduler.yaml index e5c70acc6..27b165c13 100644 --- a/scheduler/config/testdata/scheduler.yaml +++ b/scheduler/config/testdata/scheduler.yaml @@ -16,6 +16,7 @@ scheduler: retryLimit: 10 retryInterval: 1000000000 gc: + pieceDownloadTimeout: 60000000000 peerGCInterval: 60000000000 peerTTL: 300000000000 taskGCInterval: 60000000000 diff --git a/scheduler/resource/host.go b/scheduler/resource/host.go index 7e68e7cb0..637081fca 100644 --- a/scheduler/resource/host.go +++ b/scheduler/resource/host.go @@ -107,11 +107,11 @@ type Host struct { // PeerCount is peer count. PeerCount *atomic.Int32 - // CreateAt is host create time. - CreateAt *atomic.Time + // CreatedAt is host create time. + CreatedAt *atomic.Time - // UpdateAt is host update time. - UpdateAt *atomic.Time + // UpdatedAt is host update time. + UpdatedAt *atomic.Time // Host log. Log *logger.SugaredLoggerOnWith @@ -142,8 +142,8 @@ func NewHost(req *schedulerv1.AnnounceHostRequest, options ...HostOption) *Host UploadFailedCount: atomic.NewInt64(0), Peers: &sync.Map{}, PeerCount: atomic.NewInt32(0), - CreateAt: atomic.NewTime(time.Now()), - UpdateAt: atomic.NewTime(time.Now()), + CreatedAt: atomic.NewTime(time.Now()), + UpdatedAt: atomic.NewTime(time.Now()), Log: logger.WithHost(req.Id, req.Hostname, req.Ip), } diff --git a/scheduler/resource/host_test.go b/scheduler/resource/host_test.go index 6def4a274..3a117217d 100644 --- a/scheduler/resource/host_test.go +++ b/scheduler/resource/host_test.go @@ -85,8 +85,8 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.Network.NetTopology, mockRawHost.Network.NetTopology) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.PeerCount.Load(), int32(0)) - assert.NotEqual(host.CreateAt.Load(), 0) - assert.NotEqual(host.UpdateAt.Load(), 0) + assert.NotEqual(host.CreatedAt.Load(), 0) + assert.NotEqual(host.UpdatedAt.Load(), 0) assert.NotNil(host.Log) }, }, @@ -107,8 +107,8 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.Network.NetTopology, mockRawSeedHost.Network.NetTopology) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit)) assert.Equal(host.PeerCount.Load(), int32(0)) - assert.NotEqual(host.CreateAt.Load(), 0) - assert.NotEqual(host.UpdateAt.Load(), 0) + assert.NotEqual(host.CreatedAt.Load(), 0) + assert.NotEqual(host.UpdatedAt.Load(), 0) assert.NotNil(host.Log) }, }, @@ -130,8 +130,8 @@ func TestHost_NewHost(t *testing.T) { assert.Equal(host.Network.NetTopology, mockRawHost.Network.NetTopology) assert.Equal(host.ConcurrentUploadLimit.Load(), int32(200)) assert.Equal(host.PeerCount.Load(), int32(0)) - assert.NotEqual(host.CreateAt.Load(), 0) - assert.NotEqual(host.UpdateAt.Load(), 0) + assert.NotEqual(host.CreatedAt.Load(), 0) + assert.NotEqual(host.UpdatedAt.Load(), 0) assert.NotNil(host.Log) }, }, diff --git a/scheduler/resource/peer.go b/scheduler/resource/peer.go index fed5a80f1..2d3955a5c 100644 --- a/scheduler/resource/peer.go +++ b/scheduler/resource/peer.go @@ -174,11 +174,14 @@ type Peer struct { // IsBackToSource is set to true. IsBackToSource *atomic.Bool - // CreateAt is peer create time. - CreateAt *atomic.Time + // PieceUpdatedAt is piece update time. + PieceUpdatedAt *atomic.Time - // UpdateAt is peer update time. - UpdateAt *atomic.Time + // CreatedAt is peer create time. + CreatedAt *atomic.Time + + // UpdatedAt is peer update time. + UpdatedAt *atomic.Time // Peer log. Log *logger.SugaredLoggerOnWith @@ -199,8 +202,9 @@ func NewPeer(id string, task *Task, host *Host, options ...PeerOption) *Peer { BlockParents: set.NewSafeSet[string](), NeedBackToSource: atomic.NewBool(false), IsBackToSource: atomic.NewBool(false), - CreateAt: atomic.NewTime(time.Now()), - UpdateAt: atomic.NewTime(time.Now()), + PieceUpdatedAt: atomic.NewTime(time.Now()), + CreatedAt: atomic.NewTime(time.Now()), + UpdatedAt: atomic.NewTime(time.Now()), Log: logger.WithPeer(host.ID, task.ID, id), } @@ -231,23 +235,23 @@ func NewPeer(id string, task *Task, host *Host, options ...PeerOption) *Peer { }, fsm.Callbacks{ PeerEventRegisterEmpty: func(e *fsm.Event) { - p.UpdateAt.Store(time.Now()) + p.UpdatedAt.Store(time.Now()) p.Log.Infof("peer state is %s", e.FSM.Current()) }, PeerEventRegisterTiny: func(e *fsm.Event) { - p.UpdateAt.Store(time.Now()) + p.UpdatedAt.Store(time.Now()) p.Log.Infof("peer state is %s", e.FSM.Current()) }, PeerEventRegisterSmall: func(e *fsm.Event) { - p.UpdateAt.Store(time.Now()) + p.UpdatedAt.Store(time.Now()) p.Log.Infof("peer state is %s", e.FSM.Current()) }, PeerEventRegisterNormal: func(e *fsm.Event) { - p.UpdateAt.Store(time.Now()) + p.UpdatedAt.Store(time.Now()) p.Log.Infof("peer state is %s", e.FSM.Current()) }, PeerEventDownload: func(e *fsm.Event) { - p.UpdateAt.Store(time.Now()) + p.UpdatedAt.Store(time.Now()) p.Log.Infof("peer state is %s", e.FSM.Current()) }, PeerEventDownloadBackToSource: func(e *fsm.Event) { @@ -258,7 +262,7 @@ func NewPeer(id string, task *Task, host *Host, options ...PeerOption) *Peer { p.Log.Errorf("delete peer inedges failed: %s", err.Error()) } - p.UpdateAt.Store(time.Now()) + p.UpdatedAt.Store(time.Now()) p.Log.Infof("peer state is %s", e.FSM.Current()) }, PeerEventDownloadSucceeded: func(e *fsm.Event) { @@ -271,7 +275,7 @@ func NewPeer(id string, task *Task, host *Host, options ...PeerOption) *Peer { } p.Task.PeerFailedCount.Store(0) - p.UpdateAt.Store(time.Now()) + p.UpdatedAt.Store(time.Now()) p.Log.Infof("peer state is %s", e.FSM.Current()) }, PeerEventDownloadFailed: func(e *fsm.Event) { @@ -284,7 +288,7 @@ func NewPeer(id string, task *Task, host *Host, options ...PeerOption) *Peer { p.Log.Errorf("delete peer inedges failed: %s", err.Error()) } - p.UpdateAt.Store(time.Now()) + p.UpdatedAt.Store(time.Now()) p.Log.Infof("peer state is %s", e.FSM.Current()) }, PeerEventLeave: func(e *fsm.Event) { @@ -292,6 +296,7 @@ func NewPeer(id string, task *Task, host *Host, options ...PeerOption) *Peer { p.Log.Errorf("delete peer inedges failed: %s", err.Error()) } + p.Task.BackToSourcePeers.Delete(p.ID) p.Log.Infof("peer state is %s", e.FSM.Current()) }, }, diff --git a/scheduler/resource/peer_manager.go b/scheduler/resource/peer_manager.go index a2df33d2a..829a1be4e 100644 --- a/scheduler/resource/peer_manager.go +++ b/scheduler/resource/peer_manager.go @@ -54,19 +54,23 @@ type peerManager struct { // Peer sync map. *sync.Map - // Peer time to live. + // ttl is time to live of peer. ttl time.Duration - // Peer mutex. + // pieceDownloadTimeout is timeout of downloading piece. + pieceDownloadTimeout time.Duration + + // mu is peer mutex. mu *sync.Mutex } // New peer manager interface. func newPeerManager(cfg *config.GCConfig, gc pkggc.GC) (PeerManager, error) { p := &peerManager{ - Map: &sync.Map{}, - ttl: cfg.PeerTTL, - mu: &sync.Mutex{}, + Map: &sync.Map{}, + ttl: cfg.PeerTTL, + pieceDownloadTimeout: cfg.PieceDownloadTimeout, + mu: &sync.Mutex{}, } if err := gc.Add(pkggc.Task{ @@ -125,7 +129,11 @@ func (p *peerManager) Delete(key string) { func (p *peerManager) RunGC() error { p.Map.Range(func(_, value any) bool { - peer := value.(*Peer) + peer, ok := value.(*Peer) + if !ok { + peer.Log.Warn("invalid peer") + return true + } // If the peer state is PeerStateLeave, // peer will be reclaimed. @@ -135,12 +143,25 @@ func (p *peerManager) RunGC() error { return true } + // If the peer's elapsed of downloading piece exceeds the pieceDownloadTimeout, + // then sets the peer state to PeerStateLeave and then delete peer. + if peer.FSM.Is(PeerStateRunning) || peer.FSM.Is(PeerStateBackToSource) { + elapsed := time.Since(peer.PieceUpdatedAt.Load()) + if elapsed > p.pieceDownloadTimeout { + peer.Log.Info("peer elapsed exceeds the timeout of downloading piece, causing the peer to leave") + if err := peer.FSM.Event(PeerEventLeave); err != nil { + peer.Log.Errorf("peer fsm event failed: %s", err.Error()) + return true + } + + return true + } + } + // If the peer's elapsed exceeds the ttl, - // first set the peer state to PeerStateLeave and then delete peer. - elapsed := time.Since(peer.UpdateAt.Load()) + // then set the peer state to PeerStateLeave and then delete peer. + elapsed := time.Since(peer.UpdatedAt.Load()) if elapsed > p.ttl { - // If the peer is not leave, - // first change the state to PeerEventLeave. peer.Log.Info("peer elapsed exceeds the ttl, causing the peer to leave") if err := peer.FSM.Event(PeerEventLeave); err != nil { peer.Log.Errorf("peer fsm event failed: %s", err.Error()) @@ -151,7 +172,7 @@ func (p *peerManager) RunGC() error { } // If the peer's state is PeerStateFailed, - // first set the peer state to PeerStateLeave and then delete peer. + // then set the peer state to PeerStateLeave and then delete peer. if peer.FSM.Is(PeerStateFailed) { peer.Log.Info("peer state is PeerStateFailed, causing the peer to leave") if err := peer.FSM.Event(PeerEventLeave); err != nil { diff --git a/scheduler/resource/peer_manager_test.go b/scheduler/resource/peer_manager_test.go index 6eef259ae..a8983c703 100644 --- a/scheduler/resource/peer_manager_test.go +++ b/scheduler/resource/peer_manager_test.go @@ -319,8 +319,9 @@ func TestPeerManager_RunGC(t *testing.T) { { name: "peer leave", gcConfig: &config.GCConfig{ - PeerGCInterval: 1 * time.Second, - PeerTTL: 1 * time.Microsecond, + PieceDownloadTimeout: 5 * time.Minute, + PeerGCInterval: 1 * time.Second, + PeerTTL: 1 * time.Microsecond, }, mock: func(m *gc.MockGCMockRecorder) { m.Add(gomock.Any()).Return(nil).Times(1) @@ -337,11 +338,68 @@ func TestPeerManager_RunGC(t *testing.T) { assert.Equal(peer.FSM.Current(), PeerStateLeave) }, }, + { + name: "peer download piece timeout and peer state is PeerStateRunning", + gcConfig: &config.GCConfig{ + PieceDownloadTimeout: 1 * time.Microsecond, + PeerGCInterval: 1 * time.Second, + PeerTTL: 5 * time.Minute, + }, + mock: func(m *gc.MockGCMockRecorder) { + m.Add(gomock.Any()).Return(nil).Times(1) + }, + expect: func(t *testing.T, peerManager PeerManager, mockHost *Host, mockTask *Task, mockPeer *Peer) { + assert := assert.New(t) + peerManager.Store(mockPeer) + mockPeer.FSM.SetState(PeerStateRunning) + err := peerManager.RunGC() + assert.NoError(err) + + peer, ok := peerManager.Load(mockPeer.ID) + assert.Equal(ok, true) + assert.Equal(peer.FSM.Current(), PeerStateLeave) + + err = peerManager.RunGC() + assert.NoError(err) + + _, ok = peerManager.Load(mockPeer.ID) + assert.Equal(ok, false) + }, + }, + { + name: "peer download piece timeout and peer state is PeerStateBackToSource", + gcConfig: &config.GCConfig{ + PieceDownloadTimeout: 1 * time.Microsecond, + PeerGCInterval: 1 * time.Second, + PeerTTL: 5 * time.Minute, + }, + mock: func(m *gc.MockGCMockRecorder) { + m.Add(gomock.Any()).Return(nil).Times(1) + }, + expect: func(t *testing.T, peerManager PeerManager, mockHost *Host, mockTask *Task, mockPeer *Peer) { + assert := assert.New(t) + peerManager.Store(mockPeer) + mockPeer.FSM.SetState(PeerStateBackToSource) + err := peerManager.RunGC() + assert.NoError(err) + + peer, ok := peerManager.Load(mockPeer.ID) + assert.Equal(ok, true) + assert.Equal(peer.FSM.Current(), PeerStateLeave) + + err = peerManager.RunGC() + assert.NoError(err) + + _, ok = peerManager.Load(mockPeer.ID) + assert.Equal(ok, false) + }, + }, { name: "peer reclaimed", gcConfig: &config.GCConfig{ - PeerGCInterval: 1 * time.Second, - PeerTTL: 1 * time.Microsecond, + PieceDownloadTimeout: 5 * time.Minute, + PeerGCInterval: 1 * time.Second, + PeerTTL: 1 * time.Microsecond, }, mock: func(m *gc.MockGCMockRecorder) { m.Add(gomock.Any()).Return(nil).Times(1) @@ -367,8 +425,9 @@ func TestPeerManager_RunGC(t *testing.T) { { name: "peer state is PeerStateFailed", gcConfig: &config.GCConfig{ - PeerGCInterval: 1 * time.Second, - PeerTTL: 1 * time.Microsecond, + PieceDownloadTimeout: 5 * time.Minute, + PeerGCInterval: 1 * time.Second, + PeerTTL: 1 * time.Microsecond, }, mock: func(m *gc.MockGCMockRecorder) { m.Add(gomock.Any()).Return(nil).Times(1) @@ -388,8 +447,9 @@ func TestPeerManager_RunGC(t *testing.T) { { name: "peer gets degree failed", gcConfig: &config.GCConfig{ - PeerGCInterval: 1 * time.Second, - PeerTTL: 1 * time.Hour, + PieceDownloadTimeout: 5 * time.Minute, + PeerGCInterval: 1 * time.Second, + PeerTTL: 1 * time.Hour, }, mock: func(m *gc.MockGCMockRecorder) { m.Add(gomock.Any()).Return(nil).Times(1) @@ -410,8 +470,9 @@ func TestPeerManager_RunGC(t *testing.T) { { name: "peer reclaimed with PeerCountLimitForTask", gcConfig: &config.GCConfig{ - PeerGCInterval: 1 * time.Second, - PeerTTL: 1 * time.Hour, + PieceDownloadTimeout: 5 * time.Minute, + PeerGCInterval: 1 * time.Second, + PeerTTL: 1 * time.Hour, }, mock: func(m *gc.MockGCMockRecorder) { m.Add(gomock.Any()).Return(nil).Times(1) diff --git a/scheduler/resource/peer_test.go b/scheduler/resource/peer_test.go index 8e1f2c76d..e8c42b68c 100644 --- a/scheduler/resource/peer_test.go +++ b/scheduler/resource/peer_test.go @@ -62,8 +62,8 @@ func TestPeer_NewPeer(t *testing.T) { assert.Equal(peer.FSM.Current(), PeerStatePending) assert.EqualValues(peer.Task, mockTask) assert.EqualValues(peer.Host, mockHost) - assert.NotEqual(peer.CreateAt.Load(), 0) - assert.NotEqual(peer.UpdateAt.Load(), 0) + assert.NotEqual(peer.CreatedAt.Load(), 0) + assert.NotEqual(peer.UpdatedAt.Load(), 0) assert.NotNil(peer.Log) }, }, @@ -83,8 +83,8 @@ func TestPeer_NewPeer(t *testing.T) { assert.Equal(peer.FSM.Current(), PeerStatePending) assert.EqualValues(peer.Task, mockTask) assert.EqualValues(peer.Host, mockHost) - assert.NotEqual(peer.CreateAt.Load(), 0) - assert.NotEqual(peer.UpdateAt.Load(), 0) + assert.NotEqual(peer.CreatedAt.Load(), 0) + assert.NotEqual(peer.UpdatedAt.Load(), 0) assert.NotNil(peer.Log) }, }, diff --git a/scheduler/resource/seed_peer.go b/scheduler/resource/seed_peer.go index d4feda565..5cce332e7 100644 --- a/scheduler/resource/seed_peer.go +++ b/scheduler/resource/seed_peer.go @@ -142,9 +142,10 @@ func (s *seedPeer) TriggerTask(ctx context.Context, task *Task) (*Peer, *schedul peer.AppendPieceCost(pkgtime.SubNano(int64(piece.EndTime), int64(piece.BeginTime)).Milliseconds()) // When the piece is downloaded successfully, - // peer.UpdateAt needs to be updated to prevent + // peer.UpdatedAt needs to be updated to prevent // the peer from being GC during the download process. - peer.UpdateAt.Store(time.Now()) + peer.UpdatedAt.Store(time.Now()) + peer.PieceUpdatedAt.Store(time.Now()) task.StorePiece(piece.PieceInfo) // Statistical traffic metrics. diff --git a/scheduler/resource/task.go b/scheduler/resource/task.go index 4ad8e6903..e17b6d3bf 100644 --- a/scheduler/resource/task.go +++ b/scheduler/resource/task.go @@ -132,11 +132,11 @@ type Task struct { // if one peer succeeds, the value is reset to zero. PeerFailedCount *atomic.Int32 - // CreateAt is task create time. - CreateAt *atomic.Time + // CreatedAt is task create time. + CreatedAt *atomic.Time - // UpdateAt is task update time. - UpdateAt *atomic.Time + // UpdatedAt is task update time. + UpdatedAt *atomic.Time // Task log. Log *logger.SugaredLoggerOnWith @@ -157,8 +157,8 @@ func NewTask(id, url string, taskType commonv1.TaskType, meta *commonv1.UrlMeta, Pieces: &sync.Map{}, DAG: dag.NewDAG[*Peer](), PeerFailedCount: atomic.NewInt32(0), - CreateAt: atomic.NewTime(time.Now()), - UpdateAt: atomic.NewTime(time.Now()), + CreatedAt: atomic.NewTime(time.Now()), + UpdatedAt: atomic.NewTime(time.Now()), Log: logger.WithTask(id, url), } @@ -173,19 +173,19 @@ func NewTask(id, url string, taskType commonv1.TaskType, meta *commonv1.UrlMeta, }, fsm.Callbacks{ TaskEventDownload: func(e *fsm.Event) { - t.UpdateAt.Store(time.Now()) + t.UpdatedAt.Store(time.Now()) t.Log.Infof("task state is %s", e.FSM.Current()) }, TaskEventDownloadSucceeded: func(e *fsm.Event) { - t.UpdateAt.Store(time.Now()) + t.UpdatedAt.Store(time.Now()) t.Log.Infof("task state is %s", e.FSM.Current()) }, TaskEventDownloadFailed: func(e *fsm.Event) { - t.UpdateAt.Store(time.Now()) + t.UpdatedAt.Store(time.Now()) t.Log.Infof("task state is %s", e.FSM.Current()) }, TaskEventLeave: func(e *fsm.Event) { - t.UpdateAt.Store(time.Now()) + t.UpdatedAt.Store(time.Now()) t.Log.Infof("task state is %s", e.FSM.Current()) }, }, @@ -362,7 +362,7 @@ func (t *Task) LoadSeedPeer() (*Peer, bool) { sort.Slice( peers, func(i, j int) bool { - return peers[i].UpdateAt.Load().After(peers[j].UpdateAt.Load()) + return peers[i].UpdatedAt.Load().After(peers[j].UpdatedAt.Load()) }, ) @@ -376,7 +376,7 @@ func (t *Task) LoadSeedPeer() (*Peer, bool) { // IsSeedPeerFailed returns whether the seed peer in the task failed. func (t *Task) IsSeedPeerFailed() bool { seedPeer, ok := t.LoadSeedPeer() - return ok && seedPeer.FSM.Is(PeerStateFailed) && time.Since(seedPeer.CreateAt.Load()) < SeedPeerFailedTimeout + return ok && seedPeer.FSM.Is(PeerStateFailed) && time.Since(seedPeer.CreatedAt.Load()) < SeedPeerFailedTimeout } // LoadPiece return piece for a key. diff --git a/scheduler/resource/task_test.go b/scheduler/resource/task_test.go index c4cf04c0e..5fa09d69d 100644 --- a/scheduler/resource/task_test.go +++ b/scheduler/resource/task_test.go @@ -82,8 +82,8 @@ func TestTask_NewTask(t *testing.T) { assert.Equal(task.FSM.Current(), TaskStatePending) assert.Empty(task.Pieces) assert.Equal(task.PeerCount(), 0) - assert.NotEqual(task.CreateAt.Load(), 0) - assert.NotEqual(task.UpdateAt.Load(), 0) + assert.NotEqual(task.CreatedAt.Load(), 0) + assert.NotEqual(task.UpdatedAt.Load(), 0) assert.NotNil(task.Log) }, }, @@ -1042,8 +1042,8 @@ func TestTask_LoadSeedPeer(t *testing.T) { task.StorePeer(mockPeer) task.StorePeer(mockSeedPeer) - mockPeer.UpdateAt.Store(time.Now()) - mockSeedPeer.UpdateAt.Store(time.Now().Add(1 * time.Second)) + mockPeer.UpdatedAt.Store(time.Now()) + mockSeedPeer.UpdatedAt.Store(time.Now().Add(1 * time.Second)) peer, ok := task.LoadSeedPeer() assert.True(ok) @@ -1124,7 +1124,7 @@ func TestTask_IsSeedPeerFailed(t *testing.T) { assert := assert.New(t) task.StorePeer(mockPeer) task.StorePeer(mockSeedPeer) - mockSeedPeer.CreateAt.Store(time.Now().Add(-SeedPeerFailedTimeout)) + mockSeedPeer.CreatedAt.Store(time.Now().Add(-SeedPeerFailedTimeout)) mockSeedPeer.FSM.SetState(PeerStateFailed) assert.False(task.IsSeedPeerFailed()) diff --git a/scheduler/service/service.go b/scheduler/service/service.go index 4e7dd1a29..e91d1f4a1 100644 --- a/scheduler/service/service.go +++ b/scheduler/service/service.go @@ -744,16 +744,17 @@ func (s *Service) handlePieceSuccess(ctx context.Context, peer *resource.Peer, p peer.AppendPieceCost(pkgtime.SubNano(int64(piece.EndTime), int64(piece.BeginTime)).Milliseconds()) // When the piece is downloaded successfully, - // peer's UpdateAt needs to be updated + // peer's UpdatedAt needs to be updated // to prevent the peer from being GC during the download process. - peer.UpdateAt.Store(time.Now()) + peer.UpdatedAt.Store(time.Now()) + peer.PieceUpdatedAt.Store(time.Now()) // When the piece is downloaded successfully, - // dst peer's UpdateAt needs to be updated + // dst peer's UpdatedAt needs to be updated // to prevent the dst peer from being GC during the download process. if !resource.IsPieceBackToSource(piece) { if destPeer, loaded := s.resource.PeerManager().Load(piece.DstPid); loaded { - destPeer.UpdateAt.Store(time.Now()) + destPeer.UpdatedAt.Store(time.Now()) } } @@ -997,8 +998,8 @@ func (s *Service) createRecord(peer *resource.Peer, parents []*resource.Peer, re Application: parent.Application, State: parent.FSM.Current(), Cost: req.Cost, - CreateAt: parent.CreateAt.Load().UnixNano(), - UpdateAt: parent.UpdateAt.Load().UnixNano(), + CreatedAt: parent.CreatedAt.Load().UnixNano(), + UpdatedAt: parent.UpdatedAt.Load().UnixNano(), Host: storage.Host{ ID: parent.Host.ID, Type: parent.Host.Type.Name(), @@ -1015,8 +1016,8 @@ func (s *Service) createRecord(peer *resource.Peer, parents []*resource.Peer, re ConcurrentUploadCount: parent.Host.ConcurrentUploadCount.Load(), UploadCount: parent.Host.UploadCount.Load(), UploadFailedCount: parent.Host.UploadFailedCount.Load(), - CreateAt: parent.Host.CreateAt.Load().UnixNano(), - UpdateAt: parent.Host.UpdateAt.Load().UnixNano(), + CreatedAt: parent.Host.CreatedAt.Load().UnixNano(), + UpdatedAt: parent.Host.UpdatedAt.Load().UnixNano(), }, } @@ -1095,8 +1096,8 @@ func (s *Service) createRecord(peer *resource.Peer, parents []*resource.Peer, re State: peer.FSM.Current(), Cost: req.Cost, Parents: parentRecords, - CreateAt: peer.CreateAt.Load().UnixNano(), - UpdateAt: peer.UpdateAt.Load().UnixNano(), + CreatedAt: peer.CreatedAt.Load().UnixNano(), + UpdatedAt: peer.UpdatedAt.Load().UnixNano(), Task: storage.Task{ ID: peer.Task.ID, URL: peer.Task.URL, @@ -1106,8 +1107,8 @@ func (s *Service) createRecord(peer *resource.Peer, parents []*resource.Peer, re BackToSourceLimit: peer.Task.BackToSourceLimit.Load(), BackToSourcePeerCount: int32(peer.Task.BackToSourcePeers.Len()), State: peer.Task.FSM.Current(), - CreateAt: peer.Task.CreateAt.Load().UnixNano(), - UpdateAt: peer.Task.UpdateAt.Load().UnixNano(), + CreatedAt: peer.Task.CreatedAt.Load().UnixNano(), + UpdatedAt: peer.Task.UpdatedAt.Load().UnixNano(), }, Host: storage.Host{ ID: peer.Host.ID, @@ -1125,8 +1126,8 @@ func (s *Service) createRecord(peer *resource.Peer, parents []*resource.Peer, re ConcurrentUploadCount: peer.Host.ConcurrentUploadCount.Load(), UploadCount: peer.Host.UploadCount.Load(), UploadFailedCount: peer.Host.UploadFailedCount.Load(), - CreateAt: peer.Host.CreateAt.Load().UnixNano(), - UpdateAt: peer.Host.UpdateAt.Load().UnixNano(), + CreatedAt: peer.Host.CreatedAt.Load().UnixNano(), + UpdatedAt: peer.Host.UpdatedAt.Load().UnixNano(), }, } diff --git a/scheduler/storage/storage_test.go b/scheduler/storage/storage_test.go index 7bf8b478d..41f1ca029 100644 --- a/scheduler/storage/storage_test.go +++ b/scheduler/storage/storage_test.go @@ -43,8 +43,8 @@ var ( BackToSourceLimit: 10, BackToSourcePeerCount: 2, State: "Succeeded", - CreateAt: time.Now().UnixNano(), - UpdateAt: time.Now().UnixNano(), + CreatedAt: time.Now().UnixNano(), + UpdatedAt: time.Now().UnixNano(), } mockHost = Host{ @@ -113,8 +113,8 @@ var ( GoVersion: "1.19", Platform: "linux", }, - CreateAt: time.Now().UnixNano(), - UpdateAt: time.Now().UnixNano(), + CreatedAt: time.Now().UnixNano(), + UpdatedAt: time.Now().UnixNano(), } mockParent = Parent{ @@ -124,8 +124,8 @@ var ( State: "Succeeded", Cost: 1000, Host: mockHost, - CreateAt: time.Now().UnixNano(), - UpdateAt: time.Now().UnixNano(), + CreatedAt: time.Now().UnixNano(), + UpdatedAt: time.Now().UnixNano(), } mockParents = append(make([]Parent, 19), mockParent) @@ -139,8 +139,8 @@ var ( Task: mockTask, Host: mockHost, Parents: mockParents, - CreateAt: time.Now().UnixNano(), - UpdateAt: time.Now().UnixNano(), + CreatedAt: time.Now().UnixNano(), + UpdatedAt: time.Now().UnixNano(), } ) diff --git a/scheduler/storage/types.go b/scheduler/storage/types.go index ed60a9047..37aafd8e8 100644 --- a/scheduler/storage/types.go +++ b/scheduler/storage/types.go @@ -42,11 +42,11 @@ type Task struct { // State is the download state of the task. State string `csv:"state"` - // CreateAt is peer create nanosecond time. - CreateAt int64 `csv:"createAt"` + // CreatedAt is peer create nanosecond time. + CreatedAt int64 `csv:"createdAt"` - // UpdateAt is peer update nanosecond time. - UpdateAt int64 `csv:"updateAt"` + // UpdatedAt is peer update nanosecond time. + UpdatedAt int64 `csv:"updatedAt"` } // Host contains content for host. @@ -111,11 +111,11 @@ type Host struct { // Build information. Build Build `csv:"build"` - // CreateAt is peer create nanosecond time. - CreateAt int64 `csv:"createAt"` + // CreatedAt is peer create nanosecond time. + CreatedAt int64 `csv:"createdAt"` - // UpdateAt is peer update nanosecond time. - UpdateAt int64 `csv:"updateAt"` + // UpdatedAt is peer update nanosecond time. + UpdatedAt int64 `csv:"updatedAt"` } // CPU contains content for cpu. @@ -273,11 +273,11 @@ type Parent struct { // Host is peer host. Host Host `csv:"host"` - // CreateAt is peer create nanosecond time. - CreateAt int64 `csv:"createAt"` + // CreatedAt is peer create nanosecond time. + CreatedAt int64 `csv:"createdAt"` - // UpdateAt is peer update nanosecond time. - UpdateAt int64 `csv:"updateAt"` + // UpdatedAt is peer update nanosecond time. + UpdatedAt int64 `csv:"updatedAt"` } // Record contains content for record. @@ -306,9 +306,9 @@ type Record struct { // Parents is peer parents. Parents []Parent `csv:"parents" csv[]:"20"` - // CreateAt is peer create nanosecond time. - CreateAt int64 `csv:"createAt"` + // CreatedAt is peer create nanosecond time. + CreatedAt int64 `csv:"createdAt"` - // UpdateAt is peer update nanosecond time. - UpdateAt int64 `csv:"updateAt"` + // UpdatedAt is peer update nanosecond time. + UpdatedAt int64 `csv:"updatedAt"` }