feat: use a golang native file server to replace nginx (#1258)

* feat: use a golang native file server to replace nginx

Signed-off-by: zzy987 <nevermind@sjtu.edu.cn>
This commit is contained in:
zzy987 2022-04-29 15:12:14 +08:00 committed by Gaius
parent b39a0a0cdc
commit cc5e1f7d27
No known key found for this signature in database
GPG Key ID: 8B4E5D1290FA2FFB
8 changed files with 163 additions and 5 deletions

View File

@ -1,3 +1,5 @@
ARG BASE_IMAGE=alpine:3.14
FROM golang:1.17.3-alpine3.14 as builder
ARG GOPROXY
@ -18,14 +20,12 @@ RUN GRPC_HEALTH_PROBE_VERSION=v0.4.8 && \
wget -qO/bin/grpc_health_probe https://github.com/grpc-ecosystem/grpc-health-probe/releases/download/${GRPC_HEALTH_PROBE_VERSION}/grpc_health_probe-linux-amd64 && \
chmod +x /bin/grpc_health_probe
# TODO support BASE_IMAGE
FROM nginx:1.21-alpine
FROM ${BASE_IMAGE}
ENV PATH=/opt/dragonfly/bin:$PATH
RUN echo "hosts: files dns" > /etc/nsswitch.conf
COPY --from=builder /go/src/d7y.io/dragonfly/v2/hack/start-cdn.sh /root/start.sh
COPY --from=builder /go/src/d7y.io/dragonfly/v2/hack/cdn-nginx.conf /etc/nginx/nginx.conf
COPY --from=builder /opt/dragonfly/bin/cdn /opt/dragonfly/bin/cdn
COPY --from=health /bin/grpc_health_probe /bin/grpc_health_probe

View File

@ -18,6 +18,7 @@ package cdn
import (
"context"
"net/http"
"github.com/pkg/errors"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
@ -25,6 +26,7 @@ import (
"google.golang.org/grpc"
"d7y.io/dragonfly/v2/cdn/config"
"d7y.io/dragonfly/v2/cdn/fileserver"
"d7y.io/dragonfly/v2/cdn/gc"
"d7y.io/dragonfly/v2/cdn/metrics"
"d7y.io/dragonfly/v2/cdn/rpcserver"
@ -33,6 +35,7 @@ import (
"d7y.io/dragonfly/v2/cdn/supervisor/cdn/storage"
"d7y.io/dragonfly/v2/cdn/supervisor/progress"
"d7y.io/dragonfly/v2/cdn/supervisor/task"
"d7y.io/dragonfly/v2/client/daemon/upload"
logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/rpc/manager"
managerClient "d7y.io/dragonfly/v2/pkg/rpc/manager/client"
@ -54,6 +57,9 @@ type Server struct {
// gc Server
gcServer *gc.Server
// fileServer
fileServer *fileserver.Server
}
// New creates a brand-new server instance.
@ -97,6 +103,8 @@ func New(config *config.Config) (*Server, error) {
return nil, errors.Wrap(err, "create rpcServer")
}
fileServer := fileserver.New(config.RPCServer.DownloadPort, upload.PeerDownloadHTTPPathPrefix, storageManager.GetUploadPath())
// Initialize gc server
gcServer, err := gc.New()
if err != nil {
@ -126,6 +134,7 @@ func New(config *config.Config) (*Server, error) {
metricsServer: metricsServer,
configServer: configServer,
gcServer: gcServer,
fileServer: fileServer,
}, nil
}
@ -172,6 +181,16 @@ func (s *Server) Serve() error {
}
}()
go func() {
// Start file server
if err := s.fileServer.ListenAndServe(); err != nil {
if err == http.ErrServerClosed {
return
}
logger.Fatalf("start cdn file server failed: %v", err)
}
}()
// Start grpc server
return s.grpcServer.ListenAndServe()
}
@ -198,5 +217,10 @@ func (s *Server) Stop() error {
// Stop grpc server
return s.grpcServer.Shutdown()
})
g.Go(func() error {
// Stop file server
return s.fileServer.Shutdown(ctx)
})
return g.Wait()
}

View File

@ -0,0 +1,111 @@
/*
* Copyright 2020 The Dragonfly Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package fileserver
import (
"context"
"fmt"
"io/fs"
"net/http"
"os"
"path"
"path/filepath"
"strings"
"syscall"
"time"
"github.com/pkg/errors"
logger "d7y.io/dragonfly/v2/internal/dflog"
)
const (
maxRetryAttempts = 5
)
// d7yDir is modified from http.Dir, for http.Dir does not treat syscall.EMFILE as we want.
type d7yDir string
func mapDirOpenError(originalErr error, name string) error {
if os.IsNotExist(originalErr) || os.IsPermission(originalErr) {
return originalErr
}
parts := strings.Split(name, string(filepath.Separator))
for i := range parts {
if parts[i] == "" {
continue
}
fi, err := os.Stat(strings.Join(parts[:i+1], string(filepath.Separator)))
if err != nil {
return originalErr
}
if !fi.IsDir() {
pathError, ok := originalErr.(*fs.PathError)
if ok && pathError.Err == syscall.EMFILE {
return syscall.EMFILE
}
return fs.ErrNotExist
}
}
return originalErr
}
func (d d7yDir) Open(name string) (http.File, error) {
if filepath.Separator != '/' && strings.ContainsRune(name, filepath.Separator) {
return nil, errors.New("http: invalid character in file path")
}
dir := string(d)
if dir == "" {
dir = "."
}
fullName := filepath.Join(dir, filepath.FromSlash(path.Clean("/"+name)))
f, err := os.Open(fullName)
if err != nil {
mappedErr := mapDirOpenError(err, fullName)
// retry when syscall.EMFILE
for i := 0; mappedErr == syscall.EMFILE && i < maxRetryAttempts; i++ {
time.Sleep(100 * time.Millisecond)
f, err = os.Open(fullName)
if err == nil {
return f, nil
}
mappedErr = mapDirOpenError(err, fullName)
}
return nil, mappedErr
}
return f, nil
}
type Server struct {
*http.Server
}
func New(port int, prefix, uploadPath string) *Server {
return &Server{
&http.Server{
Addr: fmt.Sprintf(":%d", port),
Handler: http.StripPrefix(prefix, http.FileServer(d7yDir(uploadPath))),
},
}
}
func (server *Server) Shutdown(ctx context.Context) error {
defer logger.Infof("====stopped file server====")
return server.Server.Shutdown(ctx)
}

View File

@ -281,6 +281,10 @@ func (s *diskStorageManager) TryFreeSpace(fileLength int64) (bool, error) {
return true, nil
}
func (s *diskStorageManager) GetUploadPath() string {
return s.diskDriver.GetPath(storage.GetUploadHomeRaw())
}
func (s *diskStorageManager) GC() error {
logger.StorageGCLogger.With("type", "disk").Debug("start the disk storage gc job")
gcTaskIDs, err := s.diskCleaner.GC("disk", false)

View File

@ -381,6 +381,10 @@ func (h *hybridStorageManager) tryShmSpace(url, taskID string, fileLength int64)
return "", fmt.Errorf("shared memory is not allowed")
}
func (h *hybridStorageManager) GetUploadPath() string {
return h.diskDriver.GetPath(storage.GetUploadHomeRaw())
}
func (h *hybridStorageManager) GC() error {
logger.StorageGCLogger.With("type", "hybrid").Debug("start the hybrid storage gc job")
var wg sync.WaitGroup

View File

@ -65,6 +65,20 @@ func (mr *MockManagerMockRecorder) DeleteTask(arg0 interface{}) *gomock.Call {
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteTask", reflect.TypeOf((*MockManager)(nil).DeleteTask), arg0)
}
// GetUploadPath mocks base method.
func (m *MockManager) GetUploadPath() string {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "GetUploadPath")
ret0, _ := ret[0].(string)
return ret0
}
// GetUploadPath indicates an expected call of GetUploadPath.
func (mr *MockManagerMockRecorder) GetUploadPath() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetUploadPath", reflect.TypeOf((*MockManager)(nil).GetUploadPath))
}
// ReadDownloadFile mocks base method.
func (m *MockManager) ReadDownloadFile(arg0 string) (io.ReadCloser, error) {
m.ctrl.T.Helper()

View File

@ -74,6 +74,9 @@ type Manager interface {
// TryFreeSpace checks if there is enough space for the file, return true while we are sure that there is enough space.
TryFreeSpace(fileLength int64) (bool, error)
// GetUploadPath get path of upload file
GetUploadPath() string
}
// FileMetadata meta data of task

View File

@ -4,6 +4,4 @@ set -o nounset
set -o errexit
set -o pipefail
nginx
/opt/dragonfly/bin/cdn "$@"