feat: add the timeout of downloading piece to scheduler (#1880)

Signed-off-by: Gaius <gaius.qi@gmail.com>
Gaius 2022-12-01 14:43:23 +08:00
parent 10ae70db79
commit 8749814dee
No known key found for this signature in database
GPG Key ID: 8B4E5D1290FA2FFB
16 changed files with 222 additions and 120 deletions

View File

@@ -140,16 +140,19 @@ type TrainingConfig struct {
 }

 type GCConfig struct {
-	// Peer gc interval.
+	// PieceDownloadTimeout is timeout of downloading piece.
+	PieceDownloadTimeout time.Duration `yaml:"pieceDownloadTimeout" mapstructure:"pieceDownloadTimeout"`
+
+	// PeerGCInterval is interval of peer gc.
 	PeerGCInterval time.Duration `yaml:"peerGCInterval" mapstructure:"peerGCInterval"`
-	// Peer time to live.
+	// PeerTTL is time to live of peer.
 	PeerTTL time.Duration `yaml:"peerTTL" mapstructure:"peerTTL"`
-	// Task gc interval.
+	// TaskGCInterval is interval of task gc.
 	TaskGCInterval time.Duration `yaml:"taskGCInterval" mapstructure:"taskGCInterval"`
-	// Host gc interval.
+	// HostGCInterval is interval of host gc.
 	HostGCInterval time.Duration `yaml:"hostGCInterval" mapstructure:"hostGCInterval"`
 }

@@ -298,6 +301,7 @@ func New() *Config {
 			RetryLimit: DefaultSchedulerRetryLimit,
 			RetryInterval: DefaultSchedulerRetryInterval,
 			GC: GCConfig{
+				PieceDownloadTimeout: DefaultSchedulerPieceDownloadTimeout,
 				PeerGCInterval: DefaultSchedulerPeerGCInterval,
 				PeerTTL: DefaultSchedulerPeerTTL,
 				TaskGCInterval: DefaultSchedulerTaskGCInterval,

@@ -387,6 +391,10 @@ func (cfg *Config) Validate() error {
 		return errors.New("scheduler requires parameter retryInterval")
 	}

+	if cfg.Scheduler.GC.PieceDownloadTimeout <= 0 {
+		return errors.New("scheduler requires parameter pieceDownloadTimeout")
+	}
+
 	if cfg.Scheduler.GC.PeerTTL <= 0 {
 		return errors.New("scheduler requires parameter peerTTL")
 	}

View File

@@ -37,6 +37,7 @@ func TestConfig_Load(t *testing.T) {
 			RetryLimit: 10,
 			RetryInterval: 1 * time.Second,
 			GC: GCConfig{
+				PieceDownloadTimeout: 1 * time.Minute,
 				PeerGCInterval: 1 * time.Minute,
 				PeerTTL: 5 * time.Minute,
 				TaskGCInterval: 1 * time.Minute,

View File

@@ -58,6 +58,9 @@ const (
 	// DefaultSchedulerRetryInterval is default retry interval for scheduler.
 	DefaultSchedulerRetryInterval = 50 * time.Millisecond

+	// DefaultSchedulerPieceDownloadTimeout is default timeout of downloading piece.
+	DefaultSchedulerPieceDownloadTimeout = 30 * time.Minute
+
 	// DefaultSchedulerPeerGCInterval is default interval for peer gc.
 	DefaultSchedulerPeerGCInterval = 10 * time.Second

View File

@@ -16,6 +16,7 @@ scheduler:
   retryLimit: 10
   retryInterval: 1000000000
   gc:
+    pieceDownloadTimeout: 60000000000
     peerGCInterval: 60000000000
    peerTTL: 300000000000
     taskGCInterval: 60000000000
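The testdata above uses raw nanosecond integers for the duration fields. As a purely illustrative sketch, the same gc block annotated with its human-readable equivalents (field names come from the yaml tags in GCConfig; the built-in default for pieceDownloadTimeout is DefaultSchedulerPieceDownloadTimeout, i.e. 30 minutes):

scheduler:
  gc:
    pieceDownloadTimeout: 60000000000  # 60000000000ns = 1m (default is 30m)
    peerGCInterval: 60000000000        # 1m (default DefaultSchedulerPeerGCInterval is 10s)
    peerTTL: 300000000000              # 5m
    taskGCInterval: 60000000000        # 1m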

View File

@@ -107,11 +107,11 @@ type Host struct {
 	// PeerCount is peer count.
 	PeerCount *atomic.Int32

-	// CreateAt is host create time.
-	CreateAt *atomic.Time
-	// UpdateAt is host update time.
-	UpdateAt *atomic.Time
+	// CreatedAt is host create time.
+	CreatedAt *atomic.Time
+	// UpdatedAt is host update time.
+	UpdatedAt *atomic.Time

 	// Host log.
 	Log *logger.SugaredLoggerOnWith

@@ -142,8 +142,8 @@ func NewHost(req *schedulerv1.AnnounceHostRequest, options ...HostOption) *Host
 		UploadFailedCount: atomic.NewInt64(0),
 		Peers: &sync.Map{},
 		PeerCount: atomic.NewInt32(0),
-		CreateAt: atomic.NewTime(time.Now()),
-		UpdateAt: atomic.NewTime(time.Now()),
+		CreatedAt: atomic.NewTime(time.Now()),
+		UpdatedAt: atomic.NewTime(time.Now()),
 		Log: logger.WithHost(req.Id, req.Hostname, req.Ip),
 	}

View File

@@ -85,8 +85,8 @@ func TestHost_NewHost(t *testing.T) {
 				assert.Equal(host.Network.NetTopology, mockRawHost.Network.NetTopology)
 				assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
 				assert.Equal(host.PeerCount.Load(), int32(0))
-				assert.NotEqual(host.CreateAt.Load(), 0)
-				assert.NotEqual(host.UpdateAt.Load(), 0)
+				assert.NotEqual(host.CreatedAt.Load(), 0)
+				assert.NotEqual(host.UpdatedAt.Load(), 0)
 				assert.NotNil(host.Log)
 			},
 		},

@@ -107,8 +107,8 @@ func TestHost_NewHost(t *testing.T) {
 				assert.Equal(host.Network.NetTopology, mockRawSeedHost.Network.NetTopology)
 				assert.Equal(host.ConcurrentUploadLimit.Load(), int32(config.DefaultPeerConcurrentUploadLimit))
 				assert.Equal(host.PeerCount.Load(), int32(0))
-				assert.NotEqual(host.CreateAt.Load(), 0)
-				assert.NotEqual(host.UpdateAt.Load(), 0)
+				assert.NotEqual(host.CreatedAt.Load(), 0)
+				assert.NotEqual(host.UpdatedAt.Load(), 0)
 				assert.NotNil(host.Log)
 			},
 		},

@@ -130,8 +130,8 @@ func TestHost_NewHost(t *testing.T) {
 				assert.Equal(host.Network.NetTopology, mockRawHost.Network.NetTopology)
 				assert.Equal(host.ConcurrentUploadLimit.Load(), int32(200))
 				assert.Equal(host.PeerCount.Load(), int32(0))
-				assert.NotEqual(host.CreateAt.Load(), 0)
-				assert.NotEqual(host.UpdateAt.Load(), 0)
+				assert.NotEqual(host.CreatedAt.Load(), 0)
+				assert.NotEqual(host.UpdatedAt.Load(), 0)
 				assert.NotNil(host.Log)
 			},
 		},

View File

@@ -174,11 +174,14 @@ type Peer struct {
 	// IsBackToSource is set to true.
 	IsBackToSource *atomic.Bool

-	// CreateAt is peer create time.
-	CreateAt *atomic.Time
-	// UpdateAt is peer update time.
-	UpdateAt *atomic.Time
+	// PieceUpdatedAt is piece update time.
+	PieceUpdatedAt *atomic.Time
+	// CreatedAt is peer create time.
+	CreatedAt *atomic.Time
+	// UpdatedAt is peer update time.
+	UpdatedAt *atomic.Time

 	// Peer log.
 	Log *logger.SugaredLoggerOnWith

@@ -199,8 +202,9 @@ func NewPeer(id string, task *Task, host *Host, options ...PeerOption) *Peer {
 		BlockParents: set.NewSafeSet[string](),
 		NeedBackToSource: atomic.NewBool(false),
 		IsBackToSource: atomic.NewBool(false),
-		CreateAt: atomic.NewTime(time.Now()),
-		UpdateAt: atomic.NewTime(time.Now()),
+		PieceUpdatedAt: atomic.NewTime(time.Now()),
+		CreatedAt: atomic.NewTime(time.Now()),
+		UpdatedAt: atomic.NewTime(time.Now()),
 		Log: logger.WithPeer(host.ID, task.ID, id),
 	}

@@ -231,23 +235,23 @@ func NewPeer(id string, task *Task, host *Host, options ...PeerOption) *Peer {
 		},
 		fsm.Callbacks{
 			PeerEventRegisterEmpty: func(e *fsm.Event) {
-				p.UpdateAt.Store(time.Now())
+				p.UpdatedAt.Store(time.Now())
 				p.Log.Infof("peer state is %s", e.FSM.Current())
 			},
 			PeerEventRegisterTiny: func(e *fsm.Event) {
-				p.UpdateAt.Store(time.Now())
+				p.UpdatedAt.Store(time.Now())
 				p.Log.Infof("peer state is %s", e.FSM.Current())
 			},
 			PeerEventRegisterSmall: func(e *fsm.Event) {
-				p.UpdateAt.Store(time.Now())
+				p.UpdatedAt.Store(time.Now())
 				p.Log.Infof("peer state is %s", e.FSM.Current())
 			},
 			PeerEventRegisterNormal: func(e *fsm.Event) {
-				p.UpdateAt.Store(time.Now())
+				p.UpdatedAt.Store(time.Now())
 				p.Log.Infof("peer state is %s", e.FSM.Current())
 			},
 			PeerEventDownload: func(e *fsm.Event) {
-				p.UpdateAt.Store(time.Now())
+				p.UpdatedAt.Store(time.Now())
 				p.Log.Infof("peer state is %s", e.FSM.Current())
 			},
 			PeerEventDownloadBackToSource: func(e *fsm.Event) {

@@ -258,7 +262,7 @@ func NewPeer(id string, task *Task, host *Host, options ...PeerOption) *Peer {
 					p.Log.Errorf("delete peer inedges failed: %s", err.Error())
 				}

-				p.UpdateAt.Store(time.Now())
+				p.UpdatedAt.Store(time.Now())
 				p.Log.Infof("peer state is %s", e.FSM.Current())
 			},
 			PeerEventDownloadSucceeded: func(e *fsm.Event) {

@@ -271,7 +275,7 @@ func NewPeer(id string, task *Task, host *Host, options ...PeerOption) *Peer {
 				}

 				p.Task.PeerFailedCount.Store(0)
-				p.UpdateAt.Store(time.Now())
+				p.UpdatedAt.Store(time.Now())
 				p.Log.Infof("peer state is %s", e.FSM.Current())
 			},
 			PeerEventDownloadFailed: func(e *fsm.Event) {

@@ -284,7 +288,7 @@ func NewPeer(id string, task *Task, host *Host, options ...PeerOption) *Peer {
 					p.Log.Errorf("delete peer inedges failed: %s", err.Error())
 				}

-				p.UpdateAt.Store(time.Now())
+				p.UpdatedAt.Store(time.Now())
 				p.Log.Infof("peer state is %s", e.FSM.Current())
 			},
 			PeerEventLeave: func(e *fsm.Event) {

@@ -292,6 +296,7 @@ func NewPeer(id string, task *Task, host *Host, options ...PeerOption) *Peer {
 					p.Log.Errorf("delete peer inedges failed: %s", err.Error())
 				}

+				p.Task.BackToSourcePeers.Delete(p.ID)
 				p.Log.Infof("peer state is %s", e.FSM.Current())
 			},
 		},

View File

@@ -54,10 +54,13 @@ type peerManager struct {
 	// Peer sync map.
 	*sync.Map

-	// Peer time to live.
+	// ttl is time to live of peer.
 	ttl time.Duration

-	// Peer mutex.
+	// pieceDownloadTimeout is timeout of downloading piece.
+	pieceDownloadTimeout time.Duration
+
+	// mu is peer mutex.
 	mu *sync.Mutex
 }

@@ -66,6 +69,7 @@ func newPeerManager(cfg *config.GCConfig, gc pkggc.GC) (PeerManager, error) {
 	p := &peerManager{
 		Map: &sync.Map{},
 		ttl: cfg.PeerTTL,
+		pieceDownloadTimeout: cfg.PieceDownloadTimeout,
 		mu: &sync.Mutex{},
 	}

@@ -125,7 +129,11 @@ func (p *peerManager) Delete(key string) {
 func (p *peerManager) RunGC() error {
 	p.Map.Range(func(_, value any) bool {
-		peer := value.(*Peer)
+		peer, ok := value.(*Peer)
+		if !ok {
+			peer.Log.Warn("invalid peer")
+			return true
+		}

 		// If the peer state is PeerStateLeave,
 		// peer will be reclaimed.

@@ -135,12 +143,25 @@ func (p *peerManager) RunGC() error {
 			return true
 		}

+		// If the peer's elapsed time of downloading piece exceeds the pieceDownloadTimeout,
+		// then set the peer state to PeerStateLeave and then delete peer.
+		if peer.FSM.Is(PeerStateRunning) || peer.FSM.Is(PeerStateBackToSource) {
+			elapsed := time.Since(peer.PieceUpdatedAt.Load())
+			if elapsed > p.pieceDownloadTimeout {
+				peer.Log.Info("peer elapsed exceeds the timeout of downloading piece, causing the peer to leave")
+				if err := peer.FSM.Event(PeerEventLeave); err != nil {
+					peer.Log.Errorf("peer fsm event failed: %s", err.Error())
+					return true
+				}
+
+				return true
+			}
+		}
+
 		// If the peer's elapsed exceeds the ttl,
-		// first set the peer state to PeerStateLeave and then delete peer.
+		// then set the peer state to PeerStateLeave and then delete peer.
-		elapsed := time.Since(peer.UpdateAt.Load())
+		elapsed := time.Since(peer.UpdatedAt.Load())
 		if elapsed > p.ttl {
-			// If the peer is not leave,
-			// first change the state to PeerEventLeave.
 			peer.Log.Info("peer elapsed exceeds the ttl, causing the peer to leave")
 			if err := peer.FSM.Event(PeerEventLeave); err != nil {
 				peer.Log.Errorf("peer fsm event failed: %s", err.Error())

@@ -151,7 +172,7 @@ func (p *peerManager) RunGC() error {
 		}

 		// If the peer's state is PeerStateFailed,
-		// first set the peer state to PeerStateLeave and then delete peer.
+		// then set the peer state to PeerStateLeave and then delete peer.
 		if peer.FSM.Is(PeerStateFailed) {
 			peer.Log.Info("peer state is PeerStateFailed, causing the peer to leave")
 			if err := peer.FSM.Event(PeerEventLeave); err != nil {
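Taken together with the handlePieceSuccess and seedPeer changes later in this commit (both store time.Now() into PieceUpdatedAt after every successful piece), the new GC branch boils down to roughly the following predicate. This is a simplified sketch of the logic above, not additional API; shouldLeaveForPieceTimeout is a hypothetical helper name, and the Peer, PeerStateRunning, and PeerStateBackToSource identifiers are assumed from the resource package:

// Sketch: a peer that is actively downloading but has not finished a piece
// within pieceDownloadTimeout is forced to leave so it can be reclaimed.
func shouldLeaveForPieceTimeout(peer *Peer, pieceDownloadTimeout time.Duration) bool {
	// Only running or back-to-source peers are subject to the piece timeout;
	// pending, succeeded, failed, and leaving peers are handled by the other
	// branches of RunGC.
	if !peer.FSM.Is(PeerStateRunning) && !peer.FSM.Is(PeerStateBackToSource) {
		return false
	}

	// PieceUpdatedAt is refreshed on every successful piece download,
	// so a stale value means the download has stalled.
	return time.Since(peer.PieceUpdatedAt.Load()) > pieceDownloadTimeout
}

Note that the first RunGC pass after the timeout only moves such a peer to PeerStateLeave; the following pass deletes it, which is what the two RunGC calls in the new test cases below verify.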

View File

@@ -319,6 +319,7 @@ func TestPeerManager_RunGC(t *testing.T) {
 		{
 			name: "peer leave",
 			gcConfig: &config.GCConfig{
+				PieceDownloadTimeout: 5 * time.Minute,
 				PeerGCInterval: 1 * time.Second,
 				PeerTTL: 1 * time.Microsecond,
 			},

@@ -337,9 +338,66 @@ func TestPeerManager_RunGC(t *testing.T) {
 				assert.Equal(peer.FSM.Current(), PeerStateLeave)
 			},
 		},
+		{
+			name: "peer download piece timeout and peer state is PeerStateRunning",
+			gcConfig: &config.GCConfig{
+				PieceDownloadTimeout: 1 * time.Microsecond,
+				PeerGCInterval: 1 * time.Second,
+				PeerTTL: 5 * time.Minute,
+			},
+			mock: func(m *gc.MockGCMockRecorder) {
+				m.Add(gomock.Any()).Return(nil).Times(1)
+			},
+			expect: func(t *testing.T, peerManager PeerManager, mockHost *Host, mockTask *Task, mockPeer *Peer) {
+				assert := assert.New(t)
+				peerManager.Store(mockPeer)
+				mockPeer.FSM.SetState(PeerStateRunning)
+				err := peerManager.RunGC()
+				assert.NoError(err)
+
+				peer, ok := peerManager.Load(mockPeer.ID)
+				assert.Equal(ok, true)
+				assert.Equal(peer.FSM.Current(), PeerStateLeave)
+
+				err = peerManager.RunGC()
+				assert.NoError(err)
+
+				_, ok = peerManager.Load(mockPeer.ID)
+				assert.Equal(ok, false)
+			},
+		},
+		{
+			name: "peer download piece timeout and peer state is PeerStateBackToSource",
+			gcConfig: &config.GCConfig{
+				PieceDownloadTimeout: 1 * time.Microsecond,
+				PeerGCInterval: 1 * time.Second,
+				PeerTTL: 5 * time.Minute,
+			},
+			mock: func(m *gc.MockGCMockRecorder) {
+				m.Add(gomock.Any()).Return(nil).Times(1)
+			},
+			expect: func(t *testing.T, peerManager PeerManager, mockHost *Host, mockTask *Task, mockPeer *Peer) {
+				assert := assert.New(t)
+				peerManager.Store(mockPeer)
+				mockPeer.FSM.SetState(PeerStateBackToSource)
+				err := peerManager.RunGC()
+				assert.NoError(err)
+
+				peer, ok := peerManager.Load(mockPeer.ID)
+				assert.Equal(ok, true)
+				assert.Equal(peer.FSM.Current(), PeerStateLeave)
+
+				err = peerManager.RunGC()
+				assert.NoError(err)
+
+				_, ok = peerManager.Load(mockPeer.ID)
+				assert.Equal(ok, false)
+			},
+		},
 		{
 			name: "peer reclaimed",
 			gcConfig: &config.GCConfig{
+				PieceDownloadTimeout: 5 * time.Minute,
 				PeerGCInterval: 1 * time.Second,
 				PeerTTL: 1 * time.Microsecond,
 			},

@@ -367,6 +425,7 @@ func TestPeerManager_RunGC(t *testing.T) {
 		{
 			name: "peer state is PeerStateFailed",
 			gcConfig: &config.GCConfig{
+				PieceDownloadTimeout: 5 * time.Minute,
 				PeerGCInterval: 1 * time.Second,
 				PeerTTL: 1 * time.Microsecond,
 			},

@@ -388,6 +447,7 @@ func TestPeerManager_RunGC(t *testing.T) {
 		{
 			name: "peer gets degree failed",
 			gcConfig: &config.GCConfig{
+				PieceDownloadTimeout: 5 * time.Minute,
 				PeerGCInterval: 1 * time.Second,
 				PeerTTL: 1 * time.Hour,
 			},

@@ -410,6 +470,7 @@ func TestPeerManager_RunGC(t *testing.T) {
 		{
 			name: "peer reclaimed with PeerCountLimitForTask",
 			gcConfig: &config.GCConfig{
+				PieceDownloadTimeout: 5 * time.Minute,
 				PeerGCInterval: 1 * time.Second,
 				PeerTTL: 1 * time.Hour,
 			},

View File

@@ -62,8 +62,8 @@ func TestPeer_NewPeer(t *testing.T) {
 				assert.Equal(peer.FSM.Current(), PeerStatePending)
 				assert.EqualValues(peer.Task, mockTask)
 				assert.EqualValues(peer.Host, mockHost)
-				assert.NotEqual(peer.CreateAt.Load(), 0)
-				assert.NotEqual(peer.UpdateAt.Load(), 0)
+				assert.NotEqual(peer.CreatedAt.Load(), 0)
+				assert.NotEqual(peer.UpdatedAt.Load(), 0)
 				assert.NotNil(peer.Log)
 			},
 		},

@@ -83,8 +83,8 @@ func TestPeer_NewPeer(t *testing.T) {
 				assert.Equal(peer.FSM.Current(), PeerStatePending)
 				assert.EqualValues(peer.Task, mockTask)
 				assert.EqualValues(peer.Host, mockHost)
-				assert.NotEqual(peer.CreateAt.Load(), 0)
-				assert.NotEqual(peer.UpdateAt.Load(), 0)
+				assert.NotEqual(peer.CreatedAt.Load(), 0)
+				assert.NotEqual(peer.UpdatedAt.Load(), 0)
 				assert.NotNil(peer.Log)
 			},
 		},

View File

@@ -142,9 +142,10 @@ func (s *seedPeer) TriggerTask(ctx context.Context, task *Task) (*Peer, *schedul
 			peer.AppendPieceCost(pkgtime.SubNano(int64(piece.EndTime), int64(piece.BeginTime)).Milliseconds())

 			// When the piece is downloaded successfully,
-			// peer.UpdateAt needs to be updated to prevent
+			// peer.UpdatedAt needs to be updated to prevent
 			// the peer from being GC during the download process.
-			peer.UpdateAt.Store(time.Now())
+			peer.UpdatedAt.Store(time.Now())
+			peer.PieceUpdatedAt.Store(time.Now())
 			task.StorePiece(piece.PieceInfo)

 			// Statistical traffic metrics.

View File

@@ -132,11 +132,11 @@ type Task struct {
 	// if one peer succeeds, the value is reset to zero.
 	PeerFailedCount *atomic.Int32

-	// CreateAt is task create time.
-	CreateAt *atomic.Time
-	// UpdateAt is task update time.
-	UpdateAt *atomic.Time
+	// CreatedAt is task create time.
+	CreatedAt *atomic.Time
+	// UpdatedAt is task update time.
+	UpdatedAt *atomic.Time

 	// Task log.
 	Log *logger.SugaredLoggerOnWith

@@ -157,8 +157,8 @@ func NewTask(id, url string, taskType commonv1.TaskType, meta *commonv1.UrlMeta,
 		Pieces: &sync.Map{},
 		DAG: dag.NewDAG[*Peer](),
 		PeerFailedCount: atomic.NewInt32(0),
-		CreateAt: atomic.NewTime(time.Now()),
-		UpdateAt: atomic.NewTime(time.Now()),
+		CreatedAt: atomic.NewTime(time.Now()),
+		UpdatedAt: atomic.NewTime(time.Now()),
 		Log: logger.WithTask(id, url),
 	}

@@ -173,19 +173,19 @@ func NewTask(id, url string, taskType commonv1.TaskType, meta *commonv1.UrlMeta,
 		},
 		fsm.Callbacks{
 			TaskEventDownload: func(e *fsm.Event) {
-				t.UpdateAt.Store(time.Now())
+				t.UpdatedAt.Store(time.Now())
 				t.Log.Infof("task state is %s", e.FSM.Current())
 			},
 			TaskEventDownloadSucceeded: func(e *fsm.Event) {
-				t.UpdateAt.Store(time.Now())
+				t.UpdatedAt.Store(time.Now())
 				t.Log.Infof("task state is %s", e.FSM.Current())
 			},
 			TaskEventDownloadFailed: func(e *fsm.Event) {
-				t.UpdateAt.Store(time.Now())
+				t.UpdatedAt.Store(time.Now())
 				t.Log.Infof("task state is %s", e.FSM.Current())
 			},
 			TaskEventLeave: func(e *fsm.Event) {
-				t.UpdateAt.Store(time.Now())
+				t.UpdatedAt.Store(time.Now())
 				t.Log.Infof("task state is %s", e.FSM.Current())
 			},
 		},

@@ -362,7 +362,7 @@ func (t *Task) LoadSeedPeer() (*Peer, bool) {
 	sort.Slice(
 		peers,
 		func(i, j int) bool {
-			return peers[i].UpdateAt.Load().After(peers[j].UpdateAt.Load())
+			return peers[i].UpdatedAt.Load().After(peers[j].UpdatedAt.Load())
 		},
 	)

@@ -376,7 +376,7 @@ func (t *Task) LoadSeedPeer() (*Peer, bool) {
 // IsSeedPeerFailed returns whether the seed peer in the task failed.
 func (t *Task) IsSeedPeerFailed() bool {
 	seedPeer, ok := t.LoadSeedPeer()
-	return ok && seedPeer.FSM.Is(PeerStateFailed) && time.Since(seedPeer.CreateAt.Load()) < SeedPeerFailedTimeout
+	return ok && seedPeer.FSM.Is(PeerStateFailed) && time.Since(seedPeer.CreatedAt.Load()) < SeedPeerFailedTimeout
 }

 // LoadPiece return piece for a key.

View File

@@ -82,8 +82,8 @@ func TestTask_NewTask(t *testing.T) {
 				assert.Equal(task.FSM.Current(), TaskStatePending)
 				assert.Empty(task.Pieces)
 				assert.Equal(task.PeerCount(), 0)
-				assert.NotEqual(task.CreateAt.Load(), 0)
-				assert.NotEqual(task.UpdateAt.Load(), 0)
+				assert.NotEqual(task.CreatedAt.Load(), 0)
+				assert.NotEqual(task.UpdatedAt.Load(), 0)
 				assert.NotNil(task.Log)
 			},
 		},

@@ -1042,8 +1042,8 @@ func TestTask_LoadSeedPeer(t *testing.T) {
 				task.StorePeer(mockPeer)
 				task.StorePeer(mockSeedPeer)
-				mockPeer.UpdateAt.Store(time.Now())
-				mockSeedPeer.UpdateAt.Store(time.Now().Add(1 * time.Second))
+				mockPeer.UpdatedAt.Store(time.Now())
+				mockSeedPeer.UpdatedAt.Store(time.Now().Add(1 * time.Second))

 				peer, ok := task.LoadSeedPeer()
 				assert.True(ok)

@@ -1124,7 +1124,7 @@ func TestTask_IsSeedPeerFailed(t *testing.T) {
 				assert := assert.New(t)
 				task.StorePeer(mockPeer)
 				task.StorePeer(mockSeedPeer)
-				mockSeedPeer.CreateAt.Store(time.Now().Add(-SeedPeerFailedTimeout))
+				mockSeedPeer.CreatedAt.Store(time.Now().Add(-SeedPeerFailedTimeout))
 				mockSeedPeer.FSM.SetState(PeerStateFailed)
 				assert.False(task.IsSeedPeerFailed())

View File

@@ -744,16 +744,17 @@ func (s *Service) handlePieceSuccess(ctx context.Context, peer *resource.Peer, p
 	peer.AppendPieceCost(pkgtime.SubNano(int64(piece.EndTime), int64(piece.BeginTime)).Milliseconds())

 	// When the piece is downloaded successfully,
-	// peer's UpdateAt needs to be updated
+	// peer's UpdatedAt needs to be updated
 	// to prevent the peer from being GC during the download process.
-	peer.UpdateAt.Store(time.Now())
+	peer.UpdatedAt.Store(time.Now())
+	peer.PieceUpdatedAt.Store(time.Now())

 	// When the piece is downloaded successfully,
-	// dst peer's UpdateAt needs to be updated
+	// dst peer's UpdatedAt needs to be updated
 	// to prevent the dst peer from being GC during the download process.
 	if !resource.IsPieceBackToSource(piece) {
 		if destPeer, loaded := s.resource.PeerManager().Load(piece.DstPid); loaded {
-			destPeer.UpdateAt.Store(time.Now())
+			destPeer.UpdatedAt.Store(time.Now())
 		}
 	}

@@ -997,8 +998,8 @@ func (s *Service) createRecord(peer *resource.Peer, parents []*resource.Peer, re
 			Application: parent.Application,
 			State: parent.FSM.Current(),
 			Cost: req.Cost,
-			CreateAt: parent.CreateAt.Load().UnixNano(),
-			UpdateAt: parent.UpdateAt.Load().UnixNano(),
+			CreatedAt: parent.CreatedAt.Load().UnixNano(),
+			UpdatedAt: parent.UpdatedAt.Load().UnixNano(),
 			Host: storage.Host{
 				ID: parent.Host.ID,
 				Type: parent.Host.Type.Name(),

@@ -1015,8 +1016,8 @@ func (s *Service) createRecord(peer *resource.Peer, parents []*resource.Peer, re
 				ConcurrentUploadCount: parent.Host.ConcurrentUploadCount.Load(),
 				UploadCount: parent.Host.UploadCount.Load(),
 				UploadFailedCount: parent.Host.UploadFailedCount.Load(),
-				CreateAt: parent.Host.CreateAt.Load().UnixNano(),
-				UpdateAt: parent.Host.UpdateAt.Load().UnixNano(),
+				CreatedAt: parent.Host.CreatedAt.Load().UnixNano(),
+				UpdatedAt: parent.Host.UpdatedAt.Load().UnixNano(),
 			},
 		}

@@ -1095,8 +1096,8 @@ func (s *Service) createRecord(peer *resource.Peer, parents []*resource.Peer, re
 		State: peer.FSM.Current(),
 		Cost: req.Cost,
 		Parents: parentRecords,
-		CreateAt: peer.CreateAt.Load().UnixNano(),
-		UpdateAt: peer.UpdateAt.Load().UnixNano(),
+		CreatedAt: peer.CreatedAt.Load().UnixNano(),
+		UpdatedAt: peer.UpdatedAt.Load().UnixNano(),
 		Task: storage.Task{
 			ID: peer.Task.ID,
 			URL: peer.Task.URL,

@@ -1106,8 +1107,8 @@ func (s *Service) createRecord(peer *resource.Peer, parents []*resource.Peer, re
 			BackToSourceLimit: peer.Task.BackToSourceLimit.Load(),
 			BackToSourcePeerCount: int32(peer.Task.BackToSourcePeers.Len()),
 			State: peer.Task.FSM.Current(),
-			CreateAt: peer.Task.CreateAt.Load().UnixNano(),
-			UpdateAt: peer.Task.UpdateAt.Load().UnixNano(),
+			CreatedAt: peer.Task.CreatedAt.Load().UnixNano(),
+			UpdatedAt: peer.Task.UpdatedAt.Load().UnixNano(),
 		},
 		Host: storage.Host{
 			ID: peer.Host.ID,

@@ -1125,8 +1126,8 @@ func (s *Service) createRecord(peer *resource.Peer, parents []*resource.Peer, re
 			ConcurrentUploadCount: peer.Host.ConcurrentUploadCount.Load(),
 			UploadCount: peer.Host.UploadCount.Load(),
 			UploadFailedCount: peer.Host.UploadFailedCount.Load(),
-			CreateAt: peer.Host.CreateAt.Load().UnixNano(),
-			UpdateAt: peer.Host.UpdateAt.Load().UnixNano(),
+			CreatedAt: peer.Host.CreatedAt.Load().UnixNano(),
+			UpdatedAt: peer.Host.UpdatedAt.Load().UnixNano(),
 		},
 	}

View File

@@ -43,8 +43,8 @@ var (
 		BackToSourceLimit: 10,
 		BackToSourcePeerCount: 2,
 		State: "Succeeded",
-		CreateAt: time.Now().UnixNano(),
-		UpdateAt: time.Now().UnixNano(),
+		CreatedAt: time.Now().UnixNano(),
+		UpdatedAt: time.Now().UnixNano(),
 	}

 	mockHost = Host{

@@ -113,8 +113,8 @@ var (
 			GoVersion: "1.19",
 			Platform: "linux",
 		},
-		CreateAt: time.Now().UnixNano(),
-		UpdateAt: time.Now().UnixNano(),
+		CreatedAt: time.Now().UnixNano(),
+		UpdatedAt: time.Now().UnixNano(),
 	}

 	mockParent = Parent{

@@ -124,8 +124,8 @@ var (
 		State: "Succeeded",
 		Cost: 1000,
 		Host: mockHost,
-		CreateAt: time.Now().UnixNano(),
-		UpdateAt: time.Now().UnixNano(),
+		CreatedAt: time.Now().UnixNano(),
+		UpdatedAt: time.Now().UnixNano(),
 	}

 	mockParents = append(make([]Parent, 19), mockParent)

@@ -139,8 +139,8 @@ var (
 		Task: mockTask,
 		Host: mockHost,
 		Parents: mockParents,
-		CreateAt: time.Now().UnixNano(),
-		UpdateAt: time.Now().UnixNano(),
+		CreatedAt: time.Now().UnixNano(),
+		UpdatedAt: time.Now().UnixNano(),
 	}
 )

View File

@@ -42,11 +42,11 @@ type Task struct {
 	// State is the download state of the task.
 	State string `csv:"state"`

-	// CreateAt is peer create nanosecond time.
-	CreateAt int64 `csv:"createAt"`
-	// UpdateAt is peer update nanosecond time.
-	UpdateAt int64 `csv:"updateAt"`
+	// CreatedAt is peer create nanosecond time.
+	CreatedAt int64 `csv:"createdAt"`
+	// UpdatedAt is peer update nanosecond time.
+	UpdatedAt int64 `csv:"updatedAt"`
 }

 // Host contains content for host.

@@ -111,11 +111,11 @@ type Host struct {
 	// Build information.
 	Build Build `csv:"build"`

-	// CreateAt is peer create nanosecond time.
-	CreateAt int64 `csv:"createAt"`
-	// UpdateAt is peer update nanosecond time.
-	UpdateAt int64 `csv:"updateAt"`
+	// CreatedAt is peer create nanosecond time.
+	CreatedAt int64 `csv:"createdAt"`
+	// UpdatedAt is peer update nanosecond time.
+	UpdatedAt int64 `csv:"updatedAt"`
 }

 // CPU contains content for cpu.

@@ -273,11 +273,11 @@ type Parent struct {
 	// Host is peer host.
 	Host Host `csv:"host"`

-	// CreateAt is peer create nanosecond time.
-	CreateAt int64 `csv:"createAt"`
-	// UpdateAt is peer update nanosecond time.
-	UpdateAt int64 `csv:"updateAt"`
+	// CreatedAt is peer create nanosecond time.
+	CreatedAt int64 `csv:"createdAt"`
+	// UpdatedAt is peer update nanosecond time.
+	UpdatedAt int64 `csv:"updatedAt"`
 }

 // Record contains content for record.

@@ -306,9 +306,9 @@ type Record struct {
 	// Parents is peer parents.
 	Parents []Parent `csv:"parents" csv[]:"20"`

-	// CreateAt is peer create nanosecond time.
-	CreateAt int64 `csv:"createAt"`
-	// UpdateAt is peer update nanosecond time.
-	UpdateAt int64 `csv:"updateAt"`
+	// CreatedAt is peer create nanosecond time.
+	CreatedAt int64 `csv:"createdAt"`
+	// UpdatedAt is peer update nanosecond time.
+	UpdatedAt int64 `csv:"updatedAt"`
 }