diff --git a/scheduler/resource/cdn.go b/scheduler/resource/cdn.go index a4675b795..4db70d7ea 100644 --- a/scheduler/resource/cdn.go +++ b/scheduler/resource/cdn.go @@ -80,7 +80,7 @@ func (c *cdn) TriggerTask(ctx context.Context, task *Task) (*Peer, *rpcscheduler // Handle begin of piece if piece.PieceInfo != nil && piece.PieceInfo.PieceNum == common.BeginOfPiece { - task.Log.Infof("receive begin o piece: %#v %#v", piece, piece.PieceInfo) + task.Log.Infof("receive begin of piece: %#v %#v", piece, piece.PieceInfo) peer, err = c.initPeer(task, piece) if err != nil { return nil, nil, err diff --git a/scheduler/resource/host.go b/scheduler/resource/host.go index d80f60a5b..d8c35324d 100644 --- a/scheduler/resource/host.go +++ b/scheduler/resource/host.go @@ -167,7 +167,7 @@ func (h *Host) LenPeers() int { // LeavePeers set peer state to PeerStateLeave func (h *Host) LeavePeers() { - h.Peers.Range(func(key, value interface{}) bool { + h.Peers.Range(func(_, value interface{}) bool { if peer, ok := value.(*Peer); ok { if err := peer.FSM.Event(PeerEventDownloadFailed); err != nil { peer.Log.Errorf("peer fsm event failed: %v", err) @@ -179,7 +179,7 @@ func (h *Host) LeavePeers() { return true } - peer.Log.Info("peer has been left") + h.Log.Infof("peer %s has been left", peer.ID) } return true diff --git a/scheduler/resource/peer.go b/scheduler/resource/peer.go index 32c16984a..b56633861 100644 --- a/scheduler/resource/peer.go +++ b/scheduler/resource/peer.go @@ -239,8 +239,6 @@ func (p *Peer) StoreChild(child *Peer) { defer p.mu.Unlock() p.Children.Store(child.ID, child) - p.Host.Peers.Store(child.ID, child) - p.Task.Peers.Store(child.ID, child) child.Parent.Store(p) } @@ -255,8 +253,6 @@ func (p *Peer) DeleteChild(key string) { } p.Children.Delete(child.ID) - p.Host.DeletePeer(child.ID) - p.Task.DeletePeer(child.ID) child.Parent = &atomic.Value{} } @@ -288,8 +284,6 @@ func (p *Peer) StoreParent(parent *Peer) { p.Parent.Store(parent) parent.Children.Store(p.ID, p) - parent.Host.Peers.Store(p.ID, p) - parent.Task.Peers.Store(p.ID, p) } // DeleteParent deletes peer parent @@ -304,8 +298,6 @@ func (p *Peer) DeleteParent() { p.Parent = &atomic.Value{} parent.Children.Delete(p.ID) - parent.Host.Peers.Delete(p.ID) - parent.Task.Peers.Delete(p.ID) } // ReplaceParent replaces peer parent diff --git a/scheduler/resource/peer_test.go b/scheduler/resource/peer_test.go index 7435e38f9..cfc21cd77 100644 --- a/scheduler/resource/peer_test.go +++ b/scheduler/resource/peer_test.go @@ -146,12 +146,6 @@ func TestPeer_StoreChild(t *testing.T) { child, ok = peer.LoadChild(childID) assert.Equal(ok, true) assert.Equal(child.ID, childID) - child, ok = peer.Host.LoadPeer(childID) - assert.Equal(ok, true) - assert.Equal(child.ID, childID) - child, ok = peer.Task.LoadPeer(childID) - assert.Equal(ok, true) - assert.Equal(child.ID, childID) parent, ok = child.LoadParent() assert.Equal(ok, true) assert.Equal(parent.ID, peer.ID) @@ -171,12 +165,6 @@ func TestPeer_StoreChild(t *testing.T) { child, ok = peer.LoadChild(childID) assert.Equal(ok, true) assert.Equal(child.ID, childID) - child, ok = peer.Host.LoadPeer(childID) - assert.Equal(ok, true) - assert.Equal(child.ID, childID) - child, ok = peer.Task.LoadPeer(childID) - assert.Equal(ok, true) - assert.Equal(child.ID, childID) parent, ok = child.LoadParent() assert.Equal(ok, true) assert.Equal(parent.ID, peer.ID) @@ -213,10 +201,6 @@ func TestPeer_DeleteChild(t *testing.T) { var ok bool _, ok = peer.LoadChild(mockChildPeer.ID) assert.Equal(ok, false) - _, ok = peer.Host.LoadPeer(mockChildPeer.ID) - assert.Equal(ok, false) - _, ok = peer.Task.LoadPeer(mockChildPeer.ID) - assert.Equal(ok, false) _, ok = mockChildPeer.LoadParent() assert.Equal(ok, false) }, @@ -236,12 +220,6 @@ func TestPeer_DeleteChild(t *testing.T) { child, ok = peer.LoadChild(mockChildPeer.ID) assert.Equal(ok, true) assert.Equal(child.ID, mockChildPeer.ID) - child, ok = peer.Host.LoadPeer(mockChildPeer.ID) - assert.Equal(ok, true) - assert.Equal(child.ID, mockChildPeer.ID) - child, ok = peer.Task.LoadPeer(mockChildPeer.ID) - assert.Equal(ok, true) - assert.Equal(child.ID, mockChildPeer.ID) parent, ok = child.LoadParent() assert.Equal(ok, true) assert.Equal(parent.ID, peer.ID) @@ -377,12 +355,6 @@ func TestPeer_StoreParent(t *testing.T) { child, ok = parent.LoadChild(peer.ID) assert.Equal(ok, true) assert.Equal(child.ID, peer.ID) - child, ok = peer.Task.LoadPeer(peer.ID) - assert.Equal(ok, true) - assert.Equal(child.ID, peer.ID) - child, ok = peer.Host.LoadPeer(peer.ID) - assert.Equal(ok, true) - assert.Equal(child.ID, peer.ID) }, }, { @@ -402,12 +374,6 @@ func TestPeer_StoreParent(t *testing.T) { child, ok = parent.LoadChild(peer.ID) assert.Equal(ok, true) assert.Equal(child.ID, peer.ID) - child, ok = peer.Task.LoadPeer(peer.ID) - assert.Equal(ok, true) - assert.Equal(child.ID, peer.ID) - child, ok = peer.Host.LoadPeer(peer.ID) - assert.Equal(ok, true) - assert.Equal(child.ID, peer.ID) }, }, } @@ -444,10 +410,6 @@ func TestPeer_DeleteParent(t *testing.T) { assert.Equal(ok, false) _, ok = mockParentPeer.LoadChild(peer.ID) assert.Equal(ok, false) - _, ok = mockParentPeer.Task.LoadPeer(peer.ID) - assert.Equal(ok, false) - _, ok = mockParentPeer.Host.LoadPeer(peer.ID) - assert.Equal(ok, false) }, }, { @@ -462,10 +424,6 @@ func TestPeer_DeleteParent(t *testing.T) { assert.Equal(ok, false) _, ok = mockParentPeer.LoadChild(peer.ID) assert.Equal(ok, false) - _, ok = mockParentPeer.Task.LoadPeer(peer.ID) - assert.Equal(ok, false) - _, ok = mockParentPeer.Host.LoadPeer(peer.ID) - assert.Equal(ok, false) }, }, } @@ -511,12 +469,6 @@ func TestPeer_ReplaceParent(t *testing.T) { child, ok = mockNewParentPeer.LoadChild(peer.ID) assert.Equal(ok, true) assert.Equal(child.ID, peer.ID) - child, ok = mockNewParentPeer.Task.LoadPeer(peer.ID) - assert.Equal(ok, true) - assert.Equal(child.ID, peer.ID) - child, ok = mockNewParentPeer.Host.LoadPeer(peer.ID) - assert.Equal(ok, true) - assert.Equal(child.ID, peer.ID) }, }, { @@ -540,12 +492,6 @@ func TestPeer_ReplaceParent(t *testing.T) { child, ok = mockNewParentPeer.LoadChild(peer.ID) assert.Equal(ok, true) assert.Equal(child.ID, peer.ID) - child, ok = mockNewParentPeer.Task.LoadPeer(peer.ID) - assert.Equal(ok, true) - assert.Equal(child.ID, peer.ID) - child, ok = mockNewParentPeer.Host.LoadPeer(peer.ID) - assert.Equal(ok, true) - assert.Equal(child.ID, peer.ID) }, }, } diff --git a/scheduler/service/callback.go b/scheduler/service/callback.go index cf6ecd2ed..eaf98f4ae 100644 --- a/scheduler/service/callback.go +++ b/scheduler/service/callback.go @@ -325,6 +325,10 @@ func (c *callback) PeerLeave(ctx context.Context, peer *resource.Peer) { // 1. CDN downloads the resource successfully // 2. Dfdaemon back-to-source to download successfully func (c *callback) TaskSuccess(ctx context.Context, task *resource.Task, result *rpcscheduler.PeerResult) { + if task.FSM.Is(resource.TaskStateSucceeded) { + return + } + if err := task.FSM.Event(resource.TaskEventDownloadSucceeded); err != nil { task.Log.Errorf("task fsm event failed: %v", err) return @@ -339,6 +343,10 @@ func (c *callback) TaskSuccess(ctx context.Context, task *resource.Task, result // 1. CDN downloads the resource falied // 2. Dfdaemon back-to-source to download failed func (c *callback) TaskFail(ctx context.Context, task *resource.Task) { + if task.FSM.Is(resource.TaskStateFailed) { + return + } + if err := task.FSM.Event(resource.TaskEventDownloadFailed); err != nil { task.Log.Errorf("task fsm event failed: %v", err) return