feat: scheduler download tiny file with range header (#1024)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2022-01-21 16:57:45 +08:00
parent 6e636b1125
commit 061c3c8752
No known key found for this signature in database
GPG Key ID: 8B4E5D1290FA2FFB
2 changed files with 45 additions and 0 deletions

View File

@ -26,6 +26,7 @@ import (
"time" "time"
"github.com/bits-and-blooms/bitset" "github.com/bits-and-blooms/bitset"
"github.com/go-http-utils/headers"
"github.com/looplab/fsm" "github.com/looplab/fsm"
"go.uber.org/atomic" "go.uber.org/atomic"
@ -402,6 +403,7 @@ func (p *Peer) DownloadTinyFile(ctx context.Context) ([]byte, error) {
if err != nil { if err != nil {
return []byte{}, err return []byte{}, err
} }
req.Header.Set(headers.Range, fmt.Sprintf("bytes=%d-%d", 0, p.Task.ContentLength.Load()))
resp, err := http.DefaultClient.Do(req) resp, err := http.DefaultClient.Do(req)
if err != nil { if err != nil {
@ -409,5 +411,9 @@ func (p *Peer) DownloadTinyFile(ctx context.Context) ([]byte, error) {
} }
defer resp.Body.Close() defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return []byte{}, fmt.Errorf("%v: %v", url.String(), resp.Status)
}
return io.ReadAll(resp.Body) return io.ReadAll(resp.Body)
} }

View File

@ -18,14 +18,17 @@ package resource
import ( import (
"context" "context"
"fmt"
"net" "net"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"net/url" "net/url"
"path"
"strconv" "strconv"
"testing" "testing"
"time" "time"
"github.com/go-http-utils/headers"
"github.com/golang/mock/gomock" "github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
@ -867,6 +870,16 @@ func TestPeer_DeleteStream(t *testing.T) {
func TestPeer_DownloadTinyFile(t *testing.T) { func TestPeer_DownloadTinyFile(t *testing.T) {
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if path.Base(r.URL.Path) == "foo" {
w.WriteHeader(http.StatusNotFound)
return
}
if r.Header.Get(headers.Range) == "bytes=0-2" {
w.WriteHeader(http.StatusNotAcceptable)
return
}
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
})) }))
defer s.Close() defer s.Close()
@ -886,6 +899,19 @@ func TestPeer_DownloadTinyFile(t *testing.T) {
assert.NoError(err) assert.NoError(err)
}, },
}, },
{
name: "download tiny file with range header",
expect: func(t *testing.T, peer *Peer) {
assert := assert.New(t)
peer.Task.ContentLength.Store(2)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
_, err := peer.DownloadTinyFile(ctx)
assert.EqualError(err, fmt.Sprintf("http://%s:%d/download/%s/%s?peerId=scheduler: 406 Not Acceptable",
peer.Host.IP, peer.Host.DownloadPort, peer.Task.ID[:3], peer.Task.ID))
},
},
{ {
name: "download tiny file failed because of port error", name: "download tiny file failed because of port error",
expect: func(t *testing.T, peer *Peer) { expect: func(t *testing.T, peer *Peer) {
@ -910,6 +936,19 @@ func TestPeer_DownloadTinyFile(t *testing.T) {
assert.Error(err) assert.Error(err)
}, },
}, },
{
name: "download tiny file failed because of http status code",
expect: func(t *testing.T, peer *Peer) {
assert := assert.New(t)
peer.Task.ID = "foo"
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
_, err := peer.DownloadTinyFile(ctx)
assert.EqualError(err, fmt.Sprintf("http://%s:%d/download/%s/%s?peerId=scheduler: 404 Not Found",
peer.Host.IP, peer.Host.DownloadPort, peer.Task.ID[:3], peer.Task.ID))
},
},
} }
for _, tc := range tests { for _, tc := range tests {