diff --git a/scheduler/resource/peer.go b/scheduler/resource/peer.go index 8b37ce8a5..fed5a80f1 100644 --- a/scheduler/resource/peer.go +++ b/scheduler/resource/peer.go @@ -18,7 +18,6 @@ package resource import ( "context" - "errors" "fmt" "io" "net/http" @@ -355,38 +354,6 @@ func (p *Peer) Parents() []*Peer { return parents } -// MainParent returns the parent whose -// peer has downloaded the most pieces. -func (p *Peer) MainParent() (*Peer, error) { - var ( - parents = map[string]int{} - maxPieceCount = 0 - mainParentID string - ) - - for _, piece := range p.Pieces.Values() { - if piece.DstPid != "" { - parents[piece.DstPid]++ - } - - if parents[piece.DstPid] > maxPieceCount { - mainParentID = piece.DstPid - maxPieceCount = parents[piece.DstPid] - } - } - - if mainParentID == "" { - return nil, errors.New("can not found main parent") - } - - parent, err := p.Task.DAG.GetVertex(mainParentID) - if err != nil { - return nil, err - } - - return parent.Value, nil -} - // Children returns children of peer. func (p *Peer) Children() []*Peer { vertex, err := p.Task.DAG.GetVertex(p.ID) diff --git a/scheduler/resource/peer_test.go b/scheduler/resource/peer_test.go index d03df1477..8e1f2c76d 100644 --- a/scheduler/resource/peer_test.go +++ b/scheduler/resource/peer_test.go @@ -314,74 +314,6 @@ func TestPeer_Parents(t *testing.T) { } } -func TestPeer_MainParent(t *testing.T) { - tests := []struct { - name string - expect func(t *testing.T, peer *Peer, seedPeer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer) - }{ - { - name: "get main parent", - expect: func(t *testing.T, peer *Peer, seedPeer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer) { - assert := assert.New(t) - peer.Pieces.Add(&schedulerv1.PieceResult{ - DstPid: peer.ID, - }) - peer.Pieces.Add(&schedulerv1.PieceResult{ - DstPid: seedPeer.ID, - }) - peer.Pieces.Add(&schedulerv1.PieceResult{ - DstPid: seedPeer.ID, - }) - - peer.Task.StorePeer(peer) - peer.Task.StorePeer(seedPeer) - parent, err := peer.MainParent() - assert.NoError(err) - assert.Equal(parent.ID, seedPeer.ID) - }, - }, - { - name: "can not found main parent", - expect: func(t *testing.T, peer *Peer, seedPeer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer) { - assert := assert.New(t) - peer.Pieces.Add(&schedulerv1.PieceResult{}) - peer.Task.StorePeer(peer) - peer.Task.StorePeer(seedPeer) - _, err := peer.MainParent() - assert.EqualError(err, "can not found main parent") - }, - }, - { - name: "get main parent failed", - expect: func(t *testing.T, peer *Peer, seedPeer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer) { - assert := assert.New(t) - peer.Pieces.Add(&schedulerv1.PieceResult{ - DstPid: "foo", - }) - peer.Pieces.Add(&schedulerv1.PieceResult{}) - peer.Task.StorePeer(peer) - peer.Task.StorePeer(seedPeer) - _, err := peer.MainParent() - assert.EqualError(err, "vertex not found") - }, - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - ctl := gomock.NewController(t) - defer ctl.Finish() - stream := mocks.NewMockScheduler_ReportPieceResultServer(ctl) - - mockHost := NewHost(mockRawHost) - mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit)) - peer := NewPeer(mockPeerID, mockTask, mockHost) - seedPeer := NewPeer(mockSeedPeerID, mockTask, mockHost) - tc.expect(t, peer, seedPeer, stream) - }) - } -} - func TestPeer_Children(t *testing.T) { tests := []struct { name string diff --git a/scheduler/resource/piece.go b/scheduler/resource/piece.go new file mode 100644 index 000000000..50227584f --- /dev/null +++ b/scheduler/resource/piece.go @@ -0,0 +1,26 @@ +/* + * Copyright 2022 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package resource + +import ( + schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1" +) + +// IsPieceBackToSource returns whether the piece is downloaded back-to-source. +func IsPieceBackToSource(piece *schedulerv1.PieceResult) bool { + return piece.DstPid == "" +} diff --git a/scheduler/resource/piece_test.go b/scheduler/resource/piece_test.go new file mode 100644 index 000000000..c78f0bf0c --- /dev/null +++ b/scheduler/resource/piece_test.go @@ -0,0 +1,60 @@ +/* + * Copyright 2022 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package resource + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1" +) + +func TestResource_IsPieceBackToSource(t *testing.T) { + tests := []struct { + name string + piece *schedulerv1.PieceResult + expect func(t *testing.T, ok bool) + }{ + { + name: "piece is back-to-source", + piece: &schedulerv1.PieceResult{ + DstPid: "", + }, + expect: func(t *testing.T, ok bool) { + assert := assert.New(t) + assert.True(ok) + }, + }, + { + name: "piece is not back-to-source", + piece: &schedulerv1.PieceResult{ + DstPid: "foo", + }, + expect: func(t *testing.T, ok bool) { + assert := assert.New(t) + assert.False(ok) + }, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + tc.expect(t, IsPieceBackToSource(tc.piece)) + }) + } +} diff --git a/scheduler/service/service.go b/scheduler/service/service.go index 72254b1e5..aa78f59f5 100644 --- a/scheduler/service/service.go +++ b/scheduler/service/service.go @@ -233,15 +233,13 @@ func (s *Service) ReportPieceResult(stream schedulerv1.Scheduler_ReportPieceResu metrics.PeerHostTraffic.WithLabelValues(peer.Tag, peer.Application, metrics.PeerHostTrafficDownloadType, peer.Host.ID, peer.Host.IP).Add(float64(piece.PieceInfo.RangeSize)) if parent, loaded := s.resource.PeerManager().Load(piece.DstPid); loaded { metrics.PeerHostTraffic.WithLabelValues(peer.Tag, peer.Application, metrics.PeerHostTrafficUploadType, parent.Host.ID, parent.Host.IP).Add(float64(piece.PieceInfo.RangeSize)) - } else { - if piece.DstPid != "" { // not backSource piece - peer.Log.Warnf("dst peer %s not found for piece %#v %#v", piece.DstPid, piece, piece.PieceInfo) - } + } else if !resource.IsPieceBackToSource(piece) { + peer.Log.Warnf("dst peer %s not found for piece %#v %#v", piece.DstPid, piece, piece.PieceInfo) } } // Collect traffic metrics. - if piece.DstPid != "" { + if !resource.IsPieceBackToSource(piece) { metrics.Traffic.WithLabelValues(peer.Tag, peer.Application, metrics.TrafficP2PType).Add(float64(piece.PieceInfo.RangeSize)) } else { metrics.Traffic.WithLabelValues(peer.Tag, peer.Application, metrics.TrafficBackToSourceType).Add(float64(piece.PieceInfo.RangeSize)) @@ -747,11 +745,15 @@ func (s *Service) handlePieceSuccess(ctx context.Context, peer *resource.Peer, p peer.AppendPieceCost(pkgtime.SubNano(int64(piece.EndTime), int64(piece.BeginTime)).Milliseconds()) // When the piece is downloaded successfully, - // peer.UpdateAt needs to be updated to prevent - // the peer from being GC during the download process. + // peer's UpdateAt needs to be updated + // to prevent the peer from being GC during the download process. peer.UpdateAt.Store(time.Now()) - if piece.DstPid != "" { - if destPeer, ok := s.resource.PeerManager().Load(piece.DstPid); ok { + + // When the piece is downloaded successfully, + // dst peer's UpdateAt needs to be updated + // to prevent the dst peer from being GC during the download process. + if !resource.IsPieceBackToSource(piece) { + if destPeer, loaded := s.resource.PeerManager().Load(piece.DstPid); loaded { destPeer.UpdateAt.Store(time.Now()) } }