From 4889a37886b012afa2429cd66698c81f234c9ef5 Mon Sep 17 00:00:00 2001 From: Gaius Date: Wed, 15 Jun 2022 17:39:16 +0800 Subject: [PATCH] feat: replace gin-gonic/gin with gorilla/mux (#1389) Signed-off-by: Gaius --- cdn/cdn.go | 3 +- client/daemon/daemon.go | 14 +- client/daemon/peer/piece_downloader.go | 23 ++- client/daemon/peer/piece_downloader_test.go | 9 +- client/daemon/upload/types.go | 26 ++++ client/daemon/upload/upload_manager.go | 150 +++++++++++--------- client/daemon/upload/upload_manager_test.go | 2 +- manager/handlers/health.go | 2 +- 8 files changed, 136 insertions(+), 93 deletions(-) create mode 100644 client/daemon/upload/types.go diff --git a/cdn/cdn.go b/cdn/cdn.go index 175d04e4f..93342864b 100644 --- a/cdn/cdn.go +++ b/cdn/cdn.go @@ -35,7 +35,6 @@ import ( "d7y.io/dragonfly/v2/cdn/supervisor/cdn/storage" "d7y.io/dragonfly/v2/cdn/supervisor/progress" "d7y.io/dragonfly/v2/cdn/supervisor/task" - "d7y.io/dragonfly/v2/client/daemon/upload" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/manager/model" "d7y.io/dragonfly/v2/pkg/rpc/manager" @@ -104,7 +103,7 @@ func New(config *config.Config) (*Server, error) { return nil, errors.Wrap(err, "create rpcServer") } - fileServer := fileserver.New(config.RPCServer.DownloadPort, upload.PeerDownloadHTTPPathPrefix, storageManager.GetUploadPath()) + fileServer := fileserver.New(config.RPCServer.DownloadPort, "/download/", storageManager.GetUploadPath()) // Initialize gc server gcServer, err := gc.New() diff --git a/client/daemon/daemon.go b/client/daemon/daemon.go index ac72d6106..8159194eb 100644 --- a/client/daemon/daemon.go +++ b/client/daemon/daemon.go @@ -29,7 +29,7 @@ import ( "sync" "time" - "github.com/gorilla/mux" + "github.com/gin-gonic/gin" "github.com/pkg/errors" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "golang.org/x/sync/errgroup" @@ -549,17 +549,19 @@ func (cd *clientDaemon) Serve() error { if cd.Option.Health.ListenOption.TCPListen == nil { logger.Fatalf("health listen not found") } - logger.Infof("serve http health at %#v", cd.Option.Health.ListenOption.TCPListen) - r := mux.NewRouter() - r.Path(cd.Option.Health.Path).HandlerFunc(func(writer http.ResponseWriter, request *http.Request) { - writer.WriteHeader(http.StatusOK) - _, _ = writer.Write([]byte("success")) + + r := gin.Default() + r.GET(cd.Option.Health.Path, func(c *gin.Context) { + c.JSON(http.StatusOK, http.StatusText(http.StatusOK)) }) + listener, _, err := cd.prepareTCPListener(cd.Option.Health.ListenOption, false) if err != nil { logger.Fatalf("init health http server error: %v", err) } + go func() { + logger.Infof("serve http health at %#v", cd.Option.Health.ListenOption.TCPListen) if err = http.Serve(listener, r); err != nil { if err == http.ErrServerClosed { return diff --git a/client/daemon/peer/piece_downloader.go b/client/daemon/peer/piece_downloader.go index 1d0c3d830..813264a50 100644 --- a/client/daemon/peer/piece_downloader.go +++ b/client/daemon/peer/piece_downloader.go @@ -22,11 +22,10 @@ import ( "io" "net" "net/http" - "strings" + "net/url" "time" "d7y.io/dragonfly/v2/client/daemon/storage" - "d7y.io/dragonfly/v2/client/daemon/upload" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/rpc/base" "d7y.io/dragonfly/v2/pkg/source" @@ -190,20 +189,16 @@ func (p *pieceDownloader) DownloadPiece(ctx context.Context, req *DownloadPieceR } func buildDownloadPieceHTTPRequest(ctx context.Context, d *DownloadPieceRequest) *http.Request { - b := strings.Builder{} // FIXME switch to https when tls enabled - b.WriteString("http://") - b.WriteString(d.DstAddr) - b.WriteString(upload.PeerDownloadHTTPPathPrefix) - b.Write([]byte(d.TaskID)[:3]) - b.Write([]byte("/")) - b.WriteString(d.TaskID) - b.Write([]byte("?peerId=")) - b.WriteString(d.DstPid) + targetURL := url.URL{ + Scheme: "http", + Host: d.DstAddr, + Path: fmt.Sprintf("download/%s/%s", d.TaskID[:3], d.TaskID), + RawQuery: fmt.Sprintf("peerId=%s", d.DstPid), + } - u := b.String() - logger.Debugf("built request url: %s", u) - req, _ := http.NewRequestWithContext(ctx, http.MethodGet, u, nil) + logger.Debugf("built request url: %s", targetURL.String()) + req, _ := http.NewRequestWithContext(ctx, http.MethodGet, targetURL.String(), nil) // TODO use string.Builder req.Header.Add("Range", fmt.Sprintf("bytes=%d-%d", diff --git a/client/daemon/peer/piece_downloader_test.go b/client/daemon/peer/piece_downloader_test.go index 63917dadf..e34196899 100644 --- a/client/daemon/peer/piece_downloader_test.go +++ b/client/daemon/peer/piece_downloader_test.go @@ -36,7 +36,6 @@ import ( "d7y.io/dragonfly/v2/client/clientutil" "d7y.io/dragonfly/v2/client/daemon/test" - "d7y.io/dragonfly/v2/client/daemon/upload" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/rpc/base" "d7y.io/dragonfly/v2/pkg/source" @@ -62,7 +61,7 @@ func TestPieceDownloader_DownloadPiece(t *testing.T) { }{ { handleFunc: func(w http.ResponseWriter, r *http.Request) { - assert.Equal(upload.PeerDownloadHTTPPathPrefix+"tas/"+"task-0", r.URL.Path) + assert.Equal("/download/tas/task-0", r.URL.Path) data := []byte("test test ") w.Header().Set(headers.ContentLength, fmt.Sprintf("%d", len(data))) if _, err := w.Write(data); err != nil { @@ -77,7 +76,7 @@ func TestPieceDownloader_DownloadPiece(t *testing.T) { }, { handleFunc: func(w http.ResponseWriter, r *http.Request) { - assert.Equal(upload.PeerDownloadHTTPPathPrefix+"tas/"+"task-1", r.URL.Path) + assert.Equal("/download/tas/task-1", r.URL.Path) rg := clientutil.MustParseRange(r.Header.Get("Range"), math.MaxInt64) w.Header().Set(headers.ContentLength, fmt.Sprintf("%d", rg.Length)) if _, err := w.Write(testData[rg.Start : rg.Start+rg.Length]); err != nil { @@ -92,7 +91,7 @@ func TestPieceDownloader_DownloadPiece(t *testing.T) { }, { handleFunc: func(w http.ResponseWriter, r *http.Request) { - assert.Equal(upload.PeerDownloadHTTPPathPrefix+"tas/"+"task-2", r.URL.Path) + assert.Equal("/download/tas/task-2", r.URL.Path) rg := clientutil.MustParseRange(r.Header.Get("Range"), math.MaxInt64) w.Header().Set(headers.ContentLength, fmt.Sprintf("%d", rg.Length)) if _, err := w.Write(testData[rg.Start : rg.Start+rg.Length]); err != nil { @@ -107,7 +106,7 @@ func TestPieceDownloader_DownloadPiece(t *testing.T) { }, { handleFunc: func(w http.ResponseWriter, r *http.Request) { - assert.Equal(upload.PeerDownloadHTTPPathPrefix+"tas/"+"task-3", r.URL.Path) + assert.Equal("/download/tas/task-3", r.URL.Path) rg := clientutil.MustParseRange(r.Header.Get("Range"), math.MaxInt64) w.Header().Set(headers.ContentLength, fmt.Sprintf("%d", rg.Length)) if _, err := w.Write(testData[rg.Start : rg.Start+rg.Length]); err != nil { diff --git a/client/daemon/upload/types.go b/client/daemon/upload/types.go new file mode 100644 index 000000000..e9b62f6e1 --- /dev/null +++ b/client/daemon/upload/types.go @@ -0,0 +1,26 @@ +/* + * Copyright 2022 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package upload + +type DownloadParams struct { + TaskPrefix string `uri:"task_prefix" binding:"required"` + TaskID string `uri:"task_id" binding:"required"` +} + +type DownalodQuery struct { + PeerID string `form:"peerId" binding:"required"` +} diff --git a/client/daemon/upload/upload_manager.go b/client/daemon/upload/upload_manager.go index 57b8d6f58..570096f45 100644 --- a/client/daemon/upload/upload_manager.go +++ b/client/daemon/upload/upload_manager.go @@ -24,8 +24,8 @@ import ( "net" "net/http" + "github.com/gin-gonic/gin" "github.com/go-http-utils/headers" - "github.com/gorilla/mux" "golang.org/x/time/rate" "d7y.io/dragonfly/v2/client/clientutil" @@ -33,96 +33,117 @@ import ( logger "d7y.io/dragonfly/v2/internal/dflog" ) -var _ *logger.SugaredLoggerOnWith // pin this package for no log code generation - +// Manager is the interface used for upload task. type Manager interface { + // Started upload manager server. Serve(lis net.Listener) error + + // Stop upload manager server. Stop() error } +// uploadManager provides upload manager function. type uploadManager struct { *http.Server *rate.Limiter - StorageManager storage.Manager + storageManager storage.Manager } -var _ Manager = (*uploadManager)(nil) +// Option is a functional option for configuring the upload manager. +type Option func(um *uploadManager) -const ( - PeerDownloadHTTPPathPrefix = "/download/" -) - -func NewUploadManager(s storage.Manager, opts ...func(*uploadManager)) (Manager, error) { - u := &uploadManager{ - Server: &http.Server{}, - StorageManager: s, - } - u.initRouter() - for _, opt := range opts { - opt(u) - } - return u, nil -} - -// WithLimiter sets upload rate limiter, the burst size must be bigger than piece size +// WithLimiter sets upload rate limiter, the burst size must be bigger than piece size. func WithLimiter(limiter *rate.Limiter) func(*uploadManager) { return func(manager *uploadManager) { manager.Limiter = limiter } } -func (um *uploadManager) initRouter() { - r := mux.NewRouter() - // Health Check - r.HandleFunc("/healthy", um.handleHealth).Methods("GET") +// New returns a new Manager instence. +func NewUploadManager(storageManager storage.Manager, opts ...Option) (Manager, error) { + um := &uploadManager{ + storageManager: storageManager, + } - // Peer download task - r.HandleFunc(PeerDownloadHTTPPathPrefix+"{taskPrefix:.*}/"+"{task:.*}", um.handleUpload).Queries("peerId", "{.*}").Methods("GET") - um.Server.Handler = r + router := um.initRouter() + um.Server = &http.Server{ + Handler: router, + } + + for _, opt := range opts { + opt(um) + } + + return um, nil } +// Started upload manager server. func (um *uploadManager) Serve(lis net.Listener) error { return um.Server.Serve(lis) } +// Stop upload manager server. func (um *uploadManager) Stop() error { return um.Server.Shutdown(context.Background()) } -// handleHealth uses to check server health. -func (um *uploadManager) handleHealth(w http.ResponseWriter, r *http.Request) { - w.WriteHeader(http.StatusOK) +// Initialize router of gin. +func (um *uploadManager) initRouter() *gin.Engine { + r := gin.Default() + + // Health Check. + r.GET("/healthy", um.getHealth) + + // Peer download task. + r.GET("/download/:task_prefix/:task_id", um.getDownload) + + return r } -// handleUpload uses to upload a task file when other peers download from it. -func (um *uploadManager) handleUpload(w http.ResponseWriter, r *http.Request) { - var ( - task = mux.Vars(r)["task"] - peer = r.FormValue("peerId") - //cdnSource = r.Header.Get("X-Dragonfly-CDN-Source") - ) +// getHealth uses to check server health. +func (um *uploadManager) getHealth(ctx *gin.Context) { + ctx.JSON(http.StatusOK, http.StatusText(http.StatusOK)) +} - sLogger := logger.With("peer", peer, "task", task, "component", "uploadManager") - sLogger.Debugf("upload piece for task %s/%s to %s, request header: %#v", task, peer, r.RemoteAddr, r.Header) - rg, err := clientutil.ParseRange(r.Header.Get(headers.Range), math.MaxInt64) +// getDownload uses to upload a task file when other peers download from it. +func (um *uploadManager) getDownload(ctx *gin.Context) { + var params DownloadParams + if err := ctx.ShouldBindUri(¶ms); err != nil { + ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()}) + return + } + + var query DownalodQuery + if err := ctx.ShouldBindQuery(&query); err != nil { + ctx.JSON(http.StatusUnprocessableEntity, gin.H{"errors": err.Error()}) + return + } + + taskID := params.TaskID + peerID := query.PeerID + + log := logger.WithTaskAndPeerID(taskID, peerID).With("component", "uploadManager") + log.Debugf("upload piece for task %s/%s to %s, request header: %#v", taskID, peerID, ctx.Request.RemoteAddr, ctx.Request.Header) + rg, err := clientutil.ParseRange(ctx.GetHeader(headers.Range), math.MaxInt64) if err != nil { - sLogger.Errorf("parse range with error: %s", err) - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - if len(rg) != 1 { - sLogger.Error("multi range parsed, not support") - http.Error(w, "invalid range", http.StatusBadRequest) + log.Errorf("parse range with error: %s", err) + ctx.JSON(http.StatusBadRequest, gin.H{"errors": err.Error()}) return } - // add header "Content-Length" to avoid chunked body in http client - w.Header().Add(headers.ContentLength, fmt.Sprintf("%d", rg[0].Length)) - reader, closer, err := um.StorageManager.ReadPiece(r.Context(), + if len(rg) != 1 { + log.Error("multi range parsed, not support") + ctx.JSON(http.StatusBadRequest, gin.H{"errors": "invalid range"}) + return + } + + // Add header "Content-Length" to avoid chunked body in http client. + ctx.Header(headers.ContentLength, fmt.Sprintf("%d", rg[0].Length)) + reader, closer, err := um.storageManager.ReadPiece(ctx, &storage.ReadPieceRequest{ PeerTaskMetadata: storage.PeerTaskMetadata{ - TaskID: task, - PeerID: peer, + TaskID: taskID, + PeerID: peerID, }, PieceMetadata: storage.PieceMetadata{ Num: -1, @@ -130,26 +151,27 @@ func (um *uploadManager) handleUpload(w http.ResponseWriter, r *http.Request) { }, }) if err != nil { - sLogger.Errorf("get task data failed: %s", err) - http.Error(w, fmt.Sprintf("get piece data error: %s", err), http.StatusInternalServerError) + log.Errorf("get task data failed: %s", err) + ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()}) return } defer closer.Close() + if um.Limiter != nil { - if err = um.Limiter.WaitN(r.Context(), int(rg[0].Length)); err != nil { - sLogger.Errorf("get limit failed: %s", err) - http.Error(w, fmt.Sprintf("get limit error: %s", err), http.StatusInternalServerError) + if err = um.Limiter.WaitN(ctx, int(rg[0].Length)); err != nil { + log.Errorf("get limit failed: %s", err) + ctx.JSON(http.StatusInternalServerError, gin.H{"errors": err.Error()}) return } } - // if w is a socket, golang will use sendfile or splice syscall for zero copy feature - // when start to transfer data, we could not call http.Error with header - if n, err := io.Copy(w, reader); err != nil { - sLogger.Errorf("transfer data failed: %s", err) + // If w is a socket, golang will use sendfile or splice syscall for zero copy feature + // when start to transfer data, we could not call http.Error with header. + if n, err := io.Copy(ctx.Writer, reader); err != nil { + log.Errorf("transfer data failed: %s", err) return } else if n != rg[0].Length { - sLogger.Errorf("transferred data length not match request, request: %d, transferred: %d", + log.Errorf("transferred data length not match request, request: %d, transferred: %d", rg[0].Length, n) return } diff --git a/client/daemon/upload/upload_manager_test.go b/client/daemon/upload/upload_manager_test.go index 63d6f1883..834ea77c8 100644 --- a/client/daemon/upload/upload_manager_test.go +++ b/client/daemon/upload/upload_manager_test.go @@ -96,7 +96,7 @@ func TestUploadManager_Serve(t *testing.T) { for _, tt := range tests { req, _ := http.NewRequest(http.MethodGet, - fmt.Sprintf("http://%s%s%s/%s?peerId=%s", addr, PeerDownloadHTTPPathPrefix, "666", tt.taskID, tt.peerID), nil) + fmt.Sprintf("http://%s/%s/%s/%s?peerId=%s", addr, "download", "666", tt.taskID, tt.peerID), nil) req.Header.Add("Range", tt.pieceRange) resp, err := http.DefaultClient.Do(req) diff --git a/manager/handlers/health.go b/manager/handlers/health.go index e3b8edb15..eb1627405 100644 --- a/manager/handlers/health.go +++ b/manager/handlers/health.go @@ -33,5 +33,5 @@ import ( // @Failure 500 // @Router /healthy [get] func (h *Handlers) GetHealth(ctx *gin.Context) { - ctx.JSON(http.StatusOK, "OK") + ctx.JSON(http.StatusOK, http.StatusText(http.StatusOK)) }