From 5c25a325c6e0e06275144c74e2b9dae148ce36cc Mon Sep 17 00:00:00 2001 From: Gaius Date: Fri, 11 Nov 2022 12:07:27 +0800 Subject: [PATCH] feat: seed peer reuses traffic (#1825) Signed-off-by: Gaius --- client/daemon/rpcserver/seeder.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/client/daemon/rpcserver/seeder.go b/client/daemon/rpcserver/seeder.go index a32350edb..4a33de1bf 100644 --- a/client/daemon/rpcserver/seeder.go +++ b/client/daemon/rpcserver/seeder.go @@ -196,7 +196,7 @@ func (s *seedSynchronizer) sendPieceSeeds(reuse bool) (err error) { return status.Errorf(codes.Internal, "seed task failed: %s", reason) case p := <-s.PieceInfoChannel: s.Infof("receive piece info, num: %d, ordered num: %d, finish: %v", p.Num, p.OrderedNum, p.Finished) - contentLength, desired, err = s.sendOrderedPieceSeeds(desired, p.OrderedNum, p.Finished) + contentLength, desired, err = s.sendOrderedPieceSeeds(desired, p.OrderedNum, p.Finished, reuse) if err != nil { s.Span.RecordError(err) s.Span.SetAttributes(config.AttributeSeedTaskSuccess.Bool(false)) @@ -236,7 +236,7 @@ func (s *seedSynchronizer) sendRemindingPieceSeeds(desired int32, reuse bool) er // we must send done to scheduler if len(pp.PieceInfos) == 0 { - ps := s.compositePieceSeed(pp, nil) + ps := s.compositePieceSeed(pp, nil, reuse) ps.Done, ps.EndTime = true, uint64(time.Now().UnixNano()) s.Infof("seed tasks start time: %d, end time: %d, cost: %dms", ps.BeginTime, ps.EndTime, (ps.EndTime-ps.BeginTime)/1000000) err = s.seedsServer.Send(&ps) @@ -251,7 +251,7 @@ func (s *seedSynchronizer) sendRemindingPieceSeeds(desired int32, reuse bool) er s.Errorf("desired piece %d, not found", desired) return status.Errorf(codes.Internal, "seed task piece %d not found", desired) } - ps := s.compositePieceSeed(pp, p) + ps := s.compositePieceSeed(pp, p, reuse) if p.PieceNum == pp.TotalPiece-1 { ps.Done, ps.EndTime = true, uint64(time.Now().UnixNano()) s.Infof("seed tasks start time: %d, end time: %d, cost: %dms, piece number: %d", ps.BeginTime, ps.EndTime, (ps.EndTime-ps.BeginTime)/1000000, p.PieceNum) @@ -274,7 +274,7 @@ func (s *seedSynchronizer) sendRemindingPieceSeeds(desired int32, reuse bool) er } } -func (s *seedSynchronizer) sendOrderedPieceSeeds(desired, orderedNum int32, finished bool) (int64, int32, error) { +func (s *seedSynchronizer) sendOrderedPieceSeeds(desired, orderedNum int32, finished bool, reuse bool) (int64, int32, error) { cur := desired var contentLength int64 = -1 for ; cur <= orderedNum; cur++ { @@ -302,7 +302,7 @@ func (s *seedSynchronizer) sendOrderedPieceSeeds(desired, orderedNum int32, fini s.attributeSent = true } - ps := s.compositePieceSeed(pp, pp.PieceInfos[0]) + ps := s.compositePieceSeed(pp, pp.PieceInfos[0], reuse) if cur == orderedNum && finished { ps.Done, ps.EndTime = true, uint64(time.Now().UnixNano()) s.Infof("seed tasks start time: %d, end time: %d, cost: %dms", ps.BeginTime, ps.EndTime, (ps.EndTime-ps.BeginTime)/1000000) @@ -319,8 +319,9 @@ func (s *seedSynchronizer) sendOrderedPieceSeeds(desired, orderedNum int32, fini return contentLength, cur, nil } -func (s *seedSynchronizer) compositePieceSeed(pp *commonv1.PiecePacket, piece *commonv1.PieceInfo) cdnsystemv1.PieceSeed { +func (s *seedSynchronizer) compositePieceSeed(pp *commonv1.PiecePacket, piece *commonv1.PieceInfo, reuse bool) cdnsystemv1.PieceSeed { return cdnsystemv1.PieceSeed{ + Reuse: reuse, PeerId: s.seedTaskRequest.PeerId, HostId: s.seedTaskRequest.PeerHost.Id, PieceInfo: piece,