From cc5e1f7d2794e9d4c35e2ecbae4de27713102357 Mon Sep 17 00:00:00 2001 From: zzy987 Date: Fri, 29 Apr 2022 15:12:14 +0800 Subject: [PATCH] feat: use a golang native file server to replace nginx (#1258) * feat: use a golang native file server to replace nginx Signed-off-by: zzy987 --- build/images/cdn/Dockerfile | 6 +- cdn/cdn.go | 24 ++++ cdn/fileserver/fileserver.go | 111 ++++++++++++++++++ cdn/supervisor/cdn/storage/disk/disk.go | 4 + cdn/supervisor/cdn/storage/hybrid/hybrid.go | 4 + .../cdn/storage/mock/mock_storage_manager.go | 14 +++ cdn/supervisor/cdn/storage/storage.go | 3 + hack/start-cdn.sh | 2 - 8 files changed, 163 insertions(+), 5 deletions(-) create mode 100644 cdn/fileserver/fileserver.go diff --git a/build/images/cdn/Dockerfile b/build/images/cdn/Dockerfile index cf0dd6542..63c24e9dc 100644 --- a/build/images/cdn/Dockerfile +++ b/build/images/cdn/Dockerfile @@ -1,3 +1,5 @@ +ARG BASE_IMAGE=alpine:3.14 + FROM golang:1.17.3-alpine3.14 as builder ARG GOPROXY @@ -18,14 +20,12 @@ RUN GRPC_HEALTH_PROBE_VERSION=v0.4.8 && \ wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \ chmod +x /bin/grpc_health_probe -# TODO support BASE_IMAGE -FROM nginx:1.21-alpine +FROM ${BASE_IMAGE} ENV PATH=/opt/dragonfly/bin:$PATH RUN echo "hosts: files dns" > /etc/nsswitch.conf COPY --from=builder /go/src/d7y.io/dragonfly/v2/hack/start-cdn.sh /root/start.sh -COPY --from=builder /go/src/d7y.io/dragonfly/v2/hack/cdn-nginx.conf /etc/nginx/nginx.conf COPY --from=builder /opt/dragonfly/bin/cdn /opt/dragonfly/bin/cdn COPY --from=health /bin/grpc_health_probe /bin/grpc_health_probe diff --git a/cdn/cdn.go b/cdn/cdn.go index 448542208..dbe771781 100644 --- a/cdn/cdn.go +++ b/cdn/cdn.go @@ -18,6 +18,7 @@ package cdn import ( "context" + "net/http" "github.com/pkg/errors" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" @@ -25,6 +26,7 @@ import ( "google.golang.org/grpc" "d7y.io/dragonfly/v2/cdn/config" + "d7y.io/dragonfly/v2/cdn/fileserver" "d7y.io/dragonfly/v2/cdn/gc" "d7y.io/dragonfly/v2/cdn/metrics" "d7y.io/dragonfly/v2/cdn/rpcserver" @@ -33,6 +35,7 @@ import ( "d7y.io/dragonfly/v2/cdn/supervisor/cdn/storage" "d7y.io/dragonfly/v2/cdn/supervisor/progress" "d7y.io/dragonfly/v2/cdn/supervisor/task" + "d7y.io/dragonfly/v2/client/daemon/upload" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/rpc/manager" managerClient "d7y.io/dragonfly/v2/pkg/rpc/manager/client" @@ -54,6 +57,9 @@ type Server struct { // gc Server gcServer *gc.Server + + // fileServer + fileServer *fileserver.Server } // New creates a brand-new server instance. @@ -97,6 +103,8 @@ func New(config *config.Config) (*Server, error) { return nil, errors.Wrap(err, "create rpcServer") } + fileServer := fileserver.New(config.RPCServer.DownloadPort, upload.PeerDownloadHTTPPathPrefix, storageManager.GetUploadPath()) + // Initialize gc server gcServer, err := gc.New() if err != nil { @@ -126,6 +134,7 @@ func New(config *config.Config) (*Server, error) { metricsServer: metricsServer, configServer: configServer, gcServer: gcServer, + fileServer: fileServer, }, nil } @@ -172,6 +181,16 @@ func (s *Server) Serve() error { } }() + go func() { + // Start file server + if err := s.fileServer.ListenAndServe(); err != nil { + if err == http.ErrServerClosed { + return + } + logger.Fatalf("start cdn file server failed: %v", err) + } + }() + // Start grpc server return s.grpcServer.ListenAndServe() } @@ -198,5 +217,10 @@ func (s *Server) Stop() error { // Stop grpc server return s.grpcServer.Shutdown() }) + + g.Go(func() error { + // Stop file server + return s.fileServer.Shutdown(ctx) + }) return g.Wait() } diff --git a/cdn/fileserver/fileserver.go b/cdn/fileserver/fileserver.go new file mode 100644 index 000000000..513c1f170 --- /dev/null +++ b/cdn/fileserver/fileserver.go @@ -0,0 +1,111 @@ +/* + * Copyright 2020 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fileserver + +import ( + "context" + "fmt" + "io/fs" + "net/http" + "os" + "path" + "path/filepath" + "strings" + "syscall" + "time" + + "github.com/pkg/errors" + + logger "d7y.io/dragonfly/v2/internal/dflog" +) + +const ( + maxRetryAttempts = 5 +) + +// d7yDir is modified from http.Dir, for http.Dir does not treat syscall.EMFILE as we want. +type d7yDir string + +func mapDirOpenError(originalErr error, name string) error { + if os.IsNotExist(originalErr) || os.IsPermission(originalErr) { + return originalErr + } + + parts := strings.Split(name, string(filepath.Separator)) + for i := range parts { + if parts[i] == "" { + continue + } + fi, err := os.Stat(strings.Join(parts[:i+1], string(filepath.Separator))) + if err != nil { + return originalErr + } + if !fi.IsDir() { + pathError, ok := originalErr.(*fs.PathError) + if ok && pathError.Err == syscall.EMFILE { + return syscall.EMFILE + } + return fs.ErrNotExist + } + } + return originalErr +} + +func (d d7yDir) Open(name string) (http.File, error) { + if filepath.Separator != '/' && strings.ContainsRune(name, filepath.Separator) { + return nil, errors.New("http: invalid character in file path") + } + dir := string(d) + if dir == "" { + dir = "." + } + fullName := filepath.Join(dir, filepath.FromSlash(path.Clean("/"+name))) + + f, err := os.Open(fullName) + if err != nil { + mappedErr := mapDirOpenError(err, fullName) + // retry when syscall.EMFILE + for i := 0; mappedErr == syscall.EMFILE && i < maxRetryAttempts; i++ { + time.Sleep(100 * time.Millisecond) + f, err = os.Open(fullName) + if err == nil { + return f, nil + } + mappedErr = mapDirOpenError(err, fullName) + } + return nil, mappedErr + } + return f, nil +} + +type Server struct { + *http.Server +} + +func New(port int, prefix, uploadPath string) *Server { + return &Server{ + &http.Server{ + Addr: fmt.Sprintf(":%d", port), + Handler: http.StripPrefix(prefix, http.FileServer(d7yDir(uploadPath))), + }, + } +} + +func (server *Server) Shutdown(ctx context.Context) error { + defer logger.Infof("====stopped file server====") + return server.Server.Shutdown(ctx) +} diff --git a/cdn/supervisor/cdn/storage/disk/disk.go b/cdn/supervisor/cdn/storage/disk/disk.go index 12565dc26..2284d7969 100644 --- a/cdn/supervisor/cdn/storage/disk/disk.go +++ b/cdn/supervisor/cdn/storage/disk/disk.go @@ -281,6 +281,10 @@ func (s *diskStorageManager) TryFreeSpace(fileLength int64) (bool, error) { return true, nil } +func (s *diskStorageManager) GetUploadPath() string { + return s.diskDriver.GetPath(storage.GetUploadHomeRaw()) +} + func (s *diskStorageManager) GC() error { logger.StorageGCLogger.With("type", "disk").Debug("start the disk storage gc job") gcTaskIDs, err := s.diskCleaner.GC("disk", false) diff --git a/cdn/supervisor/cdn/storage/hybrid/hybrid.go b/cdn/supervisor/cdn/storage/hybrid/hybrid.go index 2e6cf0eb5..f3d7957ae 100644 --- a/cdn/supervisor/cdn/storage/hybrid/hybrid.go +++ b/cdn/supervisor/cdn/storage/hybrid/hybrid.go @@ -381,6 +381,10 @@ func (h *hybridStorageManager) tryShmSpace(url, taskID string, fileLength int64) return "", fmt.Errorf("shared memory is not allowed") } +func (h *hybridStorageManager) GetUploadPath() string { + return h.diskDriver.GetPath(storage.GetUploadHomeRaw()) +} + func (h *hybridStorageManager) GC() error { logger.StorageGCLogger.With("type", "hybrid").Debug("start the hybrid storage gc job") var wg sync.WaitGroup diff --git a/cdn/supervisor/cdn/storage/mock/mock_storage_manager.go b/cdn/supervisor/cdn/storage/mock/mock_storage_manager.go index c56d659b5..442cc1b34 100644 --- a/cdn/supervisor/cdn/storage/mock/mock_storage_manager.go +++ b/cdn/supervisor/cdn/storage/mock/mock_storage_manager.go @@ -65,6 +65,20 @@ func (mr *MockManagerMockRecorder) DeleteTask(arg0 interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteTask", reflect.TypeOf((*MockManager)(nil).DeleteTask), arg0) } +// GetUploadPath mocks base method. +func (m *MockManager) GetUploadPath() string { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetUploadPath") + ret0, _ := ret[0].(string) + return ret0 +} + +// GetUploadPath indicates an expected call of GetUploadPath. +func (mr *MockManagerMockRecorder) GetUploadPath() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUploadPath", reflect.TypeOf((*MockManager)(nil).GetUploadPath)) +} + // ReadDownloadFile mocks base method. func (m *MockManager) ReadDownloadFile(arg0 string) (io.ReadCloser, error) { m.ctrl.T.Helper() diff --git a/cdn/supervisor/cdn/storage/storage.go b/cdn/supervisor/cdn/storage/storage.go index 664326a03..6a30efe13 100644 --- a/cdn/supervisor/cdn/storage/storage.go +++ b/cdn/supervisor/cdn/storage/storage.go @@ -74,6 +74,9 @@ type Manager interface { // TryFreeSpace checks if there is enough space for the file, return true while we are sure that there is enough space. TryFreeSpace(fileLength int64) (bool, error) + + // GetUploadPath get path of upload file + GetUploadPath() string } // FileMetadata meta data of task diff --git a/hack/start-cdn.sh b/hack/start-cdn.sh index ff4c5893b..12a236073 100755 --- a/hack/start-cdn.sh +++ b/hack/start-cdn.sh @@ -4,6 +4,4 @@ set -o nounset set -o errexit set -o pipefail -nginx - /opt/dragonfly/bin/cdn "$@"