diff --git a/client/daemon/rpcserver/subscriber.go b/client/daemon/rpcserver/subscriber.go index 4028a9dd0..a9382f7ad 100644 --- a/client/daemon/rpcserver/subscriber.go +++ b/client/daemon/rpcserver/subscriber.go @@ -115,9 +115,13 @@ func searchNextPieceNum(sentMap map[int32]struct{}, cur uint32) (nextPieceNum ui } // sendExistPieces will send as much as possible pieces -func (s *subscriber) sendExistPieces(startNum uint32) (total int32, err error) { +func (s *subscriber) sendExistPieces(startNum uint32) error { s.request.StartNum = startNum - return sendExistPieces(s.sync.Context(), s.SugaredLoggerOnWith, s.getPieces, s.request, s.sync, s.sentMap, true) + total, err := sendExistPieces(s.sync.Context(), s.SugaredLoggerOnWith, s.getPieces, s.request, s.sync, s.sentMap, true) + if total > -1 && s.isUnknownTotalPieces() { + s.totalPieces = total + } + return err } func (s *subscriber) receiveRemainingPieceTaskRequests() { @@ -161,9 +165,23 @@ func (s *subscriber) receiveRemainingPieceTaskRequests() { } } +// totalPieces is -1, 0, n +func (s *subscriber) isKnownTotalPieces() bool { + return s.totalPieces > -1 +} + +func (s *subscriber) isUnknownTotalPieces() bool { + return !s.isKnownTotalPieces() +} + +func (s *subscriber) isAllPiecesSent(nextPieceNum uint32) bool { + return nextPieceNum == uint32(s.totalPieces) +} + func (s *subscriber) sendRemainingPieceTasks() error { // nextPieceNum is the least piece num which did not send to remote peer - // may great then total piece count, check the total piece count when use it + // available values: [0, n], n is total piece count + // when nextPieceNum is n, indicate all pieces done var nextPieceNum uint32 s.Lock() for i := int32(s.skipPieceCount); ; i++ { @@ -173,6 +191,7 @@ func (s *subscriber) sendRemainingPieceTasks() error { } } s.Unlock() + s.Debugf("desired next piece num: %d", nextPieceNum) loop: for { select { @@ -182,51 +201,54 @@ loop: case info := <-s.PieceInfoChannel: s.Infof("receive piece info, num: %d, finished: %v", info.Num, info.Finished) // not desired piece - if s.totalPieces > -1 && uint32(info.Num) < nextPieceNum { + if uint32(info.Num) < nextPieceNum { continue } s.Lock() - total, err := s.sendExistPieces(uint32(info.Num)) + err := s.sendExistPieces(uint32(info.Num)) if err != nil { err = s.saveError(err) s.Unlock() return err } - if total > -1 && s.totalPieces == -1 { - s.totalPieces = total - } - if s.totalPieces > -1 && len(s.sentMap)+int(s.skipPieceCount) == int(s.totalPieces) { - s.Unlock() - break loop - } - if info.Finished { - s.Unlock() - break loop - } + nextPieceNum = s.searchNextPieceNum(nextPieceNum) + s.Debugf("update desired next piece num: %d", nextPieceNum) + + if info.Finished && s.isAllPiecesSent(nextPieceNum) { + s.Unlock() + break loop + } s.Unlock() case <-s.Success: s.Infof("peer task is success, send remaining pieces") s.Lock() // all pieces already sent // empty piece task will reach sendExistPieces to sync content length and piece count - if s.totalPieces > 0 && nextPieceNum == uint32(s.totalPieces) { + if s.totalPieces > 0 && s.isAllPiecesSent(nextPieceNum) { s.Unlock() break loop } - total, err := s.sendExistPieces(nextPieceNum) + + err := s.sendExistPieces(nextPieceNum) if err != nil { err = s.saveError(err) s.Unlock() return err } - if total > -1 && s.totalPieces == -1 { - s.totalPieces = total - } - if s.totalPieces > -1 && len(s.sentMap)+int(s.skipPieceCount) != int(s.totalPieces) { + + if s.isUnknownTotalPieces() { s.Unlock() - msg := "peer task success, but can not send all pieces" + msg := "task success, but total pieces is unknown" + s.Errorf(msg) + return dferrors.Newf(commonv1.Code_ClientError, msg) + } + + nextPieceNum = s.searchNextPieceNum(nextPieceNum) + if !s.isAllPiecesSent(nextPieceNum) { + s.Unlock() + msg := "task success, but not all pieces are sent out" s.Errorf(msg) return dferrors.Newf(commonv1.Code_ClientError, msg) }