From ff79c3148a5322aeea082af3a74a47f9542f26c3 Mon Sep 17 00:00:00 2001 From: Jim Ma Date: Fri, 16 Dec 2022 16:01:00 +0800 Subject: [PATCH] feat: random pieces download (#1918) Signed-off-by: Jim Ma --- client/config/constants.go | 5 +- client/daemon/peer/peertask_conductor.go | 57 +++-- .../peer/peertask_piecetask_synchronizer.go | 17 +- pkg/container/ring/queue.go | 23 ++ pkg/container/ring/queue_test.go | 204 ++++++++++++++++++ pkg/container/ring/random.go | 88 ++++++++ pkg/container/ring/sequence.go | 115 ++++++++++ 7 files changed, 478 insertions(+), 31 deletions(-) create mode 100644 pkg/container/ring/queue.go create mode 100644 pkg/container/ring/queue_test.go create mode 100644 pkg/container/ring/random.go create mode 100644 pkg/container/ring/sequence.go diff --git a/client/config/constants.go b/client/config/constants.go index 1c4ef61fc..50b26d711 100644 --- a/client/config/constants.go +++ b/client/config/constants.go @@ -61,8 +61,9 @@ const ( DefaultSchedulerIP = "127.0.0.1" DefaultSchedulerPort = 8002 - DefaultPieceChanSize = 16 - DefaultObjectMaxReplicas = 3 + DefaultPieceChanSize = 16 + DefaultPieceQueueExponent = 4 + DefaultObjectMaxReplicas = 3 ) // Store strategy. diff --git a/client/daemon/peer/peertask_conductor.go b/client/daemon/peer/peertask_conductor.go index 3330fd6f5..5f96dc383 100644 --- a/client/daemon/peer/peertask_conductor.go +++ b/client/daemon/peer/peertask_conductor.go @@ -44,6 +44,7 @@ import ( "d7y.io/dragonfly/v2/client/util" "d7y.io/dragonfly/v2/internal/dferrors" logger "d7y.io/dragonfly/v2/internal/dflog" + "d7y.io/dragonfly/v2/pkg/container/ring" "d7y.io/dragonfly/v2/pkg/digest" "d7y.io/dragonfly/v2/pkg/idgen" "d7y.io/dragonfly/v2/pkg/rpc/common" @@ -581,8 +582,7 @@ func (pt *peerTaskConductor) pullPieces() { func (pt *peerTaskConductor) pullPiecesWithP2P() { var ( // keep same size with pt.failedPieceCh for avoiding deadlock - pieceBufferSize = uint32(config.DefaultPieceChanSize) - pieceRequestCh = make(chan *DownloadPieceRequest, pieceBufferSize) + pieceRequestQueue = ring.NewRandom[DownloadPieceRequest](config.DefaultPieceQueueExponent) ) ctx, cancel := context.WithCancel(pt.ctx) @@ -590,11 +590,11 @@ func (pt *peerTaskConductor) pullPiecesWithP2P() { ctx: ctx, ctxCancel: cancel, peerTaskConductor: pt, - pieceRequestCh: pieceRequestCh, + pieceRequestQueue: pieceRequestQueue, workers: map[string]*pieceTaskSynchronizer{}, } - go pt.pullPiecesFromPeers(pieceRequestCh) - pt.receivePeerPacket(pieceRequestCh) + go pt.pullPiecesFromPeers(pieceRequestQueue) + pt.receivePeerPacket(pieceRequestQueue) } func (pt *peerTaskConductor) storeEmptyPeerTask() { @@ -695,7 +695,7 @@ func (pt *peerTaskConductor) storeTinyPeerTask() { pt.PublishPieceInfo(0, uint32(contentLength)) } -func (pt *peerTaskConductor) receivePeerPacket(pieceRequestCh chan *DownloadPieceRequest) { +func (pt *peerTaskConductor) receivePeerPacket(pieceRequestQueue ring.Queue[DownloadPieceRequest]) { var ( lastNotReadyPiece int32 = 0 peerPacket *schedulerv1.PeerPacket @@ -785,7 +785,7 @@ loop: trace.WithAttributes(config.AttributeMainPeer.String(peerPacket.MainPeer.PeerId))) if !firstPacketReceived { - pt.initDownloadPieceWorkers(peerPacket.ParallelCount, pieceRequestCh) + pt.initDownloadPieceWorkers(peerPacket.ParallelCount, pieceRequestQueue) firstPeerSpan.SetAttributes(config.AttributeMainPeer.String(peerPacket.MainPeer.PeerId)) firstPeerSpan.End() } @@ -956,7 +956,7 @@ func (pt *peerTaskConductor) pullSinglePiece() { } // Deprecated -func (pt *peerTaskConductor) pullPiecesFromPeers(pieceRequestCh chan *DownloadPieceRequest) { +func (pt *peerTaskConductor) pullPiecesFromPeers(pieceRequestQueue ring.Queue[DownloadPieceRequest]) { if ok, backSource := pt.waitFirstPeerPacket(); !ok { if backSource { return @@ -1025,7 +1025,7 @@ loop: pt.updateMetadata(piecePacket) // 3. dispatch piece request to all workers - pt.dispatchPieceRequest(pieceRequestCh, piecePacket) + pt.dispatchPieceRequest(pieceRequestQueue, piecePacket) // 4. get next not request piece if num, ok = pt.getNextPieceNum(num); ok { @@ -1085,12 +1085,12 @@ func (pt *peerTaskConductor) updateMetadata(piecePacket *commonv1.PiecePacket) { } } -func (pt *peerTaskConductor) initDownloadPieceWorkers(count int32, pieceRequestCh chan *DownloadPieceRequest) { +func (pt *peerTaskConductor) initDownloadPieceWorkers(count int32, pieceRequestQueue ring.Queue[DownloadPieceRequest]) { if count < 1 { count = 4 } for i := int32(0); i < count; i++ { - go pt.downloadPieceWorker(i, pieceRequestCh) + go pt.downloadPieceWorker(i, pieceRequestQueue) } } @@ -1156,7 +1156,7 @@ func (pt *peerTaskConductor) waitAvailablePeerPacket() (int32, bool) { } // Deprecated -func (pt *peerTaskConductor) dispatchPieceRequest(pieceRequestCh chan *DownloadPieceRequest, piecePacket *commonv1.PiecePacket) { +func (pt *peerTaskConductor) dispatchPieceRequest(pieceRequestQueue ring.Queue[DownloadPieceRequest], piecePacket *commonv1.PiecePacket) { pieceCount := len(piecePacket.PieceInfos) pt.Debugf("dispatch piece request, piece count: %d", pieceCount) // fix cdn return zero piece info, but with total piece count and content length @@ -1184,12 +1184,18 @@ func (pt *peerTaskConductor) dispatchPieceRequest(pieceRequestCh chan *DownloadP DstPid: piecePacket.DstPid, DstAddr: piecePacket.DstAddr, } + + pieceRequestQueue.Enqueue(req) + msg := fmt.Sprintf("send piece #%d request to piece download queue", piece.PieceNum) + pt.span.AddEvent(msg) + pt.Infof(msg) + select { - case pieceRequestCh <- req: case <-pt.successCh: pt.Infof("peer task success, stop dispatch piece request") case <-pt.failCh: pt.Warnf("peer task fail, stop dispatch piece request") + default: } } } @@ -1224,18 +1230,22 @@ wait: } } -func (pt *peerTaskConductor) downloadPieceWorker(id int32, requests chan *DownloadPieceRequest) { +func (pt *peerTaskConductor) downloadPieceWorker(id int32, requests ring.Queue[DownloadPieceRequest]) { for { - select { - case request := <-requests: - pt.readyPiecesLock.RLock() - if pt.readyPieces.IsSet(request.piece.PieceNum) { - pt.readyPiecesLock.RUnlock() - pt.Log().Debugf("piece %d is already downloaded, skip", request.piece.PieceNum) - continue - } + request, ok := requests.Dequeue() + if !ok { + pt.Infof("piece download queue cancelled, peer download worker #%d exit", id) + return + } + pt.readyPiecesLock.RLock() + if pt.readyPieces.IsSet(request.piece.PieceNum) { pt.readyPiecesLock.RUnlock() - pt.downloadPiece(id, request) + pt.Log().Debugf("piece %d is already downloaded, skip", request.piece.PieceNum) + continue + } + pt.readyPiecesLock.RUnlock() + pt.downloadPiece(id, request) + select { case <-pt.pieceDownloadCtx.Done(): pt.Infof("piece download cancelled, peer download worker #%d exit", id) return @@ -1245,6 +1255,7 @@ func (pt *peerTaskConductor) downloadPieceWorker(id int32, requests chan *Downlo case <-pt.failCh: pt.Errorf("peer task fail, peer download worker #%d exit", id) return + default: } } } diff --git a/client/daemon/peer/peertask_piecetask_synchronizer.go b/client/daemon/peer/peertask_piecetask_synchronizer.go index a626d8c47..87a77176b 100644 --- a/client/daemon/peer/peertask_piecetask_synchronizer.go +++ b/client/daemon/peer/peertask_piecetask_synchronizer.go @@ -36,6 +36,7 @@ import ( "d7y.io/dragonfly/v2/client/config" logger "d7y.io/dragonfly/v2/internal/dflog" + "d7y.io/dragonfly/v2/pkg/container/ring" "d7y.io/dragonfly/v2/pkg/dfnet" "d7y.io/dragonfly/v2/pkg/net/ip" dfdaemonclient "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/client" @@ -46,7 +47,7 @@ type pieceTaskSyncManager struct { ctx context.Context ctxCancel context.CancelFunc peerTaskConductor *peerTaskConductor - pieceRequestCh chan *DownloadPieceRequest + pieceRequestQueue ring.Queue[DownloadPieceRequest] workers map[string]*pieceTaskSynchronizer watchdog *synchronizerWatchdog } @@ -59,7 +60,7 @@ type pieceTaskSynchronizer struct { dstPeer *schedulerv1.PeerPacket_DestPeer error atomic.Value peerTaskConductor *peerTaskConductor - pieceRequestCh chan *DownloadPieceRequest + pieceRequestQueue ring.Queue[DownloadPieceRequest] } type synchronizerWatchdog struct { @@ -156,7 +157,7 @@ func (s *pieceTaskSyncManager) newPieceTaskSynchronizer( Limit: 16, } if worker, ok := s.workers[dstPeer.PeerId]; ok { - // worker is okay, keep it go on + // worker is okay, keep it going on if worker.error.Load() == nil { s.peerTaskConductor.Infof("reuse PieceTaskSynchronizer %s", dstPeer.PeerId) return nil @@ -211,7 +212,7 @@ func (s *pieceTaskSyncManager) newPieceTaskSynchronizer( synchronizer := &pieceTaskSynchronizer{ span: span, peerTaskConductor: s.peerTaskConductor, - pieceRequestCh: s.pieceRequestCh, + pieceRequestQueue: s.pieceRequestQueue, syncPiecesStream: stream, grpcClient: grpcClient, dstPeer: dstPeer, @@ -318,6 +319,7 @@ func (s *pieceTaskSyncManager) acquire(request *commonv1.PieceTaskRequest) (atte func (s *pieceTaskSyncManager) cancel() { s.ctxCancel() + s.pieceRequestQueue.Close() s.Lock() for _, p := range s.workers { p.close() @@ -371,13 +373,16 @@ func (s *pieceTaskSynchronizer) dispatchPieceRequest(piecePacket *commonv1.Piece DstPid: piecePacket.DstPid, DstAddr: piecePacket.DstAddr, } + + s.pieceRequestQueue.Enqueue(req) + s.span.AddEvent(fmt.Sprintf("send piece #%d request to piece download queue", piece.PieceNum)) + select { - case s.pieceRequestCh <- req: - s.span.AddEvent(fmt.Sprintf("send piece #%d request to piece download queue", piece.PieceNum)) case <-s.peerTaskConductor.successCh: s.Infof("peer task success, stop dispatch piece request, dest peer: %s", s.dstPeer.PeerId) case <-s.peerTaskConductor.failCh: s.Warnf("peer task fail, stop dispatch piece request, dest peer: %s", s.dstPeer.PeerId) + default: } } } diff --git a/pkg/container/ring/queue.go b/pkg/container/ring/queue.go new file mode 100644 index 000000000..d6c55a5c1 --- /dev/null +++ b/pkg/container/ring/queue.go @@ -0,0 +1,23 @@ +/* + * Copyright 2022 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ring + +type Queue[T any] interface { + Enqueue(value *T) + Dequeue() (value *T, ok bool) + Close() +} diff --git a/pkg/container/ring/queue_test.go b/pkg/container/ring/queue_test.go new file mode 100644 index 000000000..03487a6f6 --- /dev/null +++ b/pkg/container/ring/queue_test.go @@ -0,0 +1,204 @@ +/* + * Copyright 2022 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ring + +import ( + "testing" + + testifyassert "github.com/stretchr/testify/assert" + "golang.org/x/exp/slices" +) + +func TestSequence(t *testing.T) { + var testCases = []struct { + name string + exponent int + values []int + }{ + { + name: "sequence with exponent 1", + exponent: 1, + values: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + }, + { + name: "sequence with exponent 2", + exponent: 2, + values: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + }, + { + name: "sequence with exponent 4", + exponent: 4, + values: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + }, + { + name: "random with exponent 1", + exponent: 1, + values: []int{8, 5, 1, 3, 7, 4, 2, 6, 9, 0}, + }, + { + name: "random with exponent 2", + exponent: 2, + values: []int{8, 5, 1, 3, 7, 4, 2, 6, 9, 0}, + }, + { + name: "random with exponent 4", + exponent: 4, + values: []int{8, 5, 1, 3, 7, 4, 2, 6, 9, 0}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + assert := testifyassert.New(t) + + q := NewSequence[int](tc.exponent) + go func() { + for _, v := range tc.values { + var vv int + vv = v + q.Enqueue(&vv) + } + }() + var values []int + for i := 0; i < len(tc.values); i++ { + val, ok := q.Dequeue() + assert.True(ok, "dequeue should be ok") + values = append(values, *val) + } + assert.Equal(tc.values, values) + + q.Close() + _, ok := q.Dequeue() + assert.False(ok, "dequeue after closed should be false") + }) + } +} + +func TestRandom(t *testing.T) { + var testCases = []struct { + name string + exponent int + values []int + }{ + { + name: "exponent 1 - case 1", + exponent: 1, + values: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + }, + { + name: "exponent 2 - case 1", + exponent: 2, + values: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + }, + { + name: "exponent 4 - case 1", + exponent: 4, + values: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, + }, + { + name: "exponent 1 - case 2", + exponent: 1, + values: []int{8, 5, 1, 3, 7, 4, 2, 6, 9, 0}, + }, + { + name: "exponent 2 - case 2", + exponent: 2, + values: []int{8, 5, 1, 3, 7, 4, 2, 6, 9, 0}, + }, + { + name: "exponent 4 - case 2", + exponent: 4, + values: []int{8, 5, 1, 3, 7, 4, 2, 6, 9, 0}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + assert := testifyassert.New(t) + + q := NewRandom[int](tc.exponent) + go func() { + for _, v := range tc.values { + var vv int + vv = v + q.Enqueue(&vv) + } + }() + var values []int + for i := 0; i < len(tc.values); i++ { + val, ok := q.Dequeue() + assert.True(ok, "dequeue should be ok") + values = append(values, *val) + } + slices.Sort(tc.values) + slices.Sort(values) + assert.Equal(tc.values, values) + + q.Close() + _, ok := q.Dequeue() + assert.False(ok, "dequeue after closed should be false") + }) + } +} + +func benchmarkRandom(b *testing.B, exponent int, input, output int) { + queue := NewRandom[int](exponent) + done := false + for i := 0; i < input; i++ { + go func(i int) { + for { + if done { + return + } + queue.Enqueue(&i) + } + }(i) + } + for i := 0; i < b.N; i++ { + queue.Dequeue() + } + queue.Close() + done = true +} + +func BenchmarkRandomExpo1Input2(b *testing.B) { + benchmarkRandom(b, 1, 2, 0) +} +func BenchmarkRandomExpo1Input4(b *testing.B) { + benchmarkRandom(b, 1, 4, 0) +} +func BenchmarkRandomExpo1Input8(b *testing.B) { + benchmarkRandom(b, 1, 8, 0) +} +func BenchmarkRandomExpo2Input2(b *testing.B) { + benchmarkRandom(b, 2, 2, 0) +} +func BenchmarkRandomExpo2Input4(b *testing.B) { + benchmarkRandom(b, 2, 4, 0) +} +func BenchmarkRandomExpo2Input8(b *testing.B) { + benchmarkRandom(b, 2, 8, 0) +} +func BenchmarkRandomExpo3Input2(b *testing.B) { + benchmarkRandom(b, 3, 2, 0) +} +func BenchmarkRandomExpo3Input4(b *testing.B) { + benchmarkRandom(b, 3, 4, 0) +} +func BenchmarkRandomExpo3Input8(b *testing.B) { + benchmarkRandom(b, 3, 8, 0) +} diff --git a/pkg/container/ring/random.go b/pkg/container/ring/random.go new file mode 100644 index 000000000..2c826ba01 --- /dev/null +++ b/pkg/container/ring/random.go @@ -0,0 +1,88 @@ +/* + * Copyright 2022 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ring + +import ( + "math/rand" + "time" +) + +type random[T any] struct { + *sequence[T] + seed *rand.Rand +} + +func NewRandom[T any](exponent int) Queue[T] { + return &random[T]{ + sequence: newSequence[T](exponent), + seed: rand.New(rand.NewSource(time.Now().UnixNano())), + } +} + +func (r *random[T]) Enqueue(value *T) { + r.sequence.Enqueue(value) +} + +func (r *random[T]) Dequeue() (value *T, ok bool) { + r.locker.Lock() +entry: + if r.closed { + r.locker.Unlock() + return nil, false + } + + if r.isEmpty() { + r.deqCond.Wait() + goto entry + } + + var ( + count uint64 + newHead = (r.head + 1) & r.mask + ) + + if r.head < r.tail { + count = r.tail - r.head + } else { + count = r.mask - (r.head - r.tail) + 1 + } + + if count > 1 { + // generate a random index + idx := r.seed.Int63n(int64(count)) + randomHead := (newHead + uint64(idx)) & r.mask + // skip same idx with newHeader + if idx > 0 { + // swap the new idx and newHead + var tmp *T + tmp = r.queue[randomHead] + r.queue[randomHead] = r.queue[newHead] + r.queue[newHead] = tmp + } + } + + r.head = newHead + val := r.queue[newHead] + + r.enqCond.Signal() + r.locker.Unlock() + return val, true +} + +func (r *random[T]) Close() { + r.sequence.Close() +} diff --git a/pkg/container/ring/sequence.go b/pkg/container/ring/sequence.go new file mode 100644 index 000000000..c2eea8603 --- /dev/null +++ b/pkg/container/ring/sequence.go @@ -0,0 +1,115 @@ +/* + * Copyright 2022 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package ring + +import ( + "math" + "sync" +) + +type sequence[T any] struct { + locker sync.Locker + enqCond *sync.Cond + deqCond *sync.Cond + closed bool + + queue []*T + head, tail uint64 + cap, mask uint64 +} + +func NewSequence[T any](exponent int) Queue[T] { + return newSequence[T](exponent) +} + +func newSequence[T any](exponent int) *sequence[T] { + capacity := uint64(math.Exp2(float64(exponent))) + locker := &sync.Mutex{} + return &sequence[T]{ + locker: locker, + enqCond: &sync.Cond{ + L: locker, + }, + deqCond: &sync.Cond{ + L: locker, + }, + queue: make([]*T, capacity), + head: uint64(0), + tail: uint64(0), + cap: capacity, + mask: capacity - 1, + } +} + +func (seq *sequence[T]) Enqueue(value *T) { + seq.locker.Lock() + +entry: + if seq.closed { + seq.locker.Unlock() + return + } + + if seq.isFull() { + seq.enqCond.Wait() + goto entry + } + + newTail := (seq.tail + 1) & seq.mask + seq.tail = newTail + seq.queue[newTail] = value + + seq.deqCond.Signal() + seq.locker.Unlock() +} + +func (seq *sequence[T]) Dequeue() (value *T, ok bool) { + seq.locker.Lock() +entry: + if seq.closed { + seq.locker.Unlock() + return nil, false + } + + if seq.isEmpty() { + seq.deqCond.Wait() + goto entry + } + newHead := (seq.head + 1) & seq.mask + seq.head = newHead + val := seq.queue[newHead] + + seq.enqCond.Signal() + seq.locker.Unlock() + return val, true +} + +func (seq *sequence[T]) Close() { + seq.locker.Lock() + seq.closed = true + seq.enqCond.Broadcast() + seq.deqCond.Broadcast() + seq.locker.Unlock() +} + +func (seq *sequence[T]) isEmpty() bool { + return seq.head == seq.tail +} + +func (seq *sequence[T]) isFull() bool { + return seq.tail-seq.head == seq.cap-1 || seq.head-seq.tail == 1 +}