From 304448009fa4f9c6e00b5e66b2605b8ab0910c40 Mon Sep 17 00:00:00 2001 From: cuidajun Date: Wed, 18 Jan 2023 17:21:39 +0800 Subject: [PATCH] refactor: piece_dispatcher considering score of parent peer (#1978) Signed-off-by: bigerous --- client/config/constants.go | 7 +- client/daemon/peer/peertask_conductor.go | 34 ++- .../peer/peertask_piecetask_synchronizer.go | 9 +- client/daemon/peer/piece_dispatcher.go | 173 ++++++++++++ client/daemon/peer/piece_dispatcher_test.go | 248 ++++++++++++++++++ client/daemon/peer/piece_downloader.go | 3 + client/daemon/peer/piece_manager.go | 7 + 7 files changed, 459 insertions(+), 22 deletions(-) create mode 100644 client/daemon/peer/piece_dispatcher.go create mode 100644 client/daemon/peer/piece_dispatcher_test.go diff --git a/client/config/constants.go b/client/config/constants.go index 2f96d11fd..2c467f3f9 100644 --- a/client/config/constants.go +++ b/client/config/constants.go @@ -63,9 +63,10 @@ const ( DefaultSchedulerIP = "127.0.0.1" DefaultSchedulerPort = 8002 - DefaultPieceChanSize = 16 - DefaultPieceQueueExponent = 4 - DefaultObjectMaxReplicas = 3 + DefaultPieceChanSize = 16 + DefaultPieceQueueExponent = 10 + DefaultPieceDispatcherRandomRatio = 0.1 + DefaultObjectMaxReplicas = 3 ) // Store strategy. diff --git a/client/daemon/peer/peertask_conductor.go b/client/daemon/peer/peertask_conductor.go index ad55fa6a8..e1e624ceb 100644 --- a/client/daemon/peer/peertask_conductor.go +++ b/client/daemon/peer/peertask_conductor.go @@ -44,7 +44,6 @@ import ( "d7y.io/dragonfly/v2/client/util" "d7y.io/dragonfly/v2/internal/dferrors" logger "d7y.io/dragonfly/v2/internal/dflog" - "d7y.io/dragonfly/v2/pkg/container/ring" "d7y.io/dragonfly/v2/pkg/digest" "d7y.io/dragonfly/v2/pkg/idgen" "d7y.io/dragonfly/v2/pkg/rpc/common" @@ -545,7 +544,7 @@ func (pt *peerTaskConductor) pullPieces() { func (pt *peerTaskConductor) pullPiecesWithP2P() { var ( // keep same size with pt.failedPieceCh for avoiding deadlock - pieceRequestQueue = ring.NewRandom[DownloadPieceRequest](config.DefaultPieceQueueExponent) + pieceRequestQueue = NewPieceDispatcher(config.DefaultPieceDispatcherRandomRatio, pt.Log()) ) ctx, cancel := context.WithCancel(pt.ctx) @@ -657,7 +656,7 @@ func (pt *peerTaskConductor) storeTinyPeerTask() { pt.PublishPieceInfo(0, uint32(contentLength)) } -func (pt *peerTaskConductor) receivePeerPacket(pieceRequestQueue ring.Queue[DownloadPieceRequest]) { +func (pt *peerTaskConductor) receivePeerPacket(pieceRequestQueue PieceDispatcher) { var ( lastNotReadyPiece int32 = 0 peerPacket *schedulerv1.PeerPacket @@ -944,7 +943,7 @@ func (pt *peerTaskConductor) updateMetadata(piecePacket *commonv1.PiecePacket) { } } -func (pt *peerTaskConductor) initDownloadPieceWorkers(count int32, pieceRequestQueue ring.Queue[DownloadPieceRequest]) { +func (pt *peerTaskConductor) initDownloadPieceWorkers(count int32, pieceRequestQueue PieceDispatcher) { if count < 1 { count = 4 } @@ -980,11 +979,14 @@ func (pt *peerTaskConductor) waitFirstPeerPacket(done chan bool) { } } -func (pt *peerTaskConductor) downloadPieceWorker(id int32, requests ring.Queue[DownloadPieceRequest]) { +func (pt *peerTaskConductor) downloadPieceWorker(id int32, requests PieceDispatcher) { for { - request, ok := requests.Dequeue() - if !ok { - pt.Infof("piece download queue cancelled, peer download worker #%d exit", id) + request, err := requests.Get() + if errors.Is(err, ErrNoValidPieceTemporarily) { + continue + } + if err != nil { + pt.Infof("piece download queue cancelled, peer download worker #%d exit, err: %v", id, err) 
return } pt.readyPiecesLock.RLock() @@ -994,7 +996,10 @@ func (pt *peerTaskConductor) downloadPieceWorker(id int32, requests ring.Queue[D continue } pt.readyPiecesLock.RUnlock() - pt.downloadPiece(id, request) + result := pt.downloadPiece(id, request) + if result != nil { + requests.Report(result) + } select { case <-pt.pieceDownloadCtx.Done(): pt.Infof("piece download cancelled, peer download worker #%d exit", id) @@ -1010,14 +1015,14 @@ func (pt *peerTaskConductor) downloadPieceWorker(id int32, requests ring.Queue[D } } -func (pt *peerTaskConductor) downloadPiece(workerID int32, request *DownloadPieceRequest) { +func (pt *peerTaskConductor) downloadPiece(workerID int32, request *DownloadPieceRequest) *DownloadPieceResult { // only downloading piece in one worker at same time pt.runningPiecesLock.Lock() if pt.runningPieces.IsSet(request.piece.PieceNum) { pt.runningPiecesLock.Unlock() pt.Log().Debugf("piece %d is downloading, skip", request.piece.PieceNum) // TODO save to queue for failed pieces - return + return nil } pt.runningPieces.Set(request.piece.PieceNum) pt.runningPiecesLock.Unlock() @@ -1036,7 +1041,7 @@ func (pt *peerTaskConductor) downloadPiece(workerID int32, request *DownloadPiec if pt.limiter != nil && !pt.waitLimit(ctx, request) { span.SetAttributes(config.AttributePieceSuccess.Bool(false)) span.End() - return + return nil } pt.Debugf("peer download worker #%d receive piece task, "+ @@ -1051,7 +1056,7 @@ func (pt *peerTaskConductor) downloadPiece(workerID int32, request *DownloadPiec span.End() if pt.needBackSource.Load() { pt.Infof("switch to back source, skip send failed piece") - return + return result } attempt, success := pt.pieceTaskSyncManager.acquire( &commonv1.PieceTaskRequest{ @@ -1062,7 +1067,7 @@ func (pt *peerTaskConductor) downloadPiece(workerID int32, request *DownloadPiec }) pt.Infof("send failed piece %d to remote, attempt: %d, success: %d", request.piece.PieceNum, attempt, success) - return + return result } // broadcast success piece pt.reportSuccessResult(request, result) @@ -1070,6 +1075,7 @@ func (pt *peerTaskConductor) downloadPiece(workerID int32, request *DownloadPiec span.SetAttributes(config.AttributePieceSuccess.Bool(true)) span.End() + return result } func (pt *peerTaskConductor) waitLimit(ctx context.Context, request *DownloadPieceRequest) bool { diff --git a/client/daemon/peer/peertask_piecetask_synchronizer.go b/client/daemon/peer/peertask_piecetask_synchronizer.go index 519f66503..5fea9b10f 100644 --- a/client/daemon/peer/peertask_piecetask_synchronizer.go +++ b/client/daemon/peer/peertask_piecetask_synchronizer.go @@ -36,7 +36,6 @@ import ( "d7y.io/dragonfly/v2/client/config" logger "d7y.io/dragonfly/v2/internal/dflog" - "d7y.io/dragonfly/v2/pkg/container/ring" "d7y.io/dragonfly/v2/pkg/dfnet" "d7y.io/dragonfly/v2/pkg/net/ip" dfdaemonclient "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/client" @@ -47,7 +46,7 @@ type pieceTaskSyncManager struct { ctx context.Context ctxCancel context.CancelFunc peerTaskConductor *peerTaskConductor - pieceRequestQueue ring.Queue[DownloadPieceRequest] + pieceRequestQueue PieceDispatcher workers map[string]*pieceTaskSynchronizer watchdog *synchronizerWatchdog } @@ -64,7 +63,7 @@ type pieceTaskSynchronizer struct { grpcInitialized *atomic.Bool grpcInitError atomic.Value peerTaskConductor *peerTaskConductor - pieceRequestQueue ring.Queue[DownloadPieceRequest] + pieceRequestQueue PieceDispatcher } type synchronizerWatchdog struct { @@ -216,7 +215,7 @@ func (s *pieceTaskSyncManager) acquire(request 
*commonv1.PieceTaskRequest) (atte s.RLock() for _, p := range s.workers { attempt++ - if p.acquire(request) == nil { + if p.grpcInitialized.Load() && p.acquire(request) == nil { success++ } } @@ -372,7 +371,7 @@ func (s *pieceTaskSynchronizer) dispatchPieceRequest(piecePacket *commonv1.Piece DstAddr: piecePacket.DstAddr, } - s.pieceRequestQueue.Enqueue(req) + s.pieceRequestQueue.Put(req) s.span.AddEvent(fmt.Sprintf("send piece #%d request to piece download queue", piece.PieceNum)) select { diff --git a/client/daemon/peer/piece_dispatcher.go b/client/daemon/peer/piece_dispatcher.go new file mode 100644 index 000000000..4aeb4bd14 --- /dev/null +++ b/client/daemon/peer/piece_dispatcher.go @@ -0,0 +1,173 @@ +/* + * Copyright 2022 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package peer + +import ( + "errors" + "math/rand" + "sync" + "time" + + "go.uber.org/atomic" + "golang.org/x/exp/maps" + "golang.org/x/exp/slices" + + logger "d7y.io/dragonfly/v2/internal/dflog" +) + +type PieceDispatcher interface { + // Put pieceSynchronizer put piece request into PieceDispatcher + Put(req *DownloadPieceRequest) + // Get downloader will get piece request from PieceDispatcher + Get() (req *DownloadPieceRequest, err error) + // Report downloader will report piece download result to PieceDispatcher, so PieceDispatcher can score peers + Report(result *DownloadPieceResult) + // Close related resources, and not accept Put and Get anymore + Close() +} + +var ErrNoValidPieceTemporarily = errors.New("no valid piece temporarily") + +type pieceDispatcher struct { + // peerRequests hold piece requests of peers. Key is PeerID, value is piece requests + peerRequests map[string][]*DownloadPieceRequest + // score hold the score of each peer. + score map[string]int64 + // downloaded hold the already successfully downloaded piece num + downloaded map[int32]struct{} + // sum is the valid num of piece requests. 
When sum == 0, the consumer will wait until there is a request is putted + sum *atomic.Int64 + closed bool + cond *sync.Cond + lock *sync.Mutex + log *logger.SugaredLoggerOnWith + randomRatio float64 + // rand is not thread-safe + rand *rand.Rand +} + +var ( + // the lower, the better + maxScore = int64(0) + minScore = (60 * time.Second).Nanoseconds() +) + +func NewPieceDispatcher(randomRatio float64, log *logger.SugaredLoggerOnWith) PieceDispatcher { + lock := &sync.Mutex{} + pd := &pieceDispatcher{ + peerRequests: map[string][]*DownloadPieceRequest{}, + score: map[string]int64{}, + downloaded: map[int32]struct{}{}, + sum: atomic.NewInt64(0), + closed: false, + cond: sync.NewCond(lock), + lock: lock, + log: log.With("component", "pieceDispatcher"), + randomRatio: randomRatio, + rand: rand.New(rand.NewSource(time.Now().Unix())), + } + log.Debugf("piece dispatcher created") + return pd +} + +func (p *pieceDispatcher) Put(req *DownloadPieceRequest) { + p.lock.Lock() + defer p.lock.Unlock() + if reqs, ok := p.peerRequests[req.DstPid]; ok { + p.peerRequests[req.DstPid] = append(reqs, req) + } else { + p.peerRequests[req.DstPid] = []*DownloadPieceRequest{req} + } + if _, ok := p.score[req.DstPid]; !ok { + p.score[req.DstPid] = maxScore + } + p.sum.Add(1) + p.cond.Broadcast() +} + +func (p *pieceDispatcher) Get() (req *DownloadPieceRequest, err error) { + p.lock.Lock() + defer p.lock.Unlock() + for p.sum.Load() == 0 && !p.closed { + p.cond.Wait() + } + if p.closed { + return nil, errors.New("piece dispatcher already closed") + } + return p.getDesiredReq() +} + +// getDesiredReq return a req according to performance of each dest peer. It is not thread-safe +func (p *pieceDispatcher) getDesiredReq() (*DownloadPieceRequest, error) { + distPeerIDs := maps.Keys(p.score) + if p.rand.Float64() < p.randomRatio { //random shuffle with the probability of randomRatio + p.rand.Shuffle(len(distPeerIDs), func(i, j int) { + tmp := distPeerIDs[j] + distPeerIDs[j] = distPeerIDs[i] + distPeerIDs[i] = tmp + }) + } else { // sort by score with the probability of (1-randomRatio) + slices.SortFunc(distPeerIDs, func(p1, p2 string) bool { return p.score[p1] < p.score[p2] }) + } + + // iterate all peers, until get a valid piece requests + for _, peer := range distPeerIDs { + for len(p.peerRequests[peer]) > 0 { + // choose a random piece request of a peer + n := p.rand.Intn(len(p.peerRequests[peer])) + req := p.peerRequests[peer][n] + p.peerRequests[peer] = append(p.peerRequests[peer][0:n], p.peerRequests[peer][n+1:]...) + p.sum.Sub(1) + if _, ok := p.downloaded[req.piece.PieceNum]; ok { //already downloaded, skip + // p.log.Debugf("skip already downloaded piece , peer: %s, piece:%d", peer, req.piece.PieceNum) + continue + } + // p.log.Debugf("scores :%v, select :%s, piece:%v", p.score, peer, req.piece.PieceNum) + return req, nil + } + } + return nil, ErrNoValidPieceTemporarily +} + +// Report pieceDispatcher will score peer according to the download result reported by downloader +// The score of peer is not determined only by last piece downloaded, it is smoothed. 
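+// A rough sketch of the update done below (lower is better; minScore is the 60-second worst case):
+//
+//	success: score = (lastScore + (FinishTime - BeginTime)) / 2
+//	failure: score = (lastScore + minScore) / 2
+//
+// so each new sample only moves the score halfway toward it, and a single slow or
+// failed piece cannot dominate peer selection on its own.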
+func (p *pieceDispatcher) Report(result *DownloadPieceResult) { + p.lock.Lock() + defer p.lock.Unlock() + if result == nil || result.DstPeerID == "" { + return + } + lastScore := p.score[result.DstPeerID] + if result.Fail { + p.score[result.DstPeerID] = (lastScore + minScore) / 2 + } else { + if result.pieceInfo != nil { + p.downloaded[result.pieceInfo.PieceNum] = struct{}{} + } + p.score[result.DstPeerID] = (lastScore + result.FinishTime - result.BeginTime) / 2 + } + return +} + +func (p *pieceDispatcher) Close() { + p.lock.Lock() + p.closed = true + p.cond.Broadcast() + p.log.Debugf("piece dispatcher closed") + p.lock.Unlock() +} diff --git a/client/daemon/peer/piece_dispatcher_test.go b/client/daemon/peer/piece_dispatcher_test.go new file mode 100644 index 000000000..e510224e7 --- /dev/null +++ b/client/daemon/peer/piece_dispatcher_test.go @@ -0,0 +1,248 @@ +/* + * Copyright 2022 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package peer + +import ( + "math/rand" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "golang.org/x/exp/maps" + "golang.org/x/exp/slices" + + commonv1 "d7y.io/api/pkg/apis/common/v1" + + logger "d7y.io/dragonfly/v2/internal/dflog" +) + +type peerDesc struct { + id string + uploadTime time.Duration + count *int +} + +type pieceTestManager struct { + pieceDispatcher PieceDispatcher + peers map[string]peerDesc + pieceNum int +} + +func newPieceTestManager(pieceDispatcher PieceDispatcher, peers []peerDesc, pieceNum int) *pieceTestManager { + peerMap := make(map[string]peerDesc) + for _, p := range peers { + peerMap[p.id] = peerDesc{ + id: p.id, + uploadTime: p.uploadTime, + count: new(int), + } + } + pm := &pieceTestManager{ + pieceDispatcher: pieceDispatcher, + peers: peerMap, + pieceNum: pieceNum, + } + return pm +} + +func (pc *pieceTestManager) Run() { + wg := &sync.WaitGroup{} + wg.Add(1) + // producer + go func() { + slice := make([]*DownloadPieceRequest, 0) + for i := 0; i < 4; i++ { + for _, peer := range pc.peers { + for j := 0; j < pc.pieceNum; j++ { + slice = append(slice, &DownloadPieceRequest{ + piece: &commonv1.PieceInfo{PieceNum: int32(j)}, + DstPid: peer.id, + }) + } + } + } + rand.Shuffle(len(slice), func(i, j int) { + tmp := slice[i] + slice[i] = slice[j] + slice[j] = tmp + }) + for _, req := range slice { + pc.pieceDispatcher.Put(req) + } + wg.Done() + }() + + // consumer + wg.Add(1) + go func() { + downloaded := map[int32]struct{}{} + for { + req, err := pc.pieceDispatcher.Get() + if err == ErrNoValidPieceTemporarily { + continue + } + if err != nil { + break + } + *pc.peers[req.DstPid].count = *pc.peers[req.DstPid].count + 1 + downloaded[req.piece.PieceNum] = struct{}{} + pc.pieceDispatcher.Report(&DownloadPieceResult{ + pieceInfo: req.piece, + DstPeerID: req.DstPid, + BeginTime: time.Now().UnixNano(), + FinishTime: time.Now().Add(pc.peers[req.DstPid].uploadTime).UnixNano(), + }) + if len(downloaded) >= pc.pieceNum { + break + } + } + pc.pieceDispatcher.Close() + wg.Done() + }() + 
wg.Wait() +} + +// return a slice of peerIDs, order by count desc +func (pc *pieceTestManager) Order() []string { + peerIDs := maps.Keys(pc.peers) + slices.SortFunc(peerIDs, func(a, b string) bool { return *pc.peers[a].count > *pc.peers[b].count }) + return peerIDs +} + +func TestPieceDispatcher(t *testing.T) { + type args struct { + randomRatio float64 + peers []peerDesc + pieceNum int + } + tests := []struct { + name string + args args + want []string + }{ + { + name: "no random", + args: args{ + randomRatio: 0, + peers: []peerDesc{ + {"bad", time.Second * 10, nil}, + {"good", time.Second * 2, nil}, + }, + pieceNum: 10000, + }, + want: []string{"good", "bad"}, + }, + { + name: "with 0.5 randomRatio", + args: args{ + randomRatio: 0.5, + peers: []peerDesc{ + {"bad", time.Second * 10, nil}, + {"good", time.Second * 2, nil}, + }, + pieceNum: 10000, + }, + want: []string{"good", "bad"}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + pieceDispatcher := NewPieceDispatcher(tt.args.randomRatio, logger.With()) + pieceTestManager := newPieceTestManager(pieceDispatcher, tt.args.peers, tt.args.pieceNum) + pieceTestManager.Run() + result := pieceTestManager.Order() + assert.Equal(t, tt.want, result) + }) + } +} + +func TestPieceDispatcherCount(t *testing.T) { + type args struct { + randomRatio float64 + peers []peerDesc + pieceNum int + } + tests := []struct { + name string + args args + want map[string]int + }{ + { + name: "no random", + args: args{ + randomRatio: 0, + peers: []peerDesc{ + {"bad", time.Second * 4, nil}, + {"mid", time.Second * 3, nil}, + {"good", time.Second * 2, nil}, + }, + pieceNum: 10000, + }, + want: map[string]int{ + "bad": 0, + "mid": 0, + "good": 8000, + }, + }, + { + name: "with 0.5 randomRatio", + args: args{ + randomRatio: 0.5, + peers: []peerDesc{ + {"bad", time.Second * 4, nil}, + {"mid", time.Second * 3, nil}, + {"good", time.Second * 2, nil}, + }, + pieceNum: 10000, + }, + want: map[string]int{ + "bad": 1000, + "mid": 1000, + "good": 6000, + }, + }, + { + name: "total random", + args: args{ + randomRatio: 1, + peers: []peerDesc{ + {"bad", time.Second * 10, nil}, + {"good", time.Second * 2, nil}, + }, + pieceNum: 10000, + }, + want: map[string]int{ + "bad": 4000, + "good": 4000, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + pieceDispatcher := NewPieceDispatcher(tt.args.randomRatio, logger.With()) + pieceTestManager := newPieceTestManager(pieceDispatcher, tt.args.peers, tt.args.pieceNum) + pieceTestManager.Run() + for p, c := range tt.want { + if *pieceTestManager.peers[p].count < c { + t.Errorf("peer %s should receive more pieces than %d, however get %d", p, c, *pieceTestManager.peers[p].count) + t.Fail() + } + } + }) + } +} diff --git a/client/daemon/peer/piece_downloader.go b/client/daemon/peer/piece_downloader.go index 8324ad572..27189a41d 100644 --- a/client/daemon/peer/piece_downloader.go +++ b/client/daemon/peer/piece_downloader.go @@ -59,6 +59,9 @@ type DownloadPieceResult struct { BeginTime int64 // FinishTime nanosecond FinishTime int64 + DstPeerID string + Fail bool + pieceInfo *commonv1.PieceInfo } type PieceDownloader interface { diff --git a/client/daemon/peer/piece_manager.go b/client/daemon/peer/piece_manager.go index 23d521b11..aa35e0cfc 100644 --- a/client/daemon/peer/piece_manager.go +++ b/client/daemon/peer/piece_manager.go @@ -171,6 +171,9 @@ func (pm *pieceManager) DownloadPiece(ctx context.Context, request *DownloadPiec Size: -1, BeginTime: time.Now().UnixNano(), FinishTime: 0, + 
DstPeerID: request.DstPid, + Fail: false, + pieceInfo: request.piece, } // prepare trace and limit @@ -179,6 +182,7 @@ func (pm *pieceManager) DownloadPiece(ctx context.Context, request *DownloadPiec if pm.Limiter != nil { if err := pm.Limiter.WaitN(ctx, int(request.piece.RangeSize)); err != nil { result.FinishTime = time.Now().UnixNano() + result.Fail = true request.log.Errorf("require rate limit access error: %s", err) return result, err } @@ -192,6 +196,7 @@ func (pm *pieceManager) DownloadPiece(ctx context.Context, request *DownloadPiec r, c, err := pm.pieceDownloader.DownloadPiece(ctx, request) if err != nil { result.FinishTime = time.Now().UnixNano() + result.Fail = true span.RecordError(err) request.log.Errorf("download piece failed, piece num: %d, error: %s, from peer: %s", request.piece.PieceNum, err, request.DstPid) @@ -222,6 +227,7 @@ func (pm *pieceManager) DownloadPiece(ctx context.Context, request *DownloadPiec span.RecordError(err) if err != nil { + result.Fail = true request.log.Errorf("put piece to storage failed, piece num: %d, wrote: %d, error: %s", request.piece.PieceNum, result.Size, err) return result, err @@ -237,6 +243,7 @@ func (pm *pieceManager) processPieceFromSource(pt Task, Size: -1, BeginTime: time.Now().UnixNano(), FinishTime: 0, + DstPeerID: "", } var (
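
Note: a minimal consumer sketch (not part of the patch), assuming only the PieceDispatcher
interface added above in package peer; it mirrors the downloadPieceWorker loop, where
ErrNoValidPieceTemporarily means "ask again" and any other error means the dispatcher was
closed. The download callback and consumeLoop name are hypothetical, shown only to
illustrate the Get/Report contract.

    // consumeLoop drains the dispatcher until it is closed.
    func consumeLoop(dispatcher PieceDispatcher, download func(*DownloadPieceRequest) *DownloadPieceResult) {
        for {
            req, err := dispatcher.Get()
            if errors.Is(err, ErrNoValidPieceTemporarily) {
                continue // every queued request was for an already-downloaded piece; retry
            }
            if err != nil {
                return // dispatcher was closed via Close(); worker exits
            }
            dispatcher.Report(download(req)) // feeds the peer scoring in piece_dispatcher.go
        }
    }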