fix: client goroutine and fd leak (#713)
Signed-off-by: Jim Ma <majinjing3@gmail.com>
This commit is contained in:
parent
7533e23e2f
commit
6c73629016
|
|
@ -237,8 +237,8 @@ func (ptm *peerTaskManager) StartStreamPeerTask(ctx context.Context, req *schedu
|
||||||
ptm.runningPeerTasks.Store(req.PeerId, pt)
|
ptm.runningPeerTasks.Store(req.PeerId, pt)
|
||||||
|
|
||||||
// FIXME when failed due to schedulerClient error, relocate schedulerClient and retry
|
// FIXME when failed due to schedulerClient error, relocate schedulerClient and retry
|
||||||
reader, attribute, err := pt.Start(ctx)
|
readCloser, attribute, err := pt.Start(ctx)
|
||||||
return ioutil.NopCloser(reader), attribute, err
|
return readCloser, attribute, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ptm *peerTaskManager) Stop(ctx context.Context) error {
|
func (ptm *peerTaskManager) Stop(ctx context.Context) error {
|
||||||
|
|
|
||||||
|
|
@ -41,10 +41,10 @@ import (
|
||||||
// StreamPeerTask represents a peer task with stream io for reading directly without once more disk io
|
// StreamPeerTask represents a peer task with stream io for reading directly without once more disk io
|
||||||
type StreamPeerTask interface {
|
type StreamPeerTask interface {
|
||||||
Task
|
Task
|
||||||
// Start start the special peer task, return a io.Reader for stream io
|
// Start starts the special peer task, return an io.Reader for stream io
|
||||||
// when all data transferred, reader return a io.EOF
|
// when all data transferred, reader return an io.EOF
|
||||||
// attribute stands some extra data, like HTTP response Header
|
// attribute stands some extra data, like HTTP response Header
|
||||||
Start(ctx context.Context) (reader io.Reader, attribute map[string]string, err error)
|
Start(ctx context.Context) (rc io.ReadCloser, attribute map[string]string, err error)
|
||||||
}
|
}
|
||||||
|
|
||||||
type streamPeerTask struct {
|
type streamPeerTask struct {
|
||||||
|
|
@ -227,7 +227,7 @@ func (s *streamPeerTask) ReportPieceResult(result *pieceTaskResult) error {
|
||||||
return s.finish()
|
return s.finish()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *streamPeerTask) Start(ctx context.Context) (io.Reader, map[string]string, error) {
|
func (s *streamPeerTask) Start(ctx context.Context) (io.ReadCloser, map[string]string, error) {
|
||||||
s.ctx, s.cancel = context.WithCancel(ctx)
|
s.ctx, s.cancel = context.WithCancel(ctx)
|
||||||
if s.needBackSource {
|
if s.needBackSource {
|
||||||
go s.backSource()
|
go s.backSource()
|
||||||
|
|
@ -272,7 +272,7 @@ func (s *streamPeerTask) Start(ctx context.Context) (io.Reader, map[string]strin
|
||||||
|
|
||||||
pr, pw := io.Pipe()
|
pr, pw := io.Pipe()
|
||||||
attr := map[string]string{}
|
attr := map[string]string{}
|
||||||
var reader io.Reader = pr
|
var readCloser io.ReadCloser = pr
|
||||||
var writer io.Writer = pw
|
var writer io.Writer = pw
|
||||||
if s.contentLength.Load() != -1 {
|
if s.contentLength.Load() != -1 {
|
||||||
attr[headers.ContentLength] = fmt.Sprintf("%d", s.contentLength.Load())
|
attr[headers.ContentLength] = fmt.Sprintf("%d", s.contentLength.Load())
|
||||||
|
|
@ -366,7 +366,7 @@ func (s *streamPeerTask) Start(ctx context.Context) (io.Reader, map[string]strin
|
||||||
}
|
}
|
||||||
}(firstPiece)
|
}(firstPiece)
|
||||||
|
|
||||||
return reader, attr, nil
|
return readCloser, attr, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *streamPeerTask) finish() error {
|
func (s *streamPeerTask) finish() error {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue