feat: replace gin-gonic/gin with gorilla/mux (#1389)

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2022-06-15 17:39:16 +08:00
parent 2441ae85c9
commit 4889a37886
No known key found for this signature in database
GPG Key ID: 8B4E5D1290FA2FFB
8 changed files with 136 additions and 93 deletions

View File

@ -35,7 +35,6 @@ import (
"d7y.io/dragonfly/v2/cdn/supervisor/cdn/storage"
"d7y.io/dragonfly/v2/cdn/supervisor/progress"
"d7y.io/dragonfly/v2/cdn/supervisor/task"
"d7y.io/dragonfly/v2/client/daemon/upload"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/manager/model"
"d7y.io/dragonfly/v2/pkg/rpc/manager"
@ -104,7 +103,7 @@ func New(config *config.Config) (*Server, error) {
return nil, errors.Wrap(err, "create rpcServer")
}
fileServer := fileserver.New(config.RPCServer.DownloadPort, upload.PeerDownloadHTTPPathPrefix, storageManager.GetUploadPath())
fileServer := fileserver.New(config.RPCServer.DownloadPort, "/download/", storageManager.GetUploadPath())
// Initialize gc server
gcServer, err := gc.New()

View File

@ -29,7 +29,7 @@ import (
"sync"
"time"
"github.com/gorilla/mux"
"github.com/gin-gonic/gin"
"github.com/pkg/errors"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"golang.org/x/sync/errgroup"
@ -549,17 +549,19 @@ func (cd *clientDaemon) Serve() error {
if cd.Option.Health.ListenOption.TCPListen == nil {
logger.Fatalf("health listen not found")
}
logger.Infof("serve http health at %#v", cd.Option.Health.ListenOption.TCPListen)
r := mux.NewRouter()
r.Path(cd.Option.Health.Path).HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
writer.WriteHeader(http.StatusOK)
_, _ = writer.Write([]byte("success"))
r := gin.Default()
r.GET(cd.Option.Health.Path, func(c *gin.Context) {
c.JSON(http.StatusOK, http.StatusText(http.StatusOK))
})
listener, _, err := cd.prepareTCPListener(cd.Option.Health.ListenOption, false)
if err != nil {
logger.Fatalf("init health http server error: %v", err)
}
go func() {
logger.Infof("serve http health at %#v", cd.Option.Health.ListenOption.TCPListen)
if err = http.Serve(listener, r); err != nil {
if err == http.ErrServerClosed {
return

View File

@ -22,11 +22,10 @@ import (
"io"
"net"
"net/http"
"strings"
"net/url"
"time"
"d7y.io/dragonfly/v2/client/daemon/storage"
"d7y.io/dragonfly/v2/client/daemon/upload"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/rpc/base"
"d7y.io/dragonfly/v2/pkg/source"
@ -190,20 +189,16 @@ func (p *pieceDownloader) DownloadPiece(ctx context.Context, req *DownloadPieceR
}
func buildDownloadPieceHTTPRequest(ctx context.Context, d *DownloadPieceRequest) *http.Request {
b := strings.Builder{}
// FIXME switch to https when tls enabled
b.WriteString("http://")
b.WriteString(d.DstAddr)
b.WriteString(upload.PeerDownloadHTTPPathPrefix)
b.Write([]byte(d.TaskID)[:3])
b.Write([]byte("/"))
b.WriteString(d.TaskID)
b.Write([]byte("?peerId="))
b.WriteString(d.DstPid)
targetURL := url.URL{
Scheme: "http",
Host: d.DstAddr,
Path: fmt.Sprintf("download/%s/%s", d.TaskID[:3], d.TaskID),
RawQuery: fmt.Sprintf("peerId=%s", d.DstPid),
}
u := b.String()
logger.Debugf("built request url: %s", u)
req, _ := http.NewRequestWithContext(ctx, http.MethodGet, u, nil)
logger.Debugf("built request url: %s", targetURL.String())
req, _ := http.NewRequestWithContext(ctx, http.MethodGet, targetURL.String(), nil)
// TODO use string.Builder
req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d",

View File

@ -36,7 +36,6 @@ import (
"d7y.io/dragonfly/v2/client/clientutil"
"d7y.io/dragonfly/v2/client/daemon/test"
"d7y.io/dragonfly/v2/client/daemon/upload"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/rpc/base"
"d7y.io/dragonfly/v2/pkg/source"
@ -62,7 +61,7 @@ func TestPieceDownloader_DownloadPiece(t *testing.T) {
}{
{
handleFunc: func(w http.ResponseWriter, r *http.Request) {
assert.Equal(upload.PeerDownloadHTTPPathPrefix+"tas/"+"task-0", r.URL.Path)
assert.Equal("/download/tas/task-0", r.URL.Path)
data := []byte("test test ")
w.Header().Set(headers.ContentLength, fmt.Sprintf("%d", len(data)))
if _, err := w.Write(data); err != nil {
@ -77,7 +76,7 @@ func TestPieceDownloader_DownloadPiece(t *testing.T) {
},
{
handleFunc: func(w http.ResponseWriter, r *http.Request) {
assert.Equal(upload.PeerDownloadHTTPPathPrefix+"tas/"+"task-1", r.URL.Path)
assert.Equal("/download/tas/task-1", r.URL.Path)
rg := clientutil.MustParseRange(r.Header.Get("Range"), math.MaxInt64)
w.Header().Set(headers.ContentLength, fmt.Sprintf("%d", rg.Length))
if _, err := w.Write(testData[rg.Start : rg.Start+rg.Length]); err != nil {
@ -92,7 +91,7 @@ func TestPieceDownloader_DownloadPiece(t *testing.T) {
},
{
handleFunc: func(w http.ResponseWriter, r *http.Request) {
assert.Equal(upload.PeerDownloadHTTPPathPrefix+"tas/"+"task-2", r.URL.Path)
assert.Equal("/download/tas/task-2", r.URL.Path)
rg := clientutil.MustParseRange(r.Header.Get("Range"), math.MaxInt64)
w.Header().Set(headers.ContentLength, fmt.Sprintf("%d", rg.Length))
if _, err := w.Write(testData[rg.Start : rg.Start+rg.Length]); err != nil {
@ -107,7 +106,7 @@ func TestPieceDownloader_DownloadPiece(t *testing.T) {
},
{
handleFunc: func(w http.ResponseWriter, r *http.Request) {
assert.Equal(upload.PeerDownloadHTTPPathPrefix+"tas/"+"task-3", r.URL.Path)
assert.Equal("/download/tas/task-3", r.URL.Path)
rg := clientutil.MustParseRange(r.Header.Get("Range"), math.MaxInt64)
w.Header().Set(headers.ContentLength, fmt.Sprintf("%d", rg.Length))
if _, err := w.Write(testData[rg.Start : rg.Start+rg.Length]); err != nil {

View File

@ -0,0 +1,26 @@
/*
* Copyright 2022 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package upload
type DownloadParams struct {
TaskPrefix string `uri:"task_prefix" binding:"required"`
TaskID string `uri:"task_id" binding:"required"`
}
type DownalodQuery struct {
PeerID string `form:"peerId" binding:"required"`
}

View File

@ -24,8 +24,8 @@ import (
"net"
"net/http"
"github.com/gin-gonic/gin"
"github.com/go-http-utils/headers"
"github.com/gorilla/mux"
"golang.org/x/time/rate"
"d7y.io/dragonfly/v2/client/clientutil"
@ -33,96 +33,117 @@ import (
logger "d7y.io/dragonfly/v2/internal/dflog"
)
var _ *logger.SugaredLoggerOnWith // pin this package for no log code generation
// Manager is the interface used for upload task.
type Manager interface {
// Started upload manager server.
Serve(lis net.Listener) error
// Stop upload manager server.
Stop() error
}
// uploadManager provides upload manager function.
type uploadManager struct {
*http.Server
*rate.Limiter
StorageManager storage.Manager
storageManager storage.Manager
}
var _ Manager = (*uploadManager)(nil)
// Option is a functional option for configuring the upload manager.
type Option func(um *uploadManager)
const (
PeerDownloadHTTPPathPrefix = "/download/"
)
func NewUploadManager(s storage.Manager, opts ...func(*uploadManager)) (Manager, error) {
u := &uploadManager{
Server: &http.Server{},
StorageManager: s,
}
u.initRouter()
for _, opt := range opts {
opt(u)
}
return u, nil
}
// WithLimiter sets upload rate limiter, the burst size must be bigger than piece size
// WithLimiter sets upload rate limiter, the burst size must be bigger than piece size.
func WithLimiter(limiter *rate.Limiter) func(*uploadManager) {
return func(manager *uploadManager) {
manager.Limiter = limiter
}
}
func (um *uploadManager) initRouter() {
r := mux.NewRouter()
// Health Check
r.HandleFunc("/healthy", um.handleHealth).Methods("GET")
// Peer download task
r.HandleFunc(PeerDownloadHTTPPathPrefix+"{taskPrefix:.*}/"+"{task:.*}", um.handleUpload).Queries("peerId", "{.*}").Methods("GET")
um.Server.Handler = r
// New returns a new Manager instence.
func NewUploadManager(storageManager storage.Manager, opts ...Option) (Manager, error) {
um := &uploadManager{
storageManager: storageManager,
}
router := um.initRouter()
um.Server = &http.Server{
Handler: router,
}
for _, opt := range opts {
opt(um)
}
return um, nil
}
// Started upload manager server.
func (um *uploadManager) Serve(lis net.Listener) error {
return um.Server.Serve(lis)
}
// Stop upload manager server.
func (um *uploadManager) Stop() error {
return um.Server.Shutdown(context.Background())
}
// handleHealth uses to check server health.
func (um *uploadManager) handleHealth(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
// Initialize router of gin.
func (um *uploadManager) initRouter() *gin.Engine {
r := gin.Default()
// Health Check.
r.GET("/healthy", um.getHealth)
// Peer download task.
r.GET("/download/:task_prefix/:task_id", um.getDownload)
return r
}
// handleUpload uses to upload a task file when other peers download from it.
func (um *uploadManager) handleUpload(w http.ResponseWriter, r *http.Request) {
var (
task = mux.Vars(r)["task"]
peer = r.FormValue("peerId")
//cdnSource = r.Header.Get("X-Dragonfly-CDN-Source")
)
// getHealth uses to check server health.
func (um *uploadManager) getHealth(ctx *gin.Context) {
ctx.JSON(http.StatusOK, http.StatusText(http.StatusOK))
}
sLogger := logger.With("peer", peer, "task", task, "component", "uploadManager")
sLogger.Debugf("upload piece for task %s/%s to %s, request header: %#v", task, peer, r.RemoteAddr, r.Header)
rg, err := clientutil.ParseRange(r.Header.Get(headers.Range), math.MaxInt64)
// getDownload uses to upload a task file when other peers download from it.
func (um *uploadManager) getDownload(ctx *gin.Context) {
var params DownloadParams
if err := ctx.ShouldBindUri(&params); err != nil {
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
return
}
var query DownalodQuery
if err := ctx.ShouldBindQuery(&query); err != nil {
ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()})
return
}
taskID := params.TaskID
peerID := query.PeerID
log := logger.WithTaskAndPeerID(taskID, peerID).With("component", "uploadManager")
log.Debugf("upload piece for task %s/%s to %s, request header: %#v", taskID, peerID, ctx.Request.RemoteAddr, ctx.Request.Header)
rg, err := clientutil.ParseRange(ctx.GetHeader(headers.Range), math.MaxInt64)
if err != nil {
sLogger.Errorf("parse range with error: %s", err)
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if len(rg) != 1 {
sLogger.Error("multi range parsed, not support")
http.Error(w, "invalid range", http.StatusBadRequest)
log.Errorf("parse range with error: %s", err)
ctx.JSON(http.StatusBadRequest, gin.H{"errors": err.Error()})
return
}
// add header "Content-Length" to avoid chunked body in http client
w.Header().Add(headers.ContentLength, fmt.Sprintf("%d", rg[0].Length))
reader, closer, err := um.StorageManager.ReadPiece(r.Context(),
if len(rg) != 1 {
log.Error("multi range parsed, not support")
ctx.JSON(http.StatusBadRequest, gin.H{"errors": "invalid range"})
return
}
// Add header "Content-Length" to avoid chunked body in http client.
ctx.Header(headers.ContentLength, fmt.Sprintf("%d", rg[0].Length))
reader, closer, err := um.storageManager.ReadPiece(ctx,
&storage.ReadPieceRequest{
PeerTaskMetadata: storage.PeerTaskMetadata{
TaskID: task,
PeerID: peer,
TaskID: taskID,
PeerID: peerID,
},
PieceMetadata: storage.PieceMetadata{
Num: -1,
@ -130,26 +151,27 @@ func (um *uploadManager) handleUpload(w http.ResponseWriter, r *http.Request) {
},
})
if err != nil {
sLogger.Errorf("get task data failed: %s", err)
http.Error(w, fmt.Sprintf("get piece data error: %s", err), http.StatusInternalServerError)
log.Errorf("get task data failed: %s", err)
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
}
defer closer.Close()
if um.Limiter != nil {
if err = um.Limiter.WaitN(r.Context(), int(rg[0].Length)); err != nil {
sLogger.Errorf("get limit failed: %s", err)
http.Error(w, fmt.Sprintf("get limit error: %s", err), http.StatusInternalServerError)
if err = um.Limiter.WaitN(ctx, int(rg[0].Length)); err != nil {
log.Errorf("get limit failed: %s", err)
ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()})
return
}
}
// if w is a socket, golang will use sendfile or splice syscall for zero copy feature
// when start to transfer data, we could not call http.Error with header
if n, err := io.Copy(w, reader); err != nil {
sLogger.Errorf("transfer data failed: %s", err)
// If w is a socket, golang will use sendfile or splice syscall for zero copy feature
// when start to transfer data, we could not call http.Error with header.
if n, err := io.Copy(ctx.Writer, reader); err != nil {
log.Errorf("transfer data failed: %s", err)
return
} else if n != rg[0].Length {
sLogger.Errorf("transferred data length not match request, request: %d, transferred: %d",
log.Errorf("transferred data length not match request, request: %d, transferred: %d",
rg[0].Length, n)
return
}

View File

@ -96,7 +96,7 @@ func TestUploadManager_Serve(t *testing.T) {
for _, tt := range tests {
req, _ := http.NewRequest(http.MethodGet,
fmt.Sprintf("http://%s%s%s/%s?peerId=%s", addr, PeerDownloadHTTPPathPrefix, "666", tt.taskID, tt.peerID), nil)
fmt.Sprintf("http://%s/%s/%s/%s?peerId=%s", addr, "download", "666", tt.taskID, tt.peerID), nil)
req.Header.Add("Range", tt.pieceRange)
resp, err := http.DefaultClient.Do(req)

View File

@ -33,5 +33,5 @@ import (
// @Failure 500
// @Router /healthy [get]
func (h *Handlers) GetHealth(ctx *gin.Context) {
ctx.JSON(http.StatusOK, "OK")
ctx.JSON(http.StatusOK, http.StatusText(http.StatusOK))
}