feat: seed peer reuses traffic (#1825)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2022-11-11 12:07:27 +08:00
parent f581d10208
commit 5c25a325c6
No known key found for this signature in database
GPG Key ID: 8B4E5D1290FA2FFB
1 changed files with 7 additions and 6 deletions

View File

@ -196,7 +196,7 @@ func (s *seedSynchronizer) sendPieceSeeds(reuse bool) (err error) {
return status.Errorf(codes.Internal, "seed task failed: %s", reason) return status.Errorf(codes.Internal, "seed task failed: %s", reason)
case p := <-s.PieceInfoChannel: case p := <-s.PieceInfoChannel:
s.Infof("receive piece info, num: %d, ordered num: %d, finish: %v", p.Num, p.OrderedNum, p.Finished) s.Infof("receive piece info, num: %d, ordered num: %d, finish: %v", p.Num, p.OrderedNum, p.Finished)
contentLength, desired, err = s.sendOrderedPieceSeeds(desired, p.OrderedNum, p.Finished) contentLength, desired, err = s.sendOrderedPieceSeeds(desired, p.OrderedNum, p.Finished, reuse)
if err != nil { if err != nil {
s.Span.RecordError(err) s.Span.RecordError(err)
s.Span.SetAttributes(config.AttributeSeedTaskSuccess.Bool(false)) s.Span.SetAttributes(config.AttributeSeedTaskSuccess.Bool(false))
@ -236,7 +236,7 @@ func (s *seedSynchronizer) sendRemindingPieceSeeds(desired int32, reuse bool) er
// we must send done to scheduler // we must send done to scheduler
if len(pp.PieceInfos) == 0 { if len(pp.PieceInfos) == 0 {
ps := s.compositePieceSeed(pp, nil) ps := s.compositePieceSeed(pp, nil, reuse)
ps.Done, ps.EndTime = true, uint64(time.Now().UnixNano()) ps.Done, ps.EndTime = true, uint64(time.Now().UnixNano())
s.Infof("seed tasks start time: %d, end time: %d, cost: %dms", ps.BeginTime, ps.EndTime, (ps.EndTime-ps.BeginTime)/1000000) s.Infof("seed tasks start time: %d, end time: %d, cost: %dms", ps.BeginTime, ps.EndTime, (ps.EndTime-ps.BeginTime)/1000000)
err = s.seedsServer.Send(&ps) err = s.seedsServer.Send(&ps)
@ -251,7 +251,7 @@ func (s *seedSynchronizer) sendRemindingPieceSeeds(desired int32, reuse bool) er
s.Errorf("desired piece %d, not found", desired) s.Errorf("desired piece %d, not found", desired)
return status.Errorf(codes.Internal, "seed task piece %d not found", desired) return status.Errorf(codes.Internal, "seed task piece %d not found", desired)
} }
ps := s.compositePieceSeed(pp, p) ps := s.compositePieceSeed(pp, p, reuse)
if p.PieceNum == pp.TotalPiece-1 { if p.PieceNum == pp.TotalPiece-1 {
ps.Done, ps.EndTime = true, uint64(time.Now().UnixNano()) ps.Done, ps.EndTime = true, uint64(time.Now().UnixNano())
s.Infof("seed tasks start time: %d, end time: %d, cost: %dms, piece number: %d", ps.BeginTime, ps.EndTime, (ps.EndTime-ps.BeginTime)/1000000, p.PieceNum) s.Infof("seed tasks start time: %d, end time: %d, cost: %dms, piece number: %d", ps.BeginTime, ps.EndTime, (ps.EndTime-ps.BeginTime)/1000000, p.PieceNum)
@ -274,7 +274,7 @@ func (s *seedSynchronizer) sendRemindingPieceSeeds(desired int32, reuse bool) er
} }
} }
func (s *seedSynchronizer) sendOrderedPieceSeeds(desired, orderedNum int32, finished bool) (int64, int32, error) { func (s *seedSynchronizer) sendOrderedPieceSeeds(desired, orderedNum int32, finished bool, reuse bool) (int64, int32, error) {
cur := desired cur := desired
var contentLength int64 = -1 var contentLength int64 = -1
for ; cur <= orderedNum; cur++ { for ; cur <= orderedNum; cur++ {
@ -302,7 +302,7 @@ func (s *seedSynchronizer) sendOrderedPieceSeeds(desired, orderedNum int32, fini
s.attributeSent = true s.attributeSent = true
} }
ps := s.compositePieceSeed(pp, pp.PieceInfos[0]) ps := s.compositePieceSeed(pp, pp.PieceInfos[0], reuse)
if cur == orderedNum && finished { if cur == orderedNum && finished {
ps.Done, ps.EndTime = true, uint64(time.Now().UnixNano()) ps.Done, ps.EndTime = true, uint64(time.Now().UnixNano())
s.Infof("seed tasks start time: %d, end time: %d, cost: %dms", ps.BeginTime, ps.EndTime, (ps.EndTime-ps.BeginTime)/1000000) s.Infof("seed tasks start time: %d, end time: %d, cost: %dms", ps.BeginTime, ps.EndTime, (ps.EndTime-ps.BeginTime)/1000000)
@ -319,8 +319,9 @@ func (s *seedSynchronizer) sendOrderedPieceSeeds(desired, orderedNum int32, fini
return contentLength, cur, nil return contentLength, cur, nil
} }
func (s *seedSynchronizer) compositePieceSeed(pp *commonv1.PiecePacket, piece *commonv1.PieceInfo) cdnsystemv1.PieceSeed { func (s *seedSynchronizer) compositePieceSeed(pp *commonv1.PiecePacket, piece *commonv1.PieceInfo, reuse bool) cdnsystemv1.PieceSeed {
return cdnsystemv1.PieceSeed{ return cdnsystemv1.PieceSeed{
Reuse: reuse,
PeerId: s.seedTaskRequest.PeerId, PeerId: s.seedTaskRequest.PeerId,
HostId: s.seedTaskRequest.PeerHost.Id, HostId: s.seedTaskRequest.PeerHost.Id,
PieceInfo: piece, PieceInfo: piece,