refactor: piece_dispatcher considering score of parent peer (#1978)

Signed-off-by: bigerous <cuidajun.cdj@alibaba-inc.com>
cuidajun 2023-01-18 17:21:39 +08:00 committed by Gaius
parent b6f0435011
commit 304448009f
7 changed files with 459 additions and 22 deletions

View File

@@ -64,7 +64,8 @@ const (
DefaultSchedulerPort = 8002
DefaultPieceChanSize = 16
-DefaultPieceQueueExponent = 4
+DefaultPieceQueueExponent = 10
+DefaultPieceDispatcherRandomRatio = 0.1
DefaultObjectMaxReplicas = 3
)
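
The new DefaultPieceDispatcherRandomRatio of 0.1 means that roughly one in ten Get calls shuffles the candidate parent peers instead of sorting them by score, so a currently slow-looking parent still gets occasional traffic and a chance to improve its score. A condensed, standalone illustration of that selection rule (toy peer names and scores, mirroring the getDesiredReq logic added later in this commit, not the actual implementation):

package main

import (
    "fmt"
    "math/rand"
    "sort"
)

func main() {
    const randomRatio = 0.1 // DefaultPieceDispatcherRandomRatio
    // smoothed score per parent peer in nanoseconds; lower is better
    score := map[string]int64{"parent-a": 100, "parent-b": 300, "parent-c": 200}

    peers := make([]string, 0, len(score))
    for p := range score {
        peers = append(peers, p)
    }
    if rand.Float64() < randomRatio {
        // explore: ~10% of calls ignore the scores and shuffle the candidates
        rand.Shuffle(len(peers), func(i, j int) { peers[i], peers[j] = peers[j], peers[i] })
    } else {
        // exploit: otherwise try the best-scoring (fastest) parents first
        sort.Slice(peers, func(i, j int) bool { return score[peers[i]] < score[peers[j]] })
    }
    fmt.Println("parents tried in this order:", peers)
}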

View File

@@ -44,7 +44,6 @@ import (
"d7y.io/dragonfly/v2/client/util"
"d7y.io/dragonfly/v2/internal/dferrors"
logger "d7y.io/dragonfly/v2/internal/dflog"
-"d7y.io/dragonfly/v2/pkg/container/ring"
"d7y.io/dragonfly/v2/pkg/digest"
"d7y.io/dragonfly/v2/pkg/idgen"
"d7y.io/dragonfly/v2/pkg/rpc/common"
@@ -545,7 +544,7 @@ func (pt *peerTaskConductor) pullPieces() {
func (pt *peerTaskConductor) pullPiecesWithP2P() {
var (
// keep same size with pt.failedPieceCh for avoiding deadlock
-pieceRequestQueue = ring.NewRandom[DownloadPieceRequest](config.DefaultPieceQueueExponent)
+pieceRequestQueue = NewPieceDispatcher(config.DefaultPieceDispatcherRandomRatio, pt.Log())
)
ctx, cancel := context.WithCancel(pt.ctx)
@@ -657,7 +656,7 @@ func (pt *peerTaskConductor) storeTinyPeerTask() {
pt.PublishPieceInfo(0, uint32(contentLength))
}
-func (pt *peerTaskConductor) receivePeerPacket(pieceRequestQueue ring.Queue[DownloadPieceRequest]) {
+func (pt *peerTaskConductor) receivePeerPacket(pieceRequestQueue PieceDispatcher) {
var (
lastNotReadyPiece int32 = 0
peerPacket *schedulerv1.PeerPacket
@@ -944,7 +943,7 @@ func (pt *peerTaskConductor) updateMetadata(piecePacket *commonv1.PiecePacket) {
}
}
-func (pt *peerTaskConductor) initDownloadPieceWorkers(count int32, pieceRequestQueue ring.Queue[DownloadPieceRequest]) {
+func (pt *peerTaskConductor) initDownloadPieceWorkers(count int32, pieceRequestQueue PieceDispatcher) {
if count < 1 {
count = 4
}
@@ -980,11 +979,14 @@ func (pt *peerTaskConductor) waitFirstPeerPacket(done chan bool) {
}
}
-func (pt *peerTaskConductor) downloadPieceWorker(id int32, requests ring.Queue[DownloadPieceRequest]) {
+func (pt *peerTaskConductor) downloadPieceWorker(id int32, requests PieceDispatcher) {
for {
-request, ok := requests.Dequeue()
-if !ok {
-pt.Infof("piece download queue cancelled, peer download worker #%d exit", id)
+request, err := requests.Get()
+if errors.Is(err, ErrNoValidPieceTemporarily) {
+continue
+}
+if err != nil {
+pt.Infof("piece download queue cancelled, peer download worker #%d exit, err: %v", id, err)
return
}
pt.readyPiecesLock.RLock()
@@ -994,7 +996,10 @@ func (pt *peerTaskConductor) downloadPieceWorker(id int32, requests ring.Queue[D
continue
}
pt.readyPiecesLock.RUnlock()
-pt.downloadPiece(id, request)
+result := pt.downloadPiece(id, request)
+if result != nil {
+requests.Report(result)
+}
select {
case <-pt.pieceDownloadCtx.Done():
pt.Infof("piece download cancelled, peer download worker #%d exit", id)
@@ -1010,14 +1015,14 @@ func (pt *peerTaskConductor) downloadPieceWorker(id int32, requests ring.Queue[D
}
}
-func (pt *peerTaskConductor) downloadPiece(workerID int32, request *DownloadPieceRequest) {
+func (pt *peerTaskConductor) downloadPiece(workerID int32, request *DownloadPieceRequest) *DownloadPieceResult {
// only downloading piece in one worker at same time
pt.runningPiecesLock.Lock()
if pt.runningPieces.IsSet(request.piece.PieceNum) {
pt.runningPiecesLock.Unlock()
pt.Log().Debugf("piece %d is downloading, skip", request.piece.PieceNum)
// TODO save to queue for failed pieces
-return
+return nil
}
pt.runningPieces.Set(request.piece.PieceNum)
pt.runningPiecesLock.Unlock()
@@ -1036,7 +1041,7 @@ func (pt *peerTaskConductor) downloadPiece(workerID int32, request *DownloadPiec
if pt.limiter != nil && !pt.waitLimit(ctx, request) {
span.SetAttributes(config.AttributePieceSuccess.Bool(false))
span.End()
-return
+return nil
}
pt.Debugf("peer download worker #%d receive piece task, "+
@@ -1051,7 +1056,7 @@ func (pt *peerTaskConductor) downloadPiece(workerID int32, request *DownloadPiec
span.End()
if pt.needBackSource.Load() {
pt.Infof("switch to back source, skip send failed piece")
-return
+return result
}
attempt, success := pt.pieceTaskSyncManager.acquire(
&commonv1.PieceTaskRequest{
@@ -1062,7 +1067,7 @@ func (pt *peerTaskConductor) downloadPiece(workerID int32, request *DownloadPiec
})
pt.Infof("send failed piece %d to remote, attempt: %d, success: %d",
request.piece.PieceNum, attempt, success)
-return
+return result
}
// broadcast success piece
pt.reportSuccessResult(request, result)
@@ -1070,6 +1075,7 @@ func (pt *peerTaskConductor) downloadPiece(workerID int32, request *DownloadPiec
span.SetAttributes(config.AttributePieceSuccess.Bool(true))
span.End()
+return result
}
func (pt *peerTaskConductor) waitLimit(ctx context.Context, request *DownloadPieceRequest) bool {

View File

@@ -36,7 +36,6 @@ import (
"d7y.io/dragonfly/v2/client/config"
logger "d7y.io/dragonfly/v2/internal/dflog"
-"d7y.io/dragonfly/v2/pkg/container/ring"
"d7y.io/dragonfly/v2/pkg/dfnet"
"d7y.io/dragonfly/v2/pkg/net/ip"
dfdaemonclient "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/client"
@@ -47,7 +46,7 @@ type pieceTaskSyncManager struct {
ctx context.Context
ctxCancel context.CancelFunc
peerTaskConductor *peerTaskConductor
-pieceRequestQueue ring.Queue[DownloadPieceRequest]
+pieceRequestQueue PieceDispatcher
workers map[string]*pieceTaskSynchronizer
watchdog *synchronizerWatchdog
}
@@ -64,7 +63,7 @@ type pieceTaskSynchronizer struct {
grpcInitialized *atomic.Bool
grpcInitError atomic.Value
peerTaskConductor *peerTaskConductor
-pieceRequestQueue ring.Queue[DownloadPieceRequest]
+pieceRequestQueue PieceDispatcher
}
type synchronizerWatchdog struct {
@@ -216,7 +215,7 @@ func (s *pieceTaskSyncManager) acquire(request *commonv1.PieceTaskRequest) (atte
s.RLock()
for _, p := range s.workers {
attempt++
-if p.acquire(request) == nil {
+if p.grpcInitialized.Load() && p.acquire(request) == nil {
success++
}
}
@@ -372,7 +371,7 @@ func (s *pieceTaskSynchronizer) dispatchPieceRequest(piecePacket *commonv1.Piece
DstAddr: piecePacket.DstAddr,
}
-s.pieceRequestQueue.Enqueue(req)
+s.pieceRequestQueue.Put(req)
s.span.AddEvent(fmt.Sprintf("send piece #%d request to piece download queue", piece.PieceNum))
select {

View File

@@ -0,0 +1,173 @@
/*
* Copyright 2022 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package peer
import (
"errors"
"math/rand"
"sync"
"time"
"go.uber.org/atomic"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
logger "d7y.io/dragonfly/v2/internal/dflog"
)
type PieceDispatcher interface {
// Put is called by the piece task synchronizer to put a piece request into the PieceDispatcher
Put(req *DownloadPieceRequest)
// Get is called by the piece downloader to get the next piece request from the PieceDispatcher
Get() (req *DownloadPieceRequest, err error)
// Report is called by the downloader to report a piece download result, so the PieceDispatcher can score peers
Report(result *DownloadPieceResult)
// Close releases related resources; Put and Get are not accepted anymore
Close()
}
var ErrNoValidPieceTemporarily = errors.New("no valid piece temporarily")
type pieceDispatcher struct {
// peerRequests holds the pending piece requests of each peer. Key is the peer ID, value is its piece requests
peerRequests map[string][]*DownloadPieceRequest
// score holds the score of each peer (the lower, the better)
score map[string]int64
// downloaded holds the piece numbers that have already been downloaded successfully
downloaded map[int32]struct{}
// sum is the number of valid pending piece requests. When sum == 0, consumers wait until a request is put
sum *atomic.Int64
closed bool
cond *sync.Cond
lock *sync.Mutex
log *logger.SugaredLoggerOnWith
randomRatio float64
// rand is not thread-safe
rand *rand.Rand
}
var (
// the lower, the better
maxScore = int64(0)
minScore = (60 * time.Second).Nanoseconds()
)
func NewPieceDispatcher(randomRatio float64, log *logger.SugaredLoggerOnWith) PieceDispatcher {
lock := &sync.Mutex{}
pd := &pieceDispatcher{
peerRequests: map[string][]*DownloadPieceRequest{},
score: map[string]int64{},
downloaded: map[int32]struct{}{},
sum: atomic.NewInt64(0),
closed: false,
cond: sync.NewCond(lock),
lock: lock,
log: log.With("component", "pieceDispatcher"),
randomRatio: randomRatio,
rand: rand.New(rand.NewSource(time.Now().Unix())),
}
log.Debugf("piece dispatcher created")
return pd
}
func (p *pieceDispatcher) Put(req *DownloadPieceRequest) {
p.lock.Lock()
defer p.lock.Unlock()
if reqs, ok := p.peerRequests[req.DstPid]; ok {
p.peerRequests[req.DstPid] = append(reqs, req)
} else {
p.peerRequests[req.DstPid] = []*DownloadPieceRequest{req}
}
if _, ok := p.score[req.DstPid]; !ok {
p.score[req.DstPid] = maxScore
}
p.sum.Add(1)
p.cond.Broadcast()
}
func (p *pieceDispatcher) Get() (req *DownloadPieceRequest, err error) {
p.lock.Lock()
defer p.lock.Unlock()
for p.sum.Load() == 0 && !p.closed {
p.cond.Wait()
}
if p.closed {
return nil, errors.New("piece dispatcher already closed")
}
return p.getDesiredReq()
}
// getDesiredReq returns a request according to the performance of each dest peer. It is not thread-safe
func (p *pieceDispatcher) getDesiredReq() (*DownloadPieceRequest, error) {
distPeerIDs := maps.Keys(p.score)
if p.rand.Float64() < p.randomRatio { //random shuffle with the probability of randomRatio
p.rand.Shuffle(len(distPeerIDs), func(i, j int) {
tmp := distPeerIDs[j]
distPeerIDs[j] = distPeerIDs[i]
distPeerIDs[i] = tmp
})
} else { // sort by score with the probability of (1-randomRatio)
slices.SortFunc(distPeerIDs, func(p1, p2 string) bool { return p.score[p1] < p.score[p2] })
}
// iterate over all peers until a valid piece request is found
for _, peer := range distPeerIDs {
for len(p.peerRequests[peer]) > 0 {
// choose a random piece request of a peer
n := p.rand.Intn(len(p.peerRequests[peer]))
req := p.peerRequests[peer][n]
p.peerRequests[peer] = append(p.peerRequests[peer][0:n], p.peerRequests[peer][n+1:]...)
p.sum.Sub(1)
if _, ok := p.downloaded[req.piece.PieceNum]; ok { //already downloaded, skip
// p.log.Debugf("skip already downloaded piece , peer: %s, piece:%d", peer, req.piece.PieceNum)
continue
}
// p.log.Debugf("scores :%v, select :%s, piece:%v", p.score, peer, req.piece.PieceNum)
return req, nil
}
}
return nil, ErrNoValidPieceTemporarily
}
// Report scores the peer according to the download result reported by the downloader.
// The score of a peer is not determined only by the last downloaded piece; it is smoothed.
func (p *pieceDispatcher) Report(result *DownloadPieceResult) {
p.lock.Lock()
defer p.lock.Unlock()
if result == nil || result.DstPeerID == "" {
return
}
lastScore := p.score[result.DstPeerID]
if result.Fail {
p.score[result.DstPeerID] = (lastScore + minScore) / 2
} else {
if result.pieceInfo != nil {
p.downloaded[result.pieceInfo.PieceNum] = struct{}{}
}
p.score[result.DstPeerID] = (lastScore + result.FinishTime - result.BeginTime) / 2
}
return
}
func (p *pieceDispatcher) Close() {
p.lock.Lock()
p.closed = true
p.cond.Broadcast()
p.log.Debugf("piece dispatcher closed")
p.lock.Unlock()
}
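
To make the smoothing in Report concrete: every successful download averages the parent's previous score with the observed piece duration (FinishTime - BeginTime), and every failure averages it with minScore (60 seconds, the worst value since lower is better), so the score reflects an exponentially decaying history rather than only the last sample. A small standalone example (toy durations, not part of this commit) of how one parent's score evolves:

package main

import (
    "fmt"
    "time"
)

func main() {
    minScore := (60 * time.Second).Nanoseconds()
    score := int64(0) // maxScore: a freshly seen peer starts with the best possible score

    observe := func(cost time.Duration, fail bool) {
        if fail {
            score = (score + minScore) / 2
        } else {
            score = (score + cost.Nanoseconds()) / 2
        }
        fmt.Printf("fail=%v cost=%v -> score=%v\n", fail, cost, time.Duration(score))
    }

    observe(200*time.Millisecond, false) // score becomes 100ms
    observe(200*time.Millisecond, false) // score becomes 150ms
    observe(0, true)                     // one failure pulls the score toward the 60s worst case (~30.1s here)
    observe(200*time.Millisecond, false) // each later success halves the remaining penalty
}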

View File

@@ -0,0 +1,248 @@
/*
* Copyright 2022 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package peer
import (
"math/rand"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"golang.org/x/exp/maps"
"golang.org/x/exp/slices"
commonv1 "d7y.io/api/pkg/apis/common/v1"
logger "d7y.io/dragonfly/v2/internal/dflog"
)
type peerDesc struct {
id string
uploadTime time.Duration
count *int
}
type pieceTestManager struct {
pieceDispatcher PieceDispatcher
peers map[string]peerDesc
pieceNum int
}
func newPieceTestManager(pieceDispatcher PieceDispatcher, peers []peerDesc, pieceNum int) *pieceTestManager {
peerMap := make(map[string]peerDesc)
for _, p := range peers {
peerMap[p.id] = peerDesc{
id: p.id,
uploadTime: p.uploadTime,
count: new(int),
}
}
pm := &pieceTestManager{
pieceDispatcher: pieceDispatcher,
peers: peerMap,
pieceNum: pieceNum,
}
return pm
}
func (pc *pieceTestManager) Run() {
wg := &sync.WaitGroup{}
wg.Add(1)
// producer
go func() {
slice := make([]*DownloadPieceRequest, 0)
for i := 0; i < 4; i++ {
for _, peer := range pc.peers {
for j := 0; j < pc.pieceNum; j++ {
slice = append(slice, &DownloadPieceRequest{
piece: &commonv1.PieceInfo{PieceNum: int32(j)},
DstPid: peer.id,
})
}
}
}
rand.Shuffle(len(slice), func(i, j int) {
tmp := slice[i]
slice[i] = slice[j]
slice[j] = tmp
})
for _, req := range slice {
pc.pieceDispatcher.Put(req)
}
wg.Done()
}()
// consumer
wg.Add(1)
go func() {
downloaded := map[int32]struct{}{}
for {
req, err := pc.pieceDispatcher.Get()
if err == ErrNoValidPieceTemporarily {
continue
}
if err != nil {
break
}
*pc.peers[req.DstPid].count = *pc.peers[req.DstPid].count + 1
downloaded[req.piece.PieceNum] = struct{}{}
pc.pieceDispatcher.Report(&DownloadPieceResult{
pieceInfo: req.piece,
DstPeerID: req.DstPid,
BeginTime: time.Now().UnixNano(),
FinishTime: time.Now().Add(pc.peers[req.DstPid].uploadTime).UnixNano(),
})
if len(downloaded) >= pc.pieceNum {
break
}
}
pc.pieceDispatcher.Close()
wg.Done()
}()
wg.Wait()
}
// Order returns a slice of peer IDs, ordered by download count in descending order
func (pc *pieceTestManager) Order() []string {
peerIDs := maps.Keys(pc.peers)
slices.SortFunc(peerIDs, func(a, b string) bool { return *pc.peers[a].count > *pc.peers[b].count })
return peerIDs
}
func TestPieceDispatcher(t *testing.T) {
type args struct {
randomRatio float64
peers []peerDesc
pieceNum int
}
tests := []struct {
name string
args args
want []string
}{
{
name: "no random",
args: args{
randomRatio: 0,
peers: []peerDesc{
{"bad", time.Second * 10, nil},
{"good", time.Second * 2, nil},
},
pieceNum: 10000,
},
want: []string{"good", "bad"},
},
{
name: "with 0.5 randomRatio",
args: args{
randomRatio: 0.5,
peers: []peerDesc{
{"bad", time.Second * 10, nil},
{"good", time.Second * 2, nil},
},
pieceNum: 10000,
},
want: []string{"good", "bad"},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
pieceDispatcher := NewPieceDispatcher(tt.args.randomRatio, logger.With())
pieceTestManager := newPieceTestManager(pieceDispatcher, tt.args.peers, tt.args.pieceNum)
pieceTestManager.Run()
result := pieceTestManager.Order()
assert.Equal(t, tt.want, result)
})
}
}
func TestPieceDispatcherCount(t *testing.T) {
type args struct {
randomRatio float64
peers []peerDesc
pieceNum int
}
tests := []struct {
name string
args args
want map[string]int
}{
{
name: "no random",
args: args{
randomRatio: 0,
peers: []peerDesc{
{"bad", time.Second * 4, nil},
{"mid", time.Second * 3, nil},
{"good", time.Second * 2, nil},
},
pieceNum: 10000,
},
want: map[string]int{
"bad": 0,
"mid": 0,
"good": 8000,
},
},
{
name: "with 0.5 randomRatio",
args: args{
randomRatio: 0.5,
peers: []peerDesc{
{"bad", time.Second * 4, nil},
{"mid", time.Second * 3, nil},
{"good", time.Second * 2, nil},
},
pieceNum: 10000,
},
want: map[string]int{
"bad": 1000,
"mid": 1000,
"good": 6000,
},
},
{
name: "total random",
args: args{
randomRatio: 1,
peers: []peerDesc{
{"bad", time.Second * 10, nil},
{"good", time.Second * 2, nil},
},
pieceNum: 10000,
},
want: map[string]int{
"bad": 4000,
"good": 4000,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
pieceDispatcher := NewPieceDispatcher(tt.args.randomRatio, logger.With())
pieceTestManager := newPieceTestManager(pieceDispatcher, tt.args.peers, tt.args.pieceNum)
pieceTestManager.Run()
for p, c := range tt.want {
if *pieceTestManager.peers[p].count < c {
t.Errorf("peer %s should receive more pieces than %d, however get %d", p, c, *pieceTestManager.peers[p].count)
t.Fail()
}
}
})
}
}

View File

@@ -59,6 +59,9 @@ type DownloadPieceResult struct {
BeginTime int64
// FinishTime nanosecond
FinishTime int64
+DstPeerID string
+Fail bool
+pieceInfo *commonv1.PieceInfo
}
type PieceDownloader interface {

View File

@@ -171,6 +171,9 @@ func (pm *pieceManager) DownloadPiece(ctx context.Context, request *DownloadPiec
Size: -1,
BeginTime: time.Now().UnixNano(),
FinishTime: 0,
+DstPeerID: request.DstPid,
+Fail: false,
+pieceInfo: request.piece,
}
// prepare trace and limit
@@ -179,6 +182,7 @@ func (pm *pieceManager) DownloadPiece(ctx context.Context, request *DownloadPiec
if pm.Limiter != nil {
if err := pm.Limiter.WaitN(ctx, int(request.piece.RangeSize)); err != nil {
result.FinishTime = time.Now().UnixNano()
+result.Fail = true
request.log.Errorf("require rate limit access error: %s", err)
return result, err
}
@@ -192,6 +196,7 @@ func (pm *pieceManager) DownloadPiece(ctx context.Context, request *DownloadPiec
r, c, err := pm.pieceDownloader.DownloadPiece(ctx, request)
if err != nil {
result.FinishTime = time.Now().UnixNano()
+result.Fail = true
span.RecordError(err)
request.log.Errorf("download piece failed, piece num: %d, error: %s, from peer: %s",
request.piece.PieceNum, err, request.DstPid)
@@ -222,6 +227,7 @@ func (pm *pieceManager) DownloadPiece(ctx context.Context, request *DownloadPiec
span.RecordError(err)
if err != nil {
+result.Fail = true
request.log.Errorf("put piece to storage failed, piece num: %d, wrote: %d, error: %s",
request.piece.PieceNum, result.Size, err)
return result, err
@@ -237,6 +243,7 @@ func (pm *pieceManager) processPieceFromSource(pt Task,
Size: -1,
BeginTime: time.Now().UnixNano(),
FinishTime: 0,
+DstPeerID: "",
}
var (