diff --git a/scheduler/resource/peer.go b/scheduler/resource/peer.go index 8d4f54068..eb214b097 100644 --- a/scheduler/resource/peer.go +++ b/scheduler/resource/peer.go @@ -26,6 +26,7 @@ import ( "time" "github.com/bits-and-blooms/bitset" + "github.com/go-http-utils/headers" "github.com/looplab/fsm" "go.uber.org/atomic" @@ -402,6 +403,7 @@ func (p *Peer) DownloadTinyFile(ctx context.Context) ([]byte, error) { if err != nil { return []byte{}, err } + req.Header.Set(headers.Range, fmt.Sprintf("bytes=%d-%d", 0, p.Task.ContentLength.Load())) resp, err := http.DefaultClient.Do(req) if err != nil { @@ -409,5 +411,9 @@ func (p *Peer) DownloadTinyFile(ctx context.Context) ([]byte, error) { } defer resp.Body.Close() + if resp.StatusCode != http.StatusOK { + return []byte{}, fmt.Errorf("%v: %v", url.String(), resp.Status) + } + return io.ReadAll(resp.Body) } diff --git a/scheduler/resource/peer_test.go b/scheduler/resource/peer_test.go index 73ea59d8b..77466dc9a 100644 --- a/scheduler/resource/peer_test.go +++ b/scheduler/resource/peer_test.go @@ -18,14 +18,17 @@ package resource import ( "context" + "fmt" "net" "net/http" "net/http/httptest" "net/url" + "path" "strconv" "testing" "time" + "github.com/go-http-utils/headers" "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" @@ -867,6 +870,16 @@ func TestPeer_DeleteStream(t *testing.T) { func TestPeer_DownloadTinyFile(t *testing.T) { s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if path.Base(r.URL.Path) == "foo" { + w.WriteHeader(http.StatusNotFound) + return + } + + if r.Header.Get(headers.Range) == "bytes=0-2" { + w.WriteHeader(http.StatusNotAcceptable) + return + } + w.WriteHeader(http.StatusOK) })) defer s.Close() @@ -886,6 +899,19 @@ func TestPeer_DownloadTinyFile(t *testing.T) { assert.NoError(err) }, }, + { + name: "download tiny file with range header", + expect: func(t *testing.T, peer *Peer) { + assert := assert.New(t) + peer.Task.ContentLength.Store(2) + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + _, err := peer.DownloadTinyFile(ctx) + assert.EqualError(err, fmt.Sprintf("http://%s:%d/download/%s/%s?peerId=scheduler: 406 Not Acceptable", + peer.Host.IP, peer.Host.DownloadPort, peer.Task.ID[:3], peer.Task.ID)) + }, + }, { name: "download tiny file failed because of port error", expect: func(t *testing.T, peer *Peer) { @@ -910,6 +936,19 @@ func TestPeer_DownloadTinyFile(t *testing.T) { assert.Error(err) }, }, + { + name: "download tiny file failed because of http status code", + expect: func(t *testing.T, peer *Peer) { + assert := assert.New(t) + peer.Task.ID = "foo" + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + + _, err := peer.DownloadTinyFile(ctx) + assert.EqualError(err, fmt.Sprintf("http://%s:%d/download/%s/%s?peerId=scheduler: 404 Not Found", + peer.Host.IP, peer.Host.DownloadPort, peer.Task.ID[:3], peer.Task.ID)) + }, + }, } for _, tc := range tests {