feat: remove MainParent from peer and add IsPieceBackToSource to piece

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2022-11-24 21:57:29 +08:00
parent 73439f6f22
commit a9f10bd678
No known key found for this signature in database
GPG Key ID: 8B4E5D1290FA2FFB
5 changed files with 97 additions and 110 deletions

View File

@ -18,7 +18,6 @@ package resource
import (
"context"
"errors"
"fmt"
"io"
"net/http"
@ -355,38 +354,6 @@ func (p *Peer) Parents() []*Peer {
return parents
}
// MainParent returns the parent whose
// peer has downloaded the most pieces.
func (p *Peer) MainParent() (*Peer, error) {
var (
parents = map[string]int{}
maxPieceCount = 0
mainParentID string
)
for _, piece := range p.Pieces.Values() {
if piece.DstPid != "" {
parents[piece.DstPid]++
}
if parents[piece.DstPid] > maxPieceCount {
mainParentID = piece.DstPid
maxPieceCount = parents[piece.DstPid]
}
}
if mainParentID == "" {
return nil, errors.New("can not found main parent")
}
parent, err := p.Task.DAG.GetVertex(mainParentID)
if err != nil {
return nil, err
}
return parent.Value, nil
}
// Children returns children of peer.
func (p *Peer) Children() []*Peer {
vertex, err := p.Task.DAG.GetVertex(p.ID)

View File

@ -314,74 +314,6 @@ func TestPeer_Parents(t *testing.T) {
}
}
func TestPeer_MainParent(t *testing.T) {
tests := []struct {
name string
expect func(t *testing.T, peer *Peer, seedPeer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer)
}{
{
name: "get main parent",
expect: func(t *testing.T, peer *Peer, seedPeer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer) {
assert := assert.New(t)
peer.Pieces.Add(&schedulerv1.PieceResult{
DstPid: peer.ID,
})
peer.Pieces.Add(&schedulerv1.PieceResult{
DstPid: seedPeer.ID,
})
peer.Pieces.Add(&schedulerv1.PieceResult{
DstPid: seedPeer.ID,
})
peer.Task.StorePeer(peer)
peer.Task.StorePeer(seedPeer)
parent, err := peer.MainParent()
assert.NoError(err)
assert.Equal(parent.ID, seedPeer.ID)
},
},
{
name: "can not found main parent",
expect: func(t *testing.T, peer *Peer, seedPeer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer) {
assert := assert.New(t)
peer.Pieces.Add(&schedulerv1.PieceResult{})
peer.Task.StorePeer(peer)
peer.Task.StorePeer(seedPeer)
_, err := peer.MainParent()
assert.EqualError(err, "can not found main parent")
},
},
{
name: "get main parent failed",
expect: func(t *testing.T, peer *Peer, seedPeer *Peer, stream schedulerv1.Scheduler_ReportPieceResultServer) {
assert := assert.New(t)
peer.Pieces.Add(&schedulerv1.PieceResult{
DstPid: "foo",
})
peer.Pieces.Add(&schedulerv1.PieceResult{})
peer.Task.StorePeer(peer)
peer.Task.StorePeer(seedPeer)
_, err := peer.MainParent()
assert.EqualError(err, "vertex not found")
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
ctl := gomock.NewController(t)
defer ctl.Finish()
stream := mocks.NewMockScheduler_ReportPieceResultServer(ctl)
mockHost := NewHost(mockRawHost)
mockTask := NewTask(mockTaskID, mockTaskURL, commonv1.TaskType_Normal, mockTaskURLMeta, WithBackToSourceLimit(mockTaskBackToSourceLimit))
peer := NewPeer(mockPeerID, mockTask, mockHost)
seedPeer := NewPeer(mockSeedPeerID, mockTask, mockHost)
tc.expect(t, peer, seedPeer, stream)
})
}
}
func TestPeer_Children(t *testing.T) {
tests := []struct {
name string

View File

@ -0,0 +1,26 @@
/*
* Copyright 2022 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package resource
import (
schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1"
)
// IsPieceBackToSource returns whether the piece is downloaded back-to-source.
func IsPieceBackToSource(piece *schedulerv1.PieceResult) bool {
return piece.DstPid == ""
}

View File

@ -0,0 +1,60 @@
/*
* Copyright 2022 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package resource
import (
"testing"
"github.com/stretchr/testify/assert"
schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1"
)
func TestResource_IsPieceBackToSource(t *testing.T) {
tests := []struct {
name string
piece *schedulerv1.PieceResult
expect func(t *testing.T, ok bool)
}{
{
name: "piece is back-to-source",
piece: &schedulerv1.PieceResult{
DstPid: "",
},
expect: func(t *testing.T, ok bool) {
assert := assert.New(t)
assert.True(ok)
},
},
{
name: "piece is not back-to-source",
piece: &schedulerv1.PieceResult{
DstPid: "foo",
},
expect: func(t *testing.T, ok bool) {
assert := assert.New(t)
assert.False(ok)
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
tc.expect(t, IsPieceBackToSource(tc.piece))
})
}
}

View File

@ -233,15 +233,13 @@ func (s *Service) ReportPieceResult(stream schedulerv1.Scheduler_ReportPieceResu
metrics.PeerHostTraffic.WithLabelValues(peer.Tag, peer.Application, metrics.PeerHostTrafficDownloadType, peer.Host.ID, peer.Host.IP).Add(float64(piece.PieceInfo.RangeSize))
if parent, loaded := s.resource.PeerManager().Load(piece.DstPid); loaded {
metrics.PeerHostTraffic.WithLabelValues(peer.Tag, peer.Application, metrics.PeerHostTrafficUploadType, parent.Host.ID, parent.Host.IP).Add(float64(piece.PieceInfo.RangeSize))
} else {
if piece.DstPid != "" { // not backSource piece
} else if !resource.IsPieceBackToSource(piece) {
peer.Log.Warnf("dst peer %s not found for piece %#v %#v", piece.DstPid, piece, piece.PieceInfo)
}
}
}
// Collect traffic metrics.
if piece.DstPid != "" {
if !resource.IsPieceBackToSource(piece) {
metrics.Traffic.WithLabelValues(peer.Tag, peer.Application, metrics.TrafficP2PType).Add(float64(piece.PieceInfo.RangeSize))
} else {
metrics.Traffic.WithLabelValues(peer.Tag, peer.Application, metrics.TrafficBackToSourceType).Add(float64(piece.PieceInfo.RangeSize))
@ -747,11 +745,15 @@ func (s *Service) handlePieceSuccess(ctx context.Context, peer *resource.Peer, p
peer.AppendPieceCost(pkgtime.SubNano(int64(piece.EndTime), int64(piece.BeginTime)).Milliseconds())
// When the piece is downloaded successfully,
// peer.UpdateAt needs to be updated to prevent
// the peer from being GC during the download process.
// peer's UpdateAt needs to be updated
// to prevent the peer from being GC during the download process.
peer.UpdateAt.Store(time.Now())
if piece.DstPid != "" {
if destPeer, ok := s.resource.PeerManager().Load(piece.DstPid); ok {
// When the piece is downloaded successfully,
// dst peer's UpdateAt needs to be updated
// to prevent the dst peer from being GC during the download process.
if !resource.IsPieceBackToSource(piece) {
if destPeer, loaded := s.resource.PeerManager().Load(piece.DstPid); loaded {
destPeer.UpdateAt.Store(time.Now())
}
}