feat: random pieces download (#1918)
Signed-off-by: Jim Ma <majinjing3@gmail.com>
This commit is contained in:
parent
9e79d140db
commit
ff79c3148a
|
|
@ -61,8 +61,9 @@ const (
|
|||
DefaultSchedulerIP = "127.0.0.1"
|
||||
DefaultSchedulerPort = 8002
|
||||
|
||||
DefaultPieceChanSize = 16
|
||||
DefaultObjectMaxReplicas = 3
|
||||
DefaultPieceChanSize = 16
|
||||
DefaultPieceQueueExponent = 4
|
||||
DefaultObjectMaxReplicas = 3
|
||||
)
|
||||
|
||||
// Store strategy.
|
||||
|
|
|
|||
|
|
@ -44,6 +44,7 @@ import (
|
|||
"d7y.io/dragonfly/v2/client/util"
|
||||
"d7y.io/dragonfly/v2/internal/dferrors"
|
||||
logger "d7y.io/dragonfly/v2/internal/dflog"
|
||||
"d7y.io/dragonfly/v2/pkg/container/ring"
|
||||
"d7y.io/dragonfly/v2/pkg/digest"
|
||||
"d7y.io/dragonfly/v2/pkg/idgen"
|
||||
"d7y.io/dragonfly/v2/pkg/rpc/common"
|
||||
|
|
@ -581,8 +582,7 @@ func (pt *peerTaskConductor) pullPieces() {
|
|||
func (pt *peerTaskConductor) pullPiecesWithP2P() {
|
||||
var (
|
||||
// keep same size with pt.failedPieceCh for avoiding deadlock
|
||||
pieceBufferSize = uint32(config.DefaultPieceChanSize)
|
||||
pieceRequestCh = make(chan *DownloadPieceRequest, pieceBufferSize)
|
||||
pieceRequestQueue = ring.NewRandom[DownloadPieceRequest](config.DefaultPieceQueueExponent)
|
||||
)
|
||||
ctx, cancel := context.WithCancel(pt.ctx)
|
||||
|
||||
|
|
@ -590,11 +590,11 @@ func (pt *peerTaskConductor) pullPiecesWithP2P() {
|
|||
ctx: ctx,
|
||||
ctxCancel: cancel,
|
||||
peerTaskConductor: pt,
|
||||
pieceRequestCh: pieceRequestCh,
|
||||
pieceRequestQueue: pieceRequestQueue,
|
||||
workers: map[string]*pieceTaskSynchronizer{},
|
||||
}
|
||||
go pt.pullPiecesFromPeers(pieceRequestCh)
|
||||
pt.receivePeerPacket(pieceRequestCh)
|
||||
go pt.pullPiecesFromPeers(pieceRequestQueue)
|
||||
pt.receivePeerPacket(pieceRequestQueue)
|
||||
}
|
||||
|
||||
func (pt *peerTaskConductor) storeEmptyPeerTask() {
|
||||
|
|
@ -695,7 +695,7 @@ func (pt *peerTaskConductor) storeTinyPeerTask() {
|
|||
pt.PublishPieceInfo(0, uint32(contentLength))
|
||||
}
|
||||
|
||||
func (pt *peerTaskConductor) receivePeerPacket(pieceRequestCh chan *DownloadPieceRequest) {
|
||||
func (pt *peerTaskConductor) receivePeerPacket(pieceRequestQueue ring.Queue[DownloadPieceRequest]) {
|
||||
var (
|
||||
lastNotReadyPiece int32 = 0
|
||||
peerPacket *schedulerv1.PeerPacket
|
||||
|
|
@ -785,7 +785,7 @@ loop:
|
|||
trace.WithAttributes(config.AttributeMainPeer.String(peerPacket.MainPeer.PeerId)))
|
||||
|
||||
if !firstPacketReceived {
|
||||
pt.initDownloadPieceWorkers(peerPacket.ParallelCount, pieceRequestCh)
|
||||
pt.initDownloadPieceWorkers(peerPacket.ParallelCount, pieceRequestQueue)
|
||||
firstPeerSpan.SetAttributes(config.AttributeMainPeer.String(peerPacket.MainPeer.PeerId))
|
||||
firstPeerSpan.End()
|
||||
}
|
||||
|
|
@ -956,7 +956,7 @@ func (pt *peerTaskConductor) pullSinglePiece() {
|
|||
}
|
||||
|
||||
// Deprecated
|
||||
func (pt *peerTaskConductor) pullPiecesFromPeers(pieceRequestCh chan *DownloadPieceRequest) {
|
||||
func (pt *peerTaskConductor) pullPiecesFromPeers(pieceRequestQueue ring.Queue[DownloadPieceRequest]) {
|
||||
if ok, backSource := pt.waitFirstPeerPacket(); !ok {
|
||||
if backSource {
|
||||
return
|
||||
|
|
@ -1025,7 +1025,7 @@ loop:
|
|||
pt.updateMetadata(piecePacket)
|
||||
|
||||
// 3. dispatch piece request to all workers
|
||||
pt.dispatchPieceRequest(pieceRequestCh, piecePacket)
|
||||
pt.dispatchPieceRequest(pieceRequestQueue, piecePacket)
|
||||
|
||||
// 4. get next not request piece
|
||||
if num, ok = pt.getNextPieceNum(num); ok {
|
||||
|
|
@ -1085,12 +1085,12 @@ func (pt *peerTaskConductor) updateMetadata(piecePacket *commonv1.PiecePacket) {
|
|||
}
|
||||
}
|
||||
|
||||
func (pt *peerTaskConductor) initDownloadPieceWorkers(count int32, pieceRequestCh chan *DownloadPieceRequest) {
|
||||
func (pt *peerTaskConductor) initDownloadPieceWorkers(count int32, pieceRequestQueue ring.Queue[DownloadPieceRequest]) {
|
||||
if count < 1 {
|
||||
count = 4
|
||||
}
|
||||
for i := int32(0); i < count; i++ {
|
||||
go pt.downloadPieceWorker(i, pieceRequestCh)
|
||||
go pt.downloadPieceWorker(i, pieceRequestQueue)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -1156,7 +1156,7 @@ func (pt *peerTaskConductor) waitAvailablePeerPacket() (int32, bool) {
|
|||
}
|
||||
|
||||
// Deprecated
|
||||
func (pt *peerTaskConductor) dispatchPieceRequest(pieceRequestCh chan *DownloadPieceRequest, piecePacket *commonv1.PiecePacket) {
|
||||
func (pt *peerTaskConductor) dispatchPieceRequest(pieceRequestQueue ring.Queue[DownloadPieceRequest], piecePacket *commonv1.PiecePacket) {
|
||||
pieceCount := len(piecePacket.PieceInfos)
|
||||
pt.Debugf("dispatch piece request, piece count: %d", pieceCount)
|
||||
// fix cdn return zero piece info, but with total piece count and content length
|
||||
|
|
@ -1184,12 +1184,18 @@ func (pt *peerTaskConductor) dispatchPieceRequest(pieceRequestCh chan *DownloadP
|
|||
DstPid: piecePacket.DstPid,
|
||||
DstAddr: piecePacket.DstAddr,
|
||||
}
|
||||
|
||||
pieceRequestQueue.Enqueue(req)
|
||||
msg := fmt.Sprintf("send piece #%d request to piece download queue", piece.PieceNum)
|
||||
pt.span.AddEvent(msg)
|
||||
pt.Infof(msg)
|
||||
|
||||
select {
|
||||
case pieceRequestCh <- req:
|
||||
case <-pt.successCh:
|
||||
pt.Infof("peer task success, stop dispatch piece request")
|
||||
case <-pt.failCh:
|
||||
pt.Warnf("peer task fail, stop dispatch piece request")
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1224,18 +1230,22 @@ wait:
|
|||
}
|
||||
}
|
||||
|
||||
func (pt *peerTaskConductor) downloadPieceWorker(id int32, requests chan *DownloadPieceRequest) {
|
||||
func (pt *peerTaskConductor) downloadPieceWorker(id int32, requests ring.Queue[DownloadPieceRequest]) {
|
||||
for {
|
||||
select {
|
||||
case request := <-requests:
|
||||
pt.readyPiecesLock.RLock()
|
||||
if pt.readyPieces.IsSet(request.piece.PieceNum) {
|
||||
pt.readyPiecesLock.RUnlock()
|
||||
pt.Log().Debugf("piece %d is already downloaded, skip", request.piece.PieceNum)
|
||||
continue
|
||||
}
|
||||
request, ok := requests.Dequeue()
|
||||
if !ok {
|
||||
pt.Infof("piece download queue cancelled, peer download worker #%d exit", id)
|
||||
return
|
||||
}
|
||||
pt.readyPiecesLock.RLock()
|
||||
if pt.readyPieces.IsSet(request.piece.PieceNum) {
|
||||
pt.readyPiecesLock.RUnlock()
|
||||
pt.downloadPiece(id, request)
|
||||
pt.Log().Debugf("piece %d is already downloaded, skip", request.piece.PieceNum)
|
||||
continue
|
||||
}
|
||||
pt.readyPiecesLock.RUnlock()
|
||||
pt.downloadPiece(id, request)
|
||||
select {
|
||||
case <-pt.pieceDownloadCtx.Done():
|
||||
pt.Infof("piece download cancelled, peer download worker #%d exit", id)
|
||||
return
|
||||
|
|
@ -1245,6 +1255,7 @@ func (pt *peerTaskConductor) downloadPieceWorker(id int32, requests chan *Downlo
|
|||
case <-pt.failCh:
|
||||
pt.Errorf("peer task fail, peer download worker #%d exit", id)
|
||||
return
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -36,6 +36,7 @@ import (
|
|||
|
||||
"d7y.io/dragonfly/v2/client/config"
|
||||
logger "d7y.io/dragonfly/v2/internal/dflog"
|
||||
"d7y.io/dragonfly/v2/pkg/container/ring"
|
||||
"d7y.io/dragonfly/v2/pkg/dfnet"
|
||||
"d7y.io/dragonfly/v2/pkg/net/ip"
|
||||
dfdaemonclient "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/client"
|
||||
|
|
@ -46,7 +47,7 @@ type pieceTaskSyncManager struct {
|
|||
ctx context.Context
|
||||
ctxCancel context.CancelFunc
|
||||
peerTaskConductor *peerTaskConductor
|
||||
pieceRequestCh chan *DownloadPieceRequest
|
||||
pieceRequestQueue ring.Queue[DownloadPieceRequest]
|
||||
workers map[string]*pieceTaskSynchronizer
|
||||
watchdog *synchronizerWatchdog
|
||||
}
|
||||
|
|
@ -59,7 +60,7 @@ type pieceTaskSynchronizer struct {
|
|||
dstPeer *schedulerv1.PeerPacket_DestPeer
|
||||
error atomic.Value
|
||||
peerTaskConductor *peerTaskConductor
|
||||
pieceRequestCh chan *DownloadPieceRequest
|
||||
pieceRequestQueue ring.Queue[DownloadPieceRequest]
|
||||
}
|
||||
|
||||
type synchronizerWatchdog struct {
|
||||
|
|
@ -156,7 +157,7 @@ func (s *pieceTaskSyncManager) newPieceTaskSynchronizer(
|
|||
Limit: 16,
|
||||
}
|
||||
if worker, ok := s.workers[dstPeer.PeerId]; ok {
|
||||
// worker is okay, keep it go on
|
||||
// worker is okay, keep it going on
|
||||
if worker.error.Load() == nil {
|
||||
s.peerTaskConductor.Infof("reuse PieceTaskSynchronizer %s", dstPeer.PeerId)
|
||||
return nil
|
||||
|
|
@ -211,7 +212,7 @@ func (s *pieceTaskSyncManager) newPieceTaskSynchronizer(
|
|||
synchronizer := &pieceTaskSynchronizer{
|
||||
span: span,
|
||||
peerTaskConductor: s.peerTaskConductor,
|
||||
pieceRequestCh: s.pieceRequestCh,
|
||||
pieceRequestQueue: s.pieceRequestQueue,
|
||||
syncPiecesStream: stream,
|
||||
grpcClient: grpcClient,
|
||||
dstPeer: dstPeer,
|
||||
|
|
@ -318,6 +319,7 @@ func (s *pieceTaskSyncManager) acquire(request *commonv1.PieceTaskRequest) (atte
|
|||
|
||||
func (s *pieceTaskSyncManager) cancel() {
|
||||
s.ctxCancel()
|
||||
s.pieceRequestQueue.Close()
|
||||
s.Lock()
|
||||
for _, p := range s.workers {
|
||||
p.close()
|
||||
|
|
@ -371,13 +373,16 @@ func (s *pieceTaskSynchronizer) dispatchPieceRequest(piecePacket *commonv1.Piece
|
|||
DstPid: piecePacket.DstPid,
|
||||
DstAddr: piecePacket.DstAddr,
|
||||
}
|
||||
|
||||
s.pieceRequestQueue.Enqueue(req)
|
||||
s.span.AddEvent(fmt.Sprintf("send piece #%d request to piece download queue", piece.PieceNum))
|
||||
|
||||
select {
|
||||
case s.pieceRequestCh <- req:
|
||||
s.span.AddEvent(fmt.Sprintf("send piece #%d request to piece download queue", piece.PieceNum))
|
||||
case <-s.peerTaskConductor.successCh:
|
||||
s.Infof("peer task success, stop dispatch piece request, dest peer: %s", s.dstPeer.PeerId)
|
||||
case <-s.peerTaskConductor.failCh:
|
||||
s.Warnf("peer task fail, stop dispatch piece request, dest peer: %s", s.dstPeer.PeerId)
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,23 @@
|
|||
/*
|
||||
* Copyright 2022 The Dragonfly Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package ring
|
||||
|
||||
type Queue[T any] interface {
|
||||
Enqueue(value *T)
|
||||
Dequeue() (value *T, ok bool)
|
||||
Close()
|
||||
}
|
||||
|
|
@ -0,0 +1,204 @@
|
|||
/*
|
||||
* Copyright 2022 The Dragonfly Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package ring
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
testifyassert "github.com/stretchr/testify/assert"
|
||||
"golang.org/x/exp/slices"
|
||||
)
|
||||
|
||||
func TestSequence(t *testing.T) {
|
||||
var testCases = []struct {
|
||||
name string
|
||||
exponent int
|
||||
values []int
|
||||
}{
|
||||
{
|
||||
name: "sequence with exponent 1",
|
||||
exponent: 1,
|
||||
values: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
|
||||
},
|
||||
{
|
||||
name: "sequence with exponent 2",
|
||||
exponent: 2,
|
||||
values: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
|
||||
},
|
||||
{
|
||||
name: "sequence with exponent 4",
|
||||
exponent: 4,
|
||||
values: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
|
||||
},
|
||||
{
|
||||
name: "random with exponent 1",
|
||||
exponent: 1,
|
||||
values: []int{8, 5, 1, 3, 7, 4, 2, 6, 9, 0},
|
||||
},
|
||||
{
|
||||
name: "random with exponent 2",
|
||||
exponent: 2,
|
||||
values: []int{8, 5, 1, 3, 7, 4, 2, 6, 9, 0},
|
||||
},
|
||||
{
|
||||
name: "random with exponent 4",
|
||||
exponent: 4,
|
||||
values: []int{8, 5, 1, 3, 7, 4, 2, 6, 9, 0},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
assert := testifyassert.New(t)
|
||||
|
||||
q := NewSequence[int](tc.exponent)
|
||||
go func() {
|
||||
for _, v := range tc.values {
|
||||
var vv int
|
||||
vv = v
|
||||
q.Enqueue(&vv)
|
||||
}
|
||||
}()
|
||||
var values []int
|
||||
for i := 0; i < len(tc.values); i++ {
|
||||
val, ok := q.Dequeue()
|
||||
assert.True(ok, "dequeue should be ok")
|
||||
values = append(values, *val)
|
||||
}
|
||||
assert.Equal(tc.values, values)
|
||||
|
||||
q.Close()
|
||||
_, ok := q.Dequeue()
|
||||
assert.False(ok, "dequeue after closed should be false")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestRandom(t *testing.T) {
|
||||
var testCases = []struct {
|
||||
name string
|
||||
exponent int
|
||||
values []int
|
||||
}{
|
||||
{
|
||||
name: "exponent 1 - case 1",
|
||||
exponent: 1,
|
||||
values: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
|
||||
},
|
||||
{
|
||||
name: "exponent 2 - case 1",
|
||||
exponent: 2,
|
||||
values: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
|
||||
},
|
||||
{
|
||||
name: "exponent 4 - case 1",
|
||||
exponent: 4,
|
||||
values: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
|
||||
},
|
||||
{
|
||||
name: "exponent 1 - case 2",
|
||||
exponent: 1,
|
||||
values: []int{8, 5, 1, 3, 7, 4, 2, 6, 9, 0},
|
||||
},
|
||||
{
|
||||
name: "exponent 2 - case 2",
|
||||
exponent: 2,
|
||||
values: []int{8, 5, 1, 3, 7, 4, 2, 6, 9, 0},
|
||||
},
|
||||
{
|
||||
name: "exponent 4 - case 2",
|
||||
exponent: 4,
|
||||
values: []int{8, 5, 1, 3, 7, 4, 2, 6, 9, 0},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
assert := testifyassert.New(t)
|
||||
|
||||
q := NewRandom[int](tc.exponent)
|
||||
go func() {
|
||||
for _, v := range tc.values {
|
||||
var vv int
|
||||
vv = v
|
||||
q.Enqueue(&vv)
|
||||
}
|
||||
}()
|
||||
var values []int
|
||||
for i := 0; i < len(tc.values); i++ {
|
||||
val, ok := q.Dequeue()
|
||||
assert.True(ok, "dequeue should be ok")
|
||||
values = append(values, *val)
|
||||
}
|
||||
slices.Sort(tc.values)
|
||||
slices.Sort(values)
|
||||
assert.Equal(tc.values, values)
|
||||
|
||||
q.Close()
|
||||
_, ok := q.Dequeue()
|
||||
assert.False(ok, "dequeue after closed should be false")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func benchmarkRandom(b *testing.B, exponent int, input, output int) {
|
||||
queue := NewRandom[int](exponent)
|
||||
done := false
|
||||
for i := 0; i < input; i++ {
|
||||
go func(i int) {
|
||||
for {
|
||||
if done {
|
||||
return
|
||||
}
|
||||
queue.Enqueue(&i)
|
||||
}
|
||||
}(i)
|
||||
}
|
||||
for i := 0; i < b.N; i++ {
|
||||
queue.Dequeue()
|
||||
}
|
||||
queue.Close()
|
||||
done = true
|
||||
}
|
||||
|
||||
func BenchmarkRandomExpo1Input2(b *testing.B) {
|
||||
benchmarkRandom(b, 1, 2, 0)
|
||||
}
|
||||
func BenchmarkRandomExpo1Input4(b *testing.B) {
|
||||
benchmarkRandom(b, 1, 4, 0)
|
||||
}
|
||||
func BenchmarkRandomExpo1Input8(b *testing.B) {
|
||||
benchmarkRandom(b, 1, 8, 0)
|
||||
}
|
||||
func BenchmarkRandomExpo2Input2(b *testing.B) {
|
||||
benchmarkRandom(b, 2, 2, 0)
|
||||
}
|
||||
func BenchmarkRandomExpo2Input4(b *testing.B) {
|
||||
benchmarkRandom(b, 2, 4, 0)
|
||||
}
|
||||
func BenchmarkRandomExpo2Input8(b *testing.B) {
|
||||
benchmarkRandom(b, 2, 8, 0)
|
||||
}
|
||||
func BenchmarkRandomExpo3Input2(b *testing.B) {
|
||||
benchmarkRandom(b, 3, 2, 0)
|
||||
}
|
||||
func BenchmarkRandomExpo3Input4(b *testing.B) {
|
||||
benchmarkRandom(b, 3, 4, 0)
|
||||
}
|
||||
func BenchmarkRandomExpo3Input8(b *testing.B) {
|
||||
benchmarkRandom(b, 3, 8, 0)
|
||||
}
|
||||
|
|
@ -0,0 +1,88 @@
|
|||
/*
|
||||
* Copyright 2022 The Dragonfly Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package ring
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"time"
|
||||
)
|
||||
|
||||
type random[T any] struct {
|
||||
*sequence[T]
|
||||
seed *rand.Rand
|
||||
}
|
||||
|
||||
func NewRandom[T any](exponent int) Queue[T] {
|
||||
return &random[T]{
|
||||
sequence: newSequence[T](exponent),
|
||||
seed: rand.New(rand.NewSource(time.Now().UnixNano())),
|
||||
}
|
||||
}
|
||||
|
||||
func (r *random[T]) Enqueue(value *T) {
|
||||
r.sequence.Enqueue(value)
|
||||
}
|
||||
|
||||
func (r *random[T]) Dequeue() (value *T, ok bool) {
|
||||
r.locker.Lock()
|
||||
entry:
|
||||
if r.closed {
|
||||
r.locker.Unlock()
|
||||
return nil, false
|
||||
}
|
||||
|
||||
if r.isEmpty() {
|
||||
r.deqCond.Wait()
|
||||
goto entry
|
||||
}
|
||||
|
||||
var (
|
||||
count uint64
|
||||
newHead = (r.head + 1) & r.mask
|
||||
)
|
||||
|
||||
if r.head < r.tail {
|
||||
count = r.tail - r.head
|
||||
} else {
|
||||
count = r.mask - (r.head - r.tail) + 1
|
||||
}
|
||||
|
||||
if count > 1 {
|
||||
// generate a random index
|
||||
idx := r.seed.Int63n(int64(count))
|
||||
randomHead := (newHead + uint64(idx)) & r.mask
|
||||
// skip same idx with newHeader
|
||||
if idx > 0 {
|
||||
// swap the new idx and newHead
|
||||
var tmp *T
|
||||
tmp = r.queue[randomHead]
|
||||
r.queue[randomHead] = r.queue[newHead]
|
||||
r.queue[newHead] = tmp
|
||||
}
|
||||
}
|
||||
|
||||
r.head = newHead
|
||||
val := r.queue[newHead]
|
||||
|
||||
r.enqCond.Signal()
|
||||
r.locker.Unlock()
|
||||
return val, true
|
||||
}
|
||||
|
||||
func (r *random[T]) Close() {
|
||||
r.sequence.Close()
|
||||
}
|
||||
|
|
@ -0,0 +1,115 @@
|
|||
/*
|
||||
* Copyright 2022 The Dragonfly Authors
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package ring
|
||||
|
||||
import (
|
||||
"math"
|
||||
"sync"
|
||||
)
|
||||
|
||||
type sequence[T any] struct {
|
||||
locker sync.Locker
|
||||
enqCond *sync.Cond
|
||||
deqCond *sync.Cond
|
||||
closed bool
|
||||
|
||||
queue []*T
|
||||
head, tail uint64
|
||||
cap, mask uint64
|
||||
}
|
||||
|
||||
func NewSequence[T any](exponent int) Queue[T] {
|
||||
return newSequence[T](exponent)
|
||||
}
|
||||
|
||||
func newSequence[T any](exponent int) *sequence[T] {
|
||||
capacity := uint64(math.Exp2(float64(exponent)))
|
||||
locker := &sync.Mutex{}
|
||||
return &sequence[T]{
|
||||
locker: locker,
|
||||
enqCond: &sync.Cond{
|
||||
L: locker,
|
||||
},
|
||||
deqCond: &sync.Cond{
|
||||
L: locker,
|
||||
},
|
||||
queue: make([]*T, capacity),
|
||||
head: uint64(0),
|
||||
tail: uint64(0),
|
||||
cap: capacity,
|
||||
mask: capacity - 1,
|
||||
}
|
||||
}
|
||||
|
||||
func (seq *sequence[T]) Enqueue(value *T) {
|
||||
seq.locker.Lock()
|
||||
|
||||
entry:
|
||||
if seq.closed {
|
||||
seq.locker.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
if seq.isFull() {
|
||||
seq.enqCond.Wait()
|
||||
goto entry
|
||||
}
|
||||
|
||||
newTail := (seq.tail + 1) & seq.mask
|
||||
seq.tail = newTail
|
||||
seq.queue[newTail] = value
|
||||
|
||||
seq.deqCond.Signal()
|
||||
seq.locker.Unlock()
|
||||
}
|
||||
|
||||
func (seq *sequence[T]) Dequeue() (value *T, ok bool) {
|
||||
seq.locker.Lock()
|
||||
entry:
|
||||
if seq.closed {
|
||||
seq.locker.Unlock()
|
||||
return nil, false
|
||||
}
|
||||
|
||||
if seq.isEmpty() {
|
||||
seq.deqCond.Wait()
|
||||
goto entry
|
||||
}
|
||||
newHead := (seq.head + 1) & seq.mask
|
||||
seq.head = newHead
|
||||
val := seq.queue[newHead]
|
||||
|
||||
seq.enqCond.Signal()
|
||||
seq.locker.Unlock()
|
||||
return val, true
|
||||
}
|
||||
|
||||
func (seq *sequence[T]) Close() {
|
||||
seq.locker.Lock()
|
||||
seq.closed = true
|
||||
seq.enqCond.Broadcast()
|
||||
seq.deqCond.Broadcast()
|
||||
seq.locker.Unlock()
|
||||
}
|
||||
|
||||
func (seq *sequence[T]) isEmpty() bool {
|
||||
return seq.head == seq.tail
|
||||
}
|
||||
|
||||
func (seq *sequence[T]) isFull() bool {
|
||||
return seq.tail-seq.head == seq.cap-1 || seq.head-seq.tail == 1
|
||||
}
|
||||
Loading…
Reference in New Issue