diff --git a/client/daemon/daemon.go b/client/daemon/daemon.go index a36096939..790b0dff8 100644 --- a/client/daemon/daemon.go +++ b/client/daemon/daemon.go @@ -30,11 +30,9 @@ import ( "time" "github.com/gin-gonic/gin" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "golang.org/x/sync/errgroup" "golang.org/x/time/rate" "google.golang.org/grpc" - "google.golang.org/grpc/balancer" "google.golang.org/grpc/credentials" commonv1 "d7y.io/api/pkg/apis/common/v1" @@ -53,11 +51,9 @@ import ( "d7y.io/dragonfly/v2/client/util" "d7y.io/dragonfly/v2/cmd/dependency" logger "d7y.io/dragonfly/v2/internal/dflog" - pkgbalancer "d7y.io/dragonfly/v2/pkg/balancer" "d7y.io/dragonfly/v2/pkg/dfnet" "d7y.io/dragonfly/v2/pkg/dfpath" "d7y.io/dragonfly/v2/pkg/idgen" - "d7y.io/dragonfly/v2/pkg/resolver" "d7y.io/dragonfly/v2/pkg/rpc" managerclient "d7y.io/dragonfly/v2/pkg/rpc/manager/client" schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client" @@ -121,17 +117,8 @@ func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) { ) if opt.Scheduler.Manager.Enable { - // New manager client. - var managerDialOptions []grpc.DialOption - if opt.Options.Telemetry.Jaeger != "" { - managerDialOptions = append(managerDialOptions, - grpc.WithChainUnaryInterceptor(otelgrpc.UnaryClientInterceptor()), - grpc.WithChainStreamInterceptor(otelgrpc.StreamClientInterceptor()), - ) - } - var err error - managerClient, err = managerclient.GetClientByAddr(opt.Scheduler.Manager.NetAddrs, managerDialOptions...) + managerClient, err = managerclient.GetClientByAddr(opt.Scheduler.Manager.NetAddrs) if err != nil { return nil, err } @@ -155,25 +142,7 @@ func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) { } } - // register resolver and balancer. - resolver.RegisterScheduler(dynconfig) - balancer.Register(pkgbalancer.NewConsistentHashingBuilder()) - - var schedulerClientOptions []grpc.DialOption - if opt.Options.Telemetry.Jaeger != "" { - schedulerClientOptions = append(schedulerClientOptions, - grpc.WithChainUnaryInterceptor( - otelgrpc.UnaryClientInterceptor(), - rpc.RefresherUnaryClientInterceptor(dynconfig), - ), - grpc.WithChainStreamInterceptor( - otelgrpc.StreamClientInterceptor(), - rpc.RefresherStreamClientInterceptor(dynconfig), - ), - ) - } - - sched, err := schedulerclient.GetClient(schedulerClientOptions...) + sched, err := schedulerclient.GetClient(dynconfig) if err != nil { return nil, fmt.Errorf("failed to get schedulers: %w", err) } @@ -230,19 +199,7 @@ func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) { } peerServerOption = append(peerServerOption, grpc.Creds(tlsCredentials)) } - // enable grpc tracing - if opt.Options.Telemetry.Jaeger != "" { - downloadServerOption = append( - downloadServerOption, - grpc.ChainUnaryInterceptor(otelgrpc.UnaryServerInterceptor()), - grpc.ChainStreamInterceptor(otelgrpc.StreamServerInterceptor()), - ) - peerServerOption = append( - peerServerOption, - grpc.ChainUnaryInterceptor(otelgrpc.UnaryServerInterceptor()), - grpc.ChainStreamInterceptor(otelgrpc.StreamServerInterceptor()), - ) - } + rpcManager, err := rpcserver.New(host, peerTaskManager, storageManager, defaultPattern, downloadServerOption, peerServerOption) if err != nil { return nil, err diff --git a/client/daemon/peer/peertask_manager_test.go b/client/daemon/peer/peertask_manager_test.go index f134b6962..8695bfbf7 100644 --- a/client/daemon/peer/peertask_manager_test.go +++ b/client/daemon/peer/peertask_manager_test.go @@ -44,6 +44,7 @@ import ( commonv1 "d7y.io/api/pkg/apis/common/v1" dfdaemonv1 "d7y.io/api/pkg/apis/dfdaemon/v1" + dfdaemonv1mocks "d7y.io/api/pkg/apis/dfdaemon/v1/mocks" schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1" schedulerv1mocks "d7y.io/api/pkg/apis/scheduler/v1/mocks" @@ -58,7 +59,6 @@ import ( "d7y.io/dragonfly/v2/pkg/idgen" "d7y.io/dragonfly/v2/pkg/rpc" daemonserver "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/server" - servermocks "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/server/mocks" schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client" schedulerclientmocks "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client/mocks" "d7y.io/dragonfly/v2/pkg/source" @@ -89,7 +89,7 @@ func setupPeerTaskManagerComponents(ctrl *gomock.Controller, opt componentsOptio schedulerclient.Client, storage.Manager) { port := int32(freeport.GetPort()) // 1. set up a mock daemon server for uploading pieces info - var daemon = servermocks.NewMockDaemonServer(ctrl) + var daemon = dfdaemonv1mocks.NewMockDaemonServer(ctrl) // 1.1 calculate piece digest and total digest r := bytes.NewBuffer(opt.content) diff --git a/client/daemon/peer/peertask_stream_backsource_partial_test.go b/client/daemon/peer/peertask_stream_backsource_partial_test.go index f5f64d70b..a9fad828d 100644 --- a/client/daemon/peer/peertask_stream_backsource_partial_test.go +++ b/client/daemon/peer/peertask_stream_backsource_partial_test.go @@ -40,6 +40,7 @@ import ( commonv1 "d7y.io/api/pkg/apis/common/v1" dfdaemonv1 "d7y.io/api/pkg/apis/dfdaemon/v1" + dfdaemonv1mocks "d7y.io/api/pkg/apis/dfdaemon/v1/mocks" schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1" schedulerv1mocks "d7y.io/api/pkg/apis/scheduler/v1/mocks" @@ -52,7 +53,6 @@ import ( "d7y.io/dragonfly/v2/pkg/digest" "d7y.io/dragonfly/v2/pkg/rpc" daemonserver "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/server" - servermocks "d7y.io/dragonfly/v2/pkg/rpc/dfdaemon/server/mocks" schedulerclient "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client" clientmocks "d7y.io/dragonfly/v2/pkg/rpc/scheduler/client/mocks" "d7y.io/dragonfly/v2/pkg/source" @@ -64,7 +64,7 @@ func setupBackSourcePartialComponents(ctrl *gomock.Controller, testBytes []byte, schedulerclient.Client, storage.Manager) { port := int32(freeport.GetPort()) // 1. set up a mock daemon server for uploading pieces info - var daemon = servermocks.NewMockDaemonServer(ctrl) + var daemon = dfdaemonv1mocks.NewMockDaemonServer(ctrl) var piecesMd5 []string pieceCount := int32(math.Ceil(float64(opt.contentLength) / float64(opt.pieceSize))) @@ -105,7 +105,7 @@ func setupBackSourcePartialComponents(ctrl *gomock.Controller, testBytes []byte, Type: "tcp", Addr: fmt.Sprintf("0.0.0.0:%d", port), }) - go func(daemon *servermocks.MockDaemonServer, ln net.Listener) { + go func(daemon *dfdaemonv1mocks.MockDaemonServer, ln net.Listener) { if err := daemonserver.New(daemon).Serve(ln); err != nil { log.Fatal(err) } diff --git a/client/daemon/rpcserver/rpcserver.go b/client/daemon/rpcserver/rpcserver.go index 4bf8ca5b1..f8ade7182 100644 --- a/client/daemon/rpcserver/rpcserver.go +++ b/client/daemon/rpcserver/rpcserver.go @@ -38,9 +38,8 @@ import ( "go.uber.org/atomic" "google.golang.org/grpc" "google.golang.org/grpc/codes" - "google.golang.org/grpc/health" - healthpb "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/status" + "google.golang.org/protobuf/types/known/emptypb" cdnsystemv1 "d7y.io/api/pkg/apis/cdnsystem/v1" commonv1 "d7y.io/api/pkg/apis/common/v1" @@ -97,11 +96,7 @@ func New(peerHost *schedulerv1.PeerHost, peerTaskManager peer.TaskManager, } s.downloadServer = dfdaemonserver.New(s, downloadOpts...) - healthpb.RegisterHealthServer(s.downloadServer, health.NewServer()) - s.peerServer = dfdaemonserver.New(s, peerOpts...) - healthpb.RegisterHealthServer(s.peerServer, health.NewServer()) - cdnsystemv1.RegisterSeederServer(s.peerServer, sd) return s, nil } @@ -318,9 +313,9 @@ func (s *server) SyncPieceTasks(sync dfdaemonv1.Daemon_SyncPieceTasksServer) err return sub.sendRemainingPieceTasks() } -func (s *server) CheckHealth(context.Context) error { +func (s *server) CheckHealth(context.Context, *emptypb.Empty) (*emptypb.Empty, error) { s.Keep() - return nil + return new(emptypb.Empty), nil } func (s *server) Download(req *dfdaemonv1.DownRequest, stream dfdaemonv1.Daemon_DownloadServer) error { @@ -535,7 +530,7 @@ func (s *server) doDownload(ctx context.Context, req *dfdaemonv1.DownRequest, st } } -func (s *server) StatTask(ctx context.Context, req *dfdaemonv1.StatTaskRequest) error { +func (s *server) StatTask(ctx context.Context, req *dfdaemonv1.StatTaskRequest) (*emptypb.Empty, error) { s.Keep() taskID := idgen.TaskID(req.Url, req.UrlMeta) log := logger.With("function", "StatTask", "URL", req.Url, "Tag", req.UrlMeta.Tag, "taskID", taskID, "LocalOnly", req.LocalOnly) @@ -543,35 +538,36 @@ func (s *server) StatTask(ctx context.Context, req *dfdaemonv1.StatTaskRequest) log.Info("new stat task request") if completed := s.isTaskCompleted(taskID); completed { log.Info("task found in local storage") - return nil + return new(emptypb.Empty), nil } // If only stat local cache and task doesn't exist, return not found if req.LocalOnly { msg := "task not found in local cache" log.Info(msg) - return dferrors.New(commonv1.Code_PeerTaskNotFound, msg) + return nil, dferrors.New(commonv1.Code_PeerTaskNotFound, msg) } // Check scheduler if other peers hold the task task, se := s.peerTaskManager.StatTask(ctx, taskID) if se != nil { - return se + return new(emptypb.Empty), se } + // Task available for download only if task is in succeeded state and has available peer if task.State == resource.TaskStateSucceeded && task.HasAvailablePeer { - return nil + return new(emptypb.Empty), nil } msg := fmt.Sprintf("task found but not available for download, state %s, has available peer %t", task.State, task.HasAvailablePeer) log.Info(msg) - return dferrors.New(commonv1.Code_PeerTaskNotFound, msg) + return nil, dferrors.New(commonv1.Code_PeerTaskNotFound, msg) } func (s *server) isTaskCompleted(taskID string) bool { return s.storageManager.FindCompletedTask(taskID) != nil } -func (s *server) ImportTask(ctx context.Context, req *dfdaemonv1.ImportTaskRequest) error { +func (s *server) ImportTask(ctx context.Context, req *dfdaemonv1.ImportTaskRequest) (*emptypb.Empty, error) { s.Keep() peerID := idgen.PeerID(s.peerHost.Ip) taskID := idgen.TaskID(req.Url, req.UrlMeta) @@ -601,7 +597,7 @@ func (s *server) ImportTask(ctx context.Context, req *dfdaemonv1.ImportTaskReque // Announce to scheduler as well, but in background ptm.PeerID = task.PeerID go announceFunc() - return nil + return new(emptypb.Empty), nil } // 1. Register to storageManager @@ -615,7 +611,7 @@ func (s *server) ImportTask(ctx context.Context, req *dfdaemonv1.ImportTaskReque if err != nil { msg := fmt.Sprintf("register task to storage manager failed: %v", err) log.Error(msg) - return errors.New(msg) + return nil, errors.New(msg) } // 2. Import task file @@ -623,17 +619,17 @@ func (s *server) ImportTask(ctx context.Context, req *dfdaemonv1.ImportTaskReque if err := pieceManager.ImportFile(ctx, ptm, tsd, req); err != nil { msg := fmt.Sprintf("import file failed: %v", err) log.Error(msg) - return errors.New(msg) + return nil, errors.New(msg) } log.Info("import file succeeded") // 3. Announce to scheduler asynchronously go announceFunc() - return nil + return new(emptypb.Empty), nil } -func (s *server) ExportTask(ctx context.Context, req *dfdaemonv1.ExportTaskRequest) error { +func (s *server) ExportTask(ctx context.Context, req *dfdaemonv1.ExportTaskRequest) (*emptypb.Empty, error) { s.Keep() taskID := idgen.TaskID(req.Url, req.UrlMeta) log := logger.With("function", "ExportTask", "URL", req.Url, "Tag", req.UrlMeta.Tag, "taskID", taskID, "destination", req.Output) @@ -645,17 +641,19 @@ func (s *server) ExportTask(ctx context.Context, req *dfdaemonv1.ExportTaskReque if req.LocalOnly { msg := fmt.Sprintf("task not found in local storage") log.Info(msg) - return dferrors.New(commonv1.Code_PeerTaskNotFound, msg) + return nil, dferrors.New(commonv1.Code_PeerTaskNotFound, msg) } log.Info("task not found, try from peers") - return s.exportFromPeers(ctx, log, req) + return new(emptypb.Empty), s.exportFromPeers(ctx, log, req) } + err := s.exportFromLocal(ctx, req, task.PeerID) if err != nil { log.Errorf("export from local failed: %s", err) - return err + return nil, err } - return nil + + return new(emptypb.Empty), nil } func (s *server) exportFromLocal(ctx context.Context, req *dfdaemonv1.ExportTaskRequest, peerID string) error { @@ -755,7 +753,7 @@ func call(ctx context.Context, peerID string, sender ResultSender, s *server, re } } -func (s *server) DeleteTask(ctx context.Context, req *dfdaemonv1.DeleteTaskRequest) error { +func (s *server) DeleteTask(ctx context.Context, req *dfdaemonv1.DeleteTaskRequest) (*emptypb.Empty, error) { s.Keep() taskID := idgen.TaskID(req.Url, req.UrlMeta) log := logger.With("function", "DeleteTask", "URL", req.Url, "Tag", req.UrlMeta.Tag, "taskID", taskID) @@ -764,7 +762,7 @@ func (s *server) DeleteTask(ctx context.Context, req *dfdaemonv1.DeleteTaskReque task := s.storageManager.FindCompletedTask(taskID) if task == nil { log.Info("task not found, skip delete") - return nil + return new(emptypb.Empty), nil } // Unregister task @@ -775,9 +773,10 @@ func (s *server) DeleteTask(ctx context.Context, req *dfdaemonv1.DeleteTaskReque if err := s.storageManager.UnregisterTask(ctx, unregReq); err != nil { msg := fmt.Sprintf("failed to UnregisterTask: %s", err) log.Errorf(msg) - return errors.New(msg) + return nil, errors.New(msg) } - return nil + + return new(emptypb.Empty), nil } func checkOutput(output string) error { diff --git a/go.mod b/go.mod index 26d9a3903..dab58e79e 100644 --- a/go.mod +++ b/go.mod @@ -38,6 +38,7 @@ require ( github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 github.com/hashicorp/go-multierror v1.1.1 github.com/jarcoal/httpmock v1.2.0 + github.com/juju/ratelimit v1.0.2 github.com/looplab/fsm v0.3.0 github.com/mcuadros/go-gin-prometheus v0.1.0 github.com/mdlayher/vsock v1.1.1 diff --git a/go.sum b/go.sum index b79b07fcc..c3bff36ab 100644 --- a/go.sum +++ b/go.sum @@ -660,6 +660,8 @@ github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHm github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= +github.com/juju/ratelimit v1.0.2 h1:sRxmtRiajbvrcLQT7S+JbqU0ntsb9W2yhSdNN8tWfaI= +github.com/juju/ratelimit v1.0.2/go.mod h1:qapgC/Gy+xNh9UxzV13HGGl/6UXNN+ct+vwSgWNm/qk= github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w= github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8IZAc4RVcycCCAKdM= github.com/k0kubun/go-ansi v0.0.0-20180517002512-3bf9e2903213/go.mod h1:vNUNkEQ1e29fT/6vq2aBdFsgNPmy8qMdSay1npru+Sw= diff --git a/manager/manager.go b/manager/manager.go index c7e0ac327..328fb004c 100644 --- a/manager/manager.go +++ b/manager/manager.go @@ -24,7 +24,6 @@ import ( "time" "github.com/gin-contrib/static" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" logger "d7y.io/dragonfly/v2/internal/dflog" @@ -155,14 +154,7 @@ func New(cfg *config.Config, d dfpath.Dfpath) (*Server, error) { } // Initialize GRPC server - var grpcOptions []grpc.ServerOption - if s.config.Options.Telemetry.Jaeger != "" { - grpcOptions = []grpc.ServerOption{ - grpc.ChainUnaryInterceptor(otelgrpc.UnaryServerInterceptor()), - grpc.ChainStreamInterceptor(otelgrpc.StreamServerInterceptor()), - } - } - grpcServer := rpcserver.New(cfg, db, cache, searcher, objectStorage, cfg.ObjectStorage, grpcOptions...) + grpcServer := rpcserver.New(cfg, db, cache, searcher, objectStorage, cfg.ObjectStorage) s.grpcServer = grpcServer // Initialize prometheus diff --git a/manager/rpcserver/rpcserver.go b/manager/rpcserver/rpcserver.go index 9c3a53df7..9d8964ca6 100644 --- a/manager/rpcserver/rpcserver.go +++ b/manager/rpcserver/rpcserver.go @@ -25,16 +25,8 @@ import ( cachev8 "github.com/go-redis/cache/v8" "github.com/go-redis/redis/v8" "github.com/google/uuid" - grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" - grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" - grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" - grpc_validator "github.com/grpc-ecosystem/go-grpc-middleware/validator" - grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" "google.golang.org/grpc/codes" - "google.golang.org/grpc/health" - healthpb "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/status" emptypb "google.golang.org/protobuf/types/known/emptypb" timestamppb "google.golang.org/protobuf/types/known/timestamppb" @@ -51,28 +43,9 @@ import ( "d7y.io/dragonfly/v2/manager/searcher" "d7y.io/dragonfly/v2/manager/types" "d7y.io/dragonfly/v2/pkg/objectstorage" + managerserver "d7y.io/dragonfly/v2/pkg/rpc/manager/server" ) -// Default middlewares for stream. -func defaultStreamMiddleWares() []grpc.StreamServerInterceptor { - return []grpc.StreamServerInterceptor{ - grpc_validator.StreamServerInterceptor(), - grpc_recovery.StreamServerInterceptor(), - grpc_prometheus.StreamServerInterceptor, - grpc_zap.StreamServerInterceptor(logger.GrpcLogger.Desugar()), - } -} - -// Default middlewares for unary. -func defaultUnaryMiddleWares() []grpc.UnaryServerInterceptor { - return []grpc.UnaryServerInterceptor{ - grpc_validator.UnaryServerInterceptor(), - grpc_recovery.UnaryServerInterceptor(), - grpc_prometheus.UnaryServerInterceptor, - grpc_zap.UnaryServerInterceptor(logger.GrpcLogger.Desugar()), - } -} - // Server is grpc server. type Server struct { // Manager configuration. @@ -85,8 +58,6 @@ type Server struct { cache *cache.Cache // Searcher interface. searcher searcher.Searcher - // Manager grpc interface. - managerv1.UnimplementedManagerServer // Object storage interface. objectStorage objectstorage.ObjectStorage // Object storage configuration. @@ -98,7 +69,7 @@ func New( cfg *config.Config, database *database.Database, cache *cache.Cache, searcher searcher.Searcher, objectStorage objectstorage.ObjectStorage, objectStorageConfig *config.ObjectStorageConfig, opts ...grpc.ServerOption, ) *grpc.Server { - server := &Server{ + return managerserver.New(&Server{ config: cfg, db: database.DB, rdb: database.RDB, @@ -106,19 +77,7 @@ func New( searcher: searcher, objectStorage: objectStorage, objectStorageConfig: objectStorageConfig, - } - - grpcServer := grpc.NewServer(append([]grpc.ServerOption{ - grpc.ChainUnaryInterceptor(otelgrpc.UnaryServerInterceptor()), - grpc.ChainStreamInterceptor(otelgrpc.StreamServerInterceptor()), - grpc.StreamInterceptor(grpc_middleware.ChainStreamServer(defaultStreamMiddleWares()...)), - grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(defaultUnaryMiddleWares()...)), - }, opts...)...) - - // Register servers on grpc server. - managerv1.RegisterManagerServer(grpcServer, server) - healthpb.RegisterHealthServer(grpcServer, health.NewServer()) - return grpcServer + }, opts...) } // Get SeedPeer and SeedPeer cluster configuration. diff --git a/pkg/rpc/cdnsystem/client/client.go b/pkg/rpc/cdnsystem/client/client.go index a0eed6ba3..4ab7d07e1 100644 --- a/pkg/rpc/cdnsystem/client/client.go +++ b/pkg/rpc/cdnsystem/client/client.go @@ -26,16 +26,20 @@ import ( grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" + "google.golang.org/grpc/balancer" "google.golang.org/grpc/credentials/insecure" cdnsystemv1 "d7y.io/api/pkg/apis/cdnsystem/v1" commonv1 "d7y.io/api/pkg/apis/common/v1" logger "d7y.io/dragonfly/v2/internal/dflog" - "d7y.io/dragonfly/v2/pkg/balancer" + pkgbalancer "d7y.io/dragonfly/v2/pkg/balancer" "d7y.io/dragonfly/v2/pkg/dfnet" "d7y.io/dragonfly/v2/pkg/resolver" + "d7y.io/dragonfly/v2/pkg/rpc" + "d7y.io/dragonfly/v2/scheduler/config" ) const ( @@ -50,29 +54,27 @@ const ( perRetryTimeout = 3 * time.Second ) -// defaultDialOptions is default dial options of manager client. -var defaultDialOptions = []grpc.DialOption{ - grpc.WithDefaultServiceConfig(balancer.BalancerServiceConfig), - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( - grpc_prometheus.UnaryClientInterceptor, - grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), - grpc_retry.UnaryClientInterceptor( - grpc_retry.WithPerRetryTimeout(perRetryTimeout), - grpc_retry.WithMax(maxRetries), - grpc_retry.WithBackoff(grpc_retry.BackoffLinear(backoffWaitBetween)), - ), - )), - grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( - grpc_prometheus.StreamClientInterceptor, - grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), - )), -} - func GetClientByAddr(netAddr dfnet.NetAddr, options ...grpc.DialOption) (Client, error) { conn, err := grpc.Dial( netAddr.Addr, - append(defaultDialOptions, options...)..., + append([]grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( + otelgrpc.UnaryClientInterceptor(), + grpc_prometheus.UnaryClientInterceptor, + grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), + grpc_retry.UnaryClientInterceptor( + grpc_retry.WithPerRetryTimeout(perRetryTimeout), + grpc_retry.WithMax(maxRetries), + grpc_retry.WithBackoff(grpc_retry.BackoffLinear(backoffWaitBetween)), + ), + )), + grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( + otelgrpc.StreamClientInterceptor(), + grpc_prometheus.StreamClientInterceptor, + grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), + )), + }, options...)..., ) if err != nil { return nil, err @@ -84,10 +86,34 @@ func GetClientByAddr(netAddr dfnet.NetAddr, options ...grpc.DialOption) (Client, }, nil } -func GetClient(options ...grpc.DialOption) (Client, error) { +func GetClient(dynconfig config.DynconfigInterface, options ...grpc.DialOption) (Client, error) { + // Register resolver and balancer. + resolver.RegisterSeedPeer(dynconfig) + balancer.Register(pkgbalancer.NewConsistentHashingBuilder()) + conn, err := grpc.Dial( resolver.SeedPeerVirtualTarget, - append(defaultDialOptions, options...)..., + append([]grpc.DialOption{ + grpc.WithDefaultServiceConfig(pkgbalancer.BalancerServiceConfig), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( + otelgrpc.UnaryClientInterceptor(), + grpc_prometheus.UnaryClientInterceptor, + grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), + grpc_retry.UnaryClientInterceptor( + grpc_retry.WithPerRetryTimeout(perRetryTimeout), + grpc_retry.WithMax(maxRetries), + grpc_retry.WithBackoff(grpc_retry.BackoffLinear(backoffWaitBetween)), + ), + rpc.RefresherUnaryClientInterceptor(dynconfig), + )), + grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( + otelgrpc.StreamClientInterceptor(), + grpc_prometheus.StreamClientInterceptor, + grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), + rpc.RefresherStreamClientInterceptor(dynconfig), + )), + }, options...)..., ) if err != nil { return nil, err @@ -120,7 +146,7 @@ type client struct { // ObtainSeeds triggers the seed peer to download task back-to-source.. func (c *client) ObtainSeeds(ctx context.Context, req *cdnsystemv1.SeedRequest, options ...grpc.CallOption) (cdnsystemv1.Seeder_ObtainSeedsClient, error) { return c.SeederClient.ObtainSeeds( - context.WithValue(ctx, balancer.ContextKey, req.TaskId), + context.WithValue(ctx, pkgbalancer.ContextKey, req.TaskId), req, options..., ) @@ -129,7 +155,7 @@ func (c *client) ObtainSeeds(ctx context.Context, req *cdnsystemv1.SeedRequest, // GetPieceTasks gets detail information of task. func (c *client) GetPieceTasks(ctx context.Context, req *commonv1.PieceTaskRequest, options ...grpc.CallOption) (*commonv1.PiecePacket, error) { return c.SeederClient.GetPieceTasks( - context.WithValue(ctx, balancer.ContextKey, req.TaskId), + context.WithValue(ctx, pkgbalancer.ContextKey, req.TaskId), req, options..., ) @@ -138,7 +164,7 @@ func (c *client) GetPieceTasks(ctx context.Context, req *commonv1.PieceTaskReque // SyncPieceTasks syncs detail information of task. func (c *client) SyncPieceTasks(ctx context.Context, req *commonv1.PieceTaskRequest, options ...grpc.CallOption) (cdnsystemv1.Seeder_SyncPieceTasksClient, error) { stream, err := c.SeederClient.SyncPieceTasks( - context.WithValue(ctx, balancer.ContextKey, req.TaskId), + context.WithValue(ctx, pkgbalancer.ContextKey, req.TaskId), options..., ) if err != nil { diff --git a/pkg/rpc/cdnsystem/server/server.go b/pkg/rpc/cdnsystem/server/server.go new file mode 100644 index 000000000..c130492a4 --- /dev/null +++ b/pkg/rpc/cdnsystem/server/server.go @@ -0,0 +1,86 @@ +/* + * Copyright 2020 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package server + +import ( + "time" + + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" + grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" + grpc_ratelimit "github.com/grpc-ecosystem/go-grpc-middleware/ratelimit" + grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" + grpc_validator "github.com/grpc-ecosystem/go-grpc-middleware/validator" + grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "google.golang.org/grpc" + "google.golang.org/grpc/health" + healthpb "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/keepalive" + + cdnsystemv1 "d7y.io/api/pkg/apis/cdnsystem/v1" + + logger "d7y.io/dragonfly/v2/internal/dflog" + "d7y.io/dragonfly/v2/pkg/rpc" +) + +const ( + // DefaultQPS is default qps of grpc server. + DefaultQPS = 10 * 1000 + + // DefaultBurst is default burst of grpc server. + DefaultBurst = 20 * 1000 + + // DefaultMaxConnectionIdle is default timeout of connection idle state. + // If a client state is idle for DefaultMaxConnectionIdle, send a GOAWAY. + DefaultMaxConnectionIdle = 10 * time.Second +) + +// New returns a grpc server instance and register service on grpc server. +func New(svr cdnsystemv1.SeederServer, opts ...grpc.ServerOption) *grpc.Server { + limiter := rpc.NewRateLimiterInterceptor(DefaultQPS, DefaultBurst) + + grpcServer := grpc.NewServer(append([]grpc.ServerOption{ + grpc.KeepaliveParams(keepalive.ServerParameters{ + MaxConnectionIdle: DefaultMaxConnectionIdle, + }), + grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( + grpc_ratelimit.UnaryServerInterceptor(limiter), + rpc.ConvertErrorUnaryServerInterceptor, + otelgrpc.UnaryServerInterceptor(), + grpc_prometheus.UnaryServerInterceptor, + grpc_zap.UnaryServerInterceptor(logger.GrpcLogger.Desugar()), + grpc_validator.UnaryServerInterceptor(), + grpc_recovery.UnaryServerInterceptor(), + )), + grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( + grpc_ratelimit.StreamServerInterceptor(limiter), + rpc.ConvertErrorStreamServerInterceptor, + otelgrpc.StreamServerInterceptor(), + grpc_prometheus.StreamServerInterceptor, + grpc_zap.StreamServerInterceptor(logger.GrpcLogger.Desugar()), + grpc_validator.StreamServerInterceptor(), + grpc_recovery.StreamServerInterceptor(), + )), + }, opts...)...) + + // Register servers on grpc server. + cdnsystemv1.RegisterSeederServer(grpcServer, svr) + + // Register health on grpc server. + healthpb.RegisterHealthServer(grpcServer, health.NewServer()) + return grpcServer +} diff --git a/pkg/rpc/dfdaemon/server/mocks/server_mock.go b/pkg/rpc/dfdaemon/server/mocks/server_mock.go deleted file mode 100644 index a18876dda..000000000 --- a/pkg/rpc/dfdaemon/server/mocks/server_mock.go +++ /dev/null @@ -1,150 +0,0 @@ -// Code generated by MockGen. DO NOT EDIT. -// Source: server.go - -// Package mocks is a generated GoMock package. -package mocks - -import ( - context "context" - reflect "reflect" - - v1 "d7y.io/api/pkg/apis/common/v1" - v10 "d7y.io/api/pkg/apis/dfdaemon/v1" - gomock "github.com/golang/mock/gomock" -) - -// MockDaemonServer is a mock of DaemonServer interface. -type MockDaemonServer struct { - ctrl *gomock.Controller - recorder *MockDaemonServerMockRecorder -} - -// MockDaemonServerMockRecorder is the mock recorder for MockDaemonServer. -type MockDaemonServerMockRecorder struct { - mock *MockDaemonServer -} - -// NewMockDaemonServer creates a new mock instance. -func NewMockDaemonServer(ctrl *gomock.Controller) *MockDaemonServer { - mock := &MockDaemonServer{ctrl: ctrl} - mock.recorder = &MockDaemonServerMockRecorder{mock} - return mock -} - -// EXPECT returns an object that allows the caller to indicate expected use. -func (m *MockDaemonServer) EXPECT() *MockDaemonServerMockRecorder { - return m.recorder -} - -// CheckHealth mocks base method. -func (m *MockDaemonServer) CheckHealth(arg0 context.Context) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CheckHealth", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// CheckHealth indicates an expected call of CheckHealth. -func (mr *MockDaemonServerMockRecorder) CheckHealth(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CheckHealth", reflect.TypeOf((*MockDaemonServer)(nil).CheckHealth), arg0) -} - -// DeleteTask mocks base method. -func (m *MockDaemonServer) DeleteTask(arg0 context.Context, arg1 *v10.DeleteTaskRequest) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "DeleteTask", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 -} - -// DeleteTask indicates an expected call of DeleteTask. -func (mr *MockDaemonServerMockRecorder) DeleteTask(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteTask", reflect.TypeOf((*MockDaemonServer)(nil).DeleteTask), arg0, arg1) -} - -// Download mocks base method. -func (m *MockDaemonServer) Download(arg0 *v10.DownRequest, arg1 v10.Daemon_DownloadServer) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "Download", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 -} - -// Download indicates an expected call of Download. -func (mr *MockDaemonServerMockRecorder) Download(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Download", reflect.TypeOf((*MockDaemonServer)(nil).Download), arg0, arg1) -} - -// ExportTask mocks base method. -func (m *MockDaemonServer) ExportTask(arg0 context.Context, arg1 *v10.ExportTaskRequest) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ExportTask", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 -} - -// ExportTask indicates an expected call of ExportTask. -func (mr *MockDaemonServerMockRecorder) ExportTask(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ExportTask", reflect.TypeOf((*MockDaemonServer)(nil).ExportTask), arg0, arg1) -} - -// GetPieceTasks mocks base method. -func (m *MockDaemonServer) GetPieceTasks(arg0 context.Context, arg1 *v1.PieceTaskRequest) (*v1.PiecePacket, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetPieceTasks", arg0, arg1) - ret0, _ := ret[0].(*v1.PiecePacket) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// GetPieceTasks indicates an expected call of GetPieceTasks. -func (mr *MockDaemonServerMockRecorder) GetPieceTasks(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetPieceTasks", reflect.TypeOf((*MockDaemonServer)(nil).GetPieceTasks), arg0, arg1) -} - -// ImportTask mocks base method. -func (m *MockDaemonServer) ImportTask(arg0 context.Context, arg1 *v10.ImportTaskRequest) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ImportTask", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 -} - -// ImportTask indicates an expected call of ImportTask. -func (mr *MockDaemonServerMockRecorder) ImportTask(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ImportTask", reflect.TypeOf((*MockDaemonServer)(nil).ImportTask), arg0, arg1) -} - -// StatTask mocks base method. -func (m *MockDaemonServer) StatTask(arg0 context.Context, arg1 *v10.StatTaskRequest) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "StatTask", arg0, arg1) - ret0, _ := ret[0].(error) - return ret0 -} - -// StatTask indicates an expected call of StatTask. -func (mr *MockDaemonServerMockRecorder) StatTask(arg0, arg1 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StatTask", reflect.TypeOf((*MockDaemonServer)(nil).StatTask), arg0, arg1) -} - -// SyncPieceTasks mocks base method. -func (m *MockDaemonServer) SyncPieceTasks(arg0 v10.Daemon_SyncPieceTasksServer) error { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "SyncPieceTasks", arg0) - ret0, _ := ret[0].(error) - return ret0 -} - -// SyncPieceTasks indicates an expected call of SyncPieceTasks. -func (mr *MockDaemonServerMockRecorder) SyncPieceTasks(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SyncPieceTasks", reflect.TypeOf((*MockDaemonServer)(nil).SyncPieceTasks), arg0) -} diff --git a/pkg/rpc/dfdaemon/server/server.go b/pkg/rpc/dfdaemon/server/server.go index ca942aa23..c8cb247f6 100644 --- a/pkg/rpc/dfdaemon/server/server.go +++ b/pkg/rpc/dfdaemon/server/server.go @@ -14,88 +14,73 @@ * limitations under the License. */ -//go:generate mockgen -destination mocks/server_mock.go -source server.go -package mocks - package server import ( - "context" + "time" + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" + grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" + grpc_ratelimit "github.com/grpc-ecosystem/go-grpc-middleware/ratelimit" + grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" + grpc_validator "github.com/grpc-ecosystem/go-grpc-middleware/validator" + grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" - "google.golang.org/grpc/peer" - "google.golang.org/protobuf/types/known/emptypb" + "google.golang.org/grpc/health" + healthpb "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/keepalive" - commonv1 "d7y.io/api/pkg/apis/common/v1" dfdaemonv1 "d7y.io/api/pkg/apis/dfdaemon/v1" logger "d7y.io/dragonfly/v2/internal/dflog" "d7y.io/dragonfly/v2/pkg/rpc" ) -// DaemonServer refer to dfdaemonv1.DaemonServer -type DaemonServer interface { - // Download triggers client to download file - Download(*dfdaemonv1.DownRequest, dfdaemonv1.Daemon_DownloadServer) error - // GetPieceTasks get piece tasks from other peers - GetPieceTasks(context.Context, *commonv1.PieceTaskRequest) (*commonv1.PiecePacket, error) - // SyncPieceTasks sync piece tasks info with other peers - SyncPieceTasks(dfdaemonv1.Daemon_SyncPieceTasksServer) error - // CheckHealth check daemon health - CheckHealth(context.Context) error - // Check if the given task exists in P2P cache system - StatTask(context.Context, *dfdaemonv1.StatTaskRequest) error - // Import the given file into P2P cache system - ImportTask(context.Context, *dfdaemonv1.ImportTaskRequest) error - // Export or download file from P2P cache system - ExportTask(context.Context, *dfdaemonv1.ExportTaskRequest) error - // Delete file from P2P cache system - DeleteTask(context.Context, *dfdaemonv1.DeleteTaskRequest) error -} +const ( + // DefaultQPS is default qps of grpc server. + DefaultQPS = 10 * 1000 -type proxy struct { - server DaemonServer - dfdaemonv1.UnimplementedDaemonServer -} + // DefaultBurst is default burst of grpc server. + DefaultBurst = 20 * 1000 -func New(daemonServer DaemonServer, opts ...grpc.ServerOption) *grpc.Server { - grpcServer := grpc.NewServer(append(rpc.DefaultServerOptions(), opts...)...) - dfdaemonv1.RegisterDaemonServer(grpcServer, &proxy{server: daemonServer}) + // DefaultMaxConnectionIdle is default timeout of connection idle state. + // If a client state is idle for DefaultMaxConnectionIdle, send a GOAWAY. + DefaultMaxConnectionIdle = 10 * time.Second +) + +// New returns a grpc server instance and register service on grpc server. +func New(svr dfdaemonv1.DaemonServer, opts ...grpc.ServerOption) *grpc.Server { + limiter := rpc.NewRateLimiterInterceptor(DefaultQPS, DefaultBurst) + + grpcServer := grpc.NewServer(append([]grpc.ServerOption{ + grpc.KeepaliveParams(keepalive.ServerParameters{ + MaxConnectionIdle: DefaultMaxConnectionIdle, + }), + grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( + grpc_ratelimit.UnaryServerInterceptor(limiter), + rpc.ConvertErrorUnaryServerInterceptor, + otelgrpc.UnaryServerInterceptor(), + grpc_prometheus.UnaryServerInterceptor, + grpc_zap.UnaryServerInterceptor(logger.GrpcLogger.Desugar()), + grpc_validator.UnaryServerInterceptor(), + grpc_recovery.UnaryServerInterceptor(), + )), + grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( + grpc_ratelimit.StreamServerInterceptor(limiter), + rpc.ConvertErrorStreamServerInterceptor, + otelgrpc.StreamServerInterceptor(), + grpc_prometheus.StreamServerInterceptor, + grpc_zap.StreamServerInterceptor(logger.GrpcLogger.Desugar()), + grpc_validator.StreamServerInterceptor(), + grpc_recovery.StreamServerInterceptor(), + )), + }, opts...)...) + + // Register servers on grpc server. + dfdaemonv1.RegisterDaemonServer(grpcServer, svr) + + // Register health on grpc server. + healthpb.RegisterHealthServer(grpcServer, health.NewServer()) return grpcServer } - -func (p *proxy) Download(req *dfdaemonv1.DownRequest, stream dfdaemonv1.Daemon_DownloadServer) (err error) { - peerAddr := "unknown" - if pe, ok := peer.FromContext(stream.Context()); ok { - peerAddr = pe.Addr.String() - } - logger.Infof("trigger download for url: %s, from: %s, uuid: %s", req.Url, peerAddr, req.Uuid) - return p.server.Download(req, stream) -} - -func (p *proxy) GetPieceTasks(ctx context.Context, ptr *commonv1.PieceTaskRequest) (*commonv1.PiecePacket, error) { - return p.server.GetPieceTasks(ctx, ptr) -} - -func (p *proxy) SyncPieceTasks(sync dfdaemonv1.Daemon_SyncPieceTasksServer) error { - return p.server.SyncPieceTasks(sync) -} - -func (p *proxy) CheckHealth(ctx context.Context, req *emptypb.Empty) (*emptypb.Empty, error) { - return new(emptypb.Empty), p.server.CheckHealth(ctx) -} - -func (p *proxy) StatTask(ctx context.Context, req *dfdaemonv1.StatTaskRequest) (*emptypb.Empty, error) { - return new(emptypb.Empty), p.server.StatTask(ctx, req) -} - -func (p *proxy) ImportTask(ctx context.Context, req *dfdaemonv1.ImportTaskRequest) (*emptypb.Empty, error) { - return new(emptypb.Empty), p.server.ImportTask(ctx, req) -} - -func (p *proxy) ExportTask(ctx context.Context, req *dfdaemonv1.ExportTaskRequest) (*emptypb.Empty, error) { - return new(emptypb.Empty), p.server.ExportTask(ctx, req) -} - -func (p *proxy) DeleteTask(ctx context.Context, req *dfdaemonv1.DeleteTaskRequest) (*emptypb.Empty, error) { - return new(emptypb.Empty), p.server.DeleteTask(ctx, req) -} diff --git a/pkg/rpc/interceptor.go b/pkg/rpc/interceptor.go index 72d50da4f..7129d96e7 100644 --- a/pkg/rpc/interceptor.go +++ b/pkg/rpc/interceptor.go @@ -19,9 +19,16 @@ package rpc import ( "context" + "github.com/juju/ratelimit" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" + + commonv1 "d7y.io/api/pkg/apis/common/v1" + + "d7y.io/dragonfly/v2/internal/dferrors" + logger "d7y.io/dragonfly/v2/internal/dflog" + "d7y.io/dragonfly/v2/pkg/rpc/common" ) // Refresher is the interface for refreshing dynconfig. @@ -57,3 +64,62 @@ func RefresherStreamClientInterceptor(r Refresher) grpc.StreamClientInterceptor return clientStream, err } } + +// RateLimiterInterceptor is the interface for ratelimit interceptor. +type RateLimiterInterceptor struct { + // tokenBucket is token bucket of ratelimit. + tokenBucket *ratelimit.Bucket +} + +// NewRateLimiterInterceptor returns a RateLimiterInterceptor instance. +func NewRateLimiterInterceptor(qps float64, burst int64) *RateLimiterInterceptor { + return &RateLimiterInterceptor{ + tokenBucket: ratelimit.NewBucketWithRate(qps, burst), + } +} + +// Limit is the predicate which limits the requests. +func (r *RateLimiterInterceptor) Limit() bool { + if r.tokenBucket.TakeAvailable(1) == 0 { + return true + } + + return false +} + +// ConvertErrorUnaryServerInterceptor returns a new unary server interceptor that convert error when trigger custom error. +func ConvertErrorUnaryServerInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { + m, err := handler(ctx, req) + if err != nil { + err = convertError(err) + logger.GrpcLogger.Errorf("do unary server error: %v for method: %s", err, info.FullMethod) + } + + return m, err +} + +// ConvertErrorStreamServerInterceptor returns a new stream server interceptor that convert error when trigger custom error. +func ConvertErrorStreamServerInterceptor(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + err := handler(srv, ss) + if err != nil { + err = convertError(err) + logger.GrpcLogger.Errorf("do stream server error: %v for method: %s", err, info.FullMethod) + } + + return err +} + +// convertError converts custom error. +func convertError(err error) error { + if status.Code(err) == codes.InvalidArgument { + err = dferrors.New(commonv1.Code_BadRequest, err.Error()) + } + + if v, ok := err.(*dferrors.DfError); ok { + logger.GrpcLogger.Errorf(v.Message) + if s, e := status.Convert(err).WithDetails(common.NewGrpcDfError(v.Code, v.Message)); e == nil { + err = s.Err() + } + } + return err +} diff --git a/pkg/rpc/manager/client/client.go b/pkg/rpc/manager/client/client.go index a89b7fb31..cb887d9f2 100644 --- a/pkg/rpc/manager/client/client.go +++ b/pkg/rpc/manager/client/client.go @@ -27,6 +27,7 @@ import ( grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" @@ -51,29 +52,28 @@ const ( perRetryTimeout = 5 * time.Second ) -// defaultDialOptions is default dial options of manager client. -var defaultDialOptions = []grpc.DialOption{ - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( - grpc_prometheus.UnaryClientInterceptor, - grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), - grpc_retry.UnaryClientInterceptor( - grpc_retry.WithPerRetryTimeout(perRetryTimeout), - grpc_retry.WithMax(maxRetries), - grpc_retry.WithBackoff(grpc_retry.BackoffLinear(backoffWaitBetween)), - ), - )), - grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( - grpc_prometheus.StreamClientInterceptor, - grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), - )), -} - // GetClient returns manager client. func GetClient(target string, options ...grpc.DialOption) (Client, error) { conn, err := grpc.Dial( target, - append(defaultDialOptions, options...)..., + append([]grpc.DialOption{ + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( + otelgrpc.UnaryClientInterceptor(), + grpc_prometheus.UnaryClientInterceptor, + grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), + grpc_retry.UnaryClientInterceptor( + grpc_retry.WithPerRetryTimeout(perRetryTimeout), + grpc_retry.WithMax(maxRetries), + grpc_retry.WithBackoff(grpc_retry.BackoffLinear(backoffWaitBetween)), + ), + )), + grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( + otelgrpc.StreamClientInterceptor(), + grpc_prometheus.StreamClientInterceptor, + grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), + )), + }, options...)..., ) if err != nil { return nil, err diff --git a/pkg/rpc/manager/server/server.go b/pkg/rpc/manager/server/server.go new file mode 100644 index 000000000..41b623e8f --- /dev/null +++ b/pkg/rpc/manager/server/server.go @@ -0,0 +1,84 @@ +/* + * Copyright 2022 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package server + +import ( + "time" + + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" + grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" + grpc_ratelimit "github.com/grpc-ecosystem/go-grpc-middleware/ratelimit" + grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" + grpc_validator "github.com/grpc-ecosystem/go-grpc-middleware/validator" + grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "google.golang.org/grpc" + "google.golang.org/grpc/health" + healthpb "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/keepalive" + + managerv1 "d7y.io/api/pkg/apis/manager/v1" + + logger "d7y.io/dragonfly/v2/internal/dflog" + "d7y.io/dragonfly/v2/pkg/rpc" +) + +const ( + // DefaultQPS is default qps of grpc server. + DefaultQPS = 10 * 1000 + + // DefaultBurst is default burst of grpc server. + DefaultBurst = 20 * 1000 + + // DefaultMaxConnectionIdle is default timeout of connection idle state. + // If a client state is idle for DefaultMaxConnectionIdle, send a GOAWAY. + DefaultMaxConnectionIdle = 10 * time.Second +) + +// New returns a grpc server instance and register service on grpc server. +func New(svr managerv1.ManagerServer, opts ...grpc.ServerOption) *grpc.Server { + limiter := rpc.NewRateLimiterInterceptor(DefaultQPS, DefaultBurst) + + grpcServer := grpc.NewServer(append([]grpc.ServerOption{ + grpc.KeepaliveParams(keepalive.ServerParameters{ + MaxConnectionIdle: DefaultMaxConnectionIdle, + }), + grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( + grpc_ratelimit.UnaryServerInterceptor(limiter), + otelgrpc.UnaryServerInterceptor(), + grpc_prometheus.UnaryServerInterceptor, + grpc_zap.UnaryServerInterceptor(logger.GrpcLogger.Desugar()), + grpc_validator.UnaryServerInterceptor(), + grpc_recovery.UnaryServerInterceptor(), + )), + grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( + grpc_ratelimit.StreamServerInterceptor(limiter), + otelgrpc.StreamServerInterceptor(), + grpc_prometheus.StreamServerInterceptor, + grpc_zap.StreamServerInterceptor(logger.GrpcLogger.Desugar()), + grpc_validator.StreamServerInterceptor(), + grpc_recovery.StreamServerInterceptor(), + )), + }, opts...)...) + + // Register servers on grpc server. + managerv1.RegisterManagerServer(grpcServer, svr) + + // Register health on grpc server. + healthpb.RegisterHealthServer(grpcServer, health.NewServer()) + return grpcServer +} diff --git a/pkg/rpc/scheduler/client/client.go b/pkg/rpc/scheduler/client/client.go index 3140eb489..9e08aadd9 100644 --- a/pkg/rpc/scheduler/client/client.go +++ b/pkg/rpc/scheduler/client/client.go @@ -26,14 +26,17 @@ import ( grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" + "google.golang.org/grpc/balancer" "google.golang.org/grpc/credentials/insecure" commonv1 "d7y.io/api/pkg/apis/common/v1" schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1" + "d7y.io/dragonfly/v2/client/config" logger "d7y.io/dragonfly/v2/internal/dflog" - "d7y.io/dragonfly/v2/pkg/balancer" + pkgbalancer "d7y.io/dragonfly/v2/pkg/balancer" "d7y.io/dragonfly/v2/pkg/resolver" "d7y.io/dragonfly/v2/pkg/rpc/common" ) @@ -50,30 +53,33 @@ const ( perRetryTimeout = 3 * time.Second ) -// defaultDialOptions is default dial options of manager client. -var defaultDialOptions = []grpc.DialOption{ - grpc.WithDefaultServiceConfig(balancer.BalancerServiceConfig), - grpc.WithTransportCredentials(insecure.NewCredentials()), - grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( - grpc_prometheus.UnaryClientInterceptor, - grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), - grpc_retry.UnaryClientInterceptor( - grpc_retry.WithPerRetryTimeout(perRetryTimeout), - grpc_retry.WithMax(maxRetries), - grpc_retry.WithBackoff(grpc_retry.BackoffLinear(backoffWaitBetween)), - ), - )), - grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( - grpc_prometheus.StreamClientInterceptor, - grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), - )), -} - // GetClient get scheduler clients using resolver and balancer, -func GetClient(options ...grpc.DialOption) (Client, error) { +func GetClient(dynconfig config.Dynconfig, options ...grpc.DialOption) (Client, error) { + // Register resolver and balancer. + resolver.RegisterScheduler(dynconfig) + balancer.Register(pkgbalancer.NewConsistentHashingBuilder()) + conn, err := grpc.Dial( resolver.SchedulerVirtualTarget, - append(defaultDialOptions, options...)..., + append([]grpc.DialOption{ + grpc.WithDefaultServiceConfig(pkgbalancer.BalancerServiceConfig), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithUnaryInterceptor(grpc_middleware.ChainUnaryClient( + otelgrpc.UnaryClientInterceptor(), + grpc_prometheus.UnaryClientInterceptor, + grpc_zap.UnaryClientInterceptor(logger.GrpcLogger.Desugar()), + grpc_retry.UnaryClientInterceptor( + grpc_retry.WithPerRetryTimeout(perRetryTimeout), + grpc_retry.WithMax(maxRetries), + grpc_retry.WithBackoff(grpc_retry.BackoffLinear(backoffWaitBetween)), + ), + )), + grpc.WithStreamInterceptor(grpc_middleware.ChainStreamClient( + otelgrpc.StreamClientInterceptor(), + grpc_prometheus.StreamClientInterceptor, + grpc_zap.StreamClientInterceptor(logger.GrpcLogger.Desugar()), + )), + }, options...)..., ) if err != nil { return nil, err @@ -141,7 +147,7 @@ type client struct { // RegisterPeerTask registers a peer into task. func (c *client) RegisterPeerTask(ctx context.Context, req *schedulerv1.PeerTaskRequest, options ...grpc.CallOption) (*schedulerv1.RegisterResult, error) { return c.SchedulerClient.RegisterPeerTask( - context.WithValue(ctx, balancer.ContextKey, req.TaskId), + context.WithValue(ctx, pkgbalancer.ContextKey, req.TaskId), req, options..., ) @@ -150,7 +156,7 @@ func (c *client) RegisterPeerTask(ctx context.Context, req *schedulerv1.PeerTask // ReportPieceResult reports piece results and receives peer packets. func (c *client) ReportPieceResult(ctx context.Context, req *schedulerv1.PeerTaskRequest, options ...grpc.CallOption) (schedulerv1.Scheduler_ReportPieceResultClient, error) { stream, err := c.SchedulerClient.ReportPieceResult( - context.WithValue(ctx, balancer.ContextKey, req.TaskId), + context.WithValue(ctx, pkgbalancer.ContextKey, req.TaskId), options..., ) if err != nil { @@ -163,7 +169,7 @@ func (c *client) ReportPieceResult(ctx context.Context, req *schedulerv1.PeerTas // ReportPeerResult reports downloading result for the peer. func (c *client) ReportPeerResult(ctx context.Context, req *schedulerv1.PeerResult, options ...grpc.CallOption) error { if _, err := c.SchedulerClient.ReportPeerResult( - context.WithValue(ctx, balancer.ContextKey, req.TaskId), + context.WithValue(ctx, pkgbalancer.ContextKey, req.TaskId), req, options..., ); err != nil { @@ -176,7 +182,7 @@ func (c *client) ReportPeerResult(ctx context.Context, req *schedulerv1.PeerResu // LeaveTask makes the peer leaving from task. func (c *client) LeaveTask(ctx context.Context, req *schedulerv1.PeerTarget, options ...grpc.CallOption) error { if _, err := c.SchedulerClient.LeaveTask( - context.WithValue(ctx, balancer.ContextKey, req.TaskId), + context.WithValue(ctx, pkgbalancer.ContextKey, req.TaskId), req, options..., ); err != nil { @@ -189,7 +195,7 @@ func (c *client) LeaveTask(ctx context.Context, req *schedulerv1.PeerTarget, opt // Checks if any peer has the given task. func (c *client) StatTask(ctx context.Context, req *schedulerv1.StatTaskRequest, options ...grpc.CallOption) (*schedulerv1.Task, error) { return c.SchedulerClient.StatTask( - context.WithValue(ctx, balancer.ContextKey, req.TaskId), + context.WithValue(ctx, pkgbalancer.ContextKey, req.TaskId), req, options..., ) @@ -198,7 +204,7 @@ func (c *client) StatTask(ctx context.Context, req *schedulerv1.StatTaskRequest, // A peer announces that it has the announced task to other peers. func (c *client) AnnounceTask(ctx context.Context, req *schedulerv1.AnnounceTaskRequest, options ...grpc.CallOption) error { if _, err := c.SchedulerClient.AnnounceTask( - context.WithValue(ctx, balancer.ContextKey, req.TaskId), + context.WithValue(ctx, pkgbalancer.ContextKey, req.TaskId), req, options..., ); err != nil { diff --git a/pkg/rpc/scheduler/server/server.go b/pkg/rpc/scheduler/server/server.go new file mode 100644 index 000000000..f9ac3a91c --- /dev/null +++ b/pkg/rpc/scheduler/server/server.go @@ -0,0 +1,86 @@ +/* + * Copyright 2022 The Dragonfly Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package server + +import ( + "time" + + grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" + grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" + grpc_ratelimit "github.com/grpc-ecosystem/go-grpc-middleware/ratelimit" + grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery" + grpc_validator "github.com/grpc-ecosystem/go-grpc-middleware/validator" + grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" + "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" + "google.golang.org/grpc" + "google.golang.org/grpc/health" + healthpb "google.golang.org/grpc/health/grpc_health_v1" + "google.golang.org/grpc/keepalive" + + schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1" + + logger "d7y.io/dragonfly/v2/internal/dflog" + "d7y.io/dragonfly/v2/pkg/rpc" +) + +const ( + // DefaultQPS is default qps of grpc server. + DefaultQPS = 10 * 1000 + + // DefaultBurst is default burst of grpc server. + DefaultBurst = 20 * 1000 + + // DefaultMaxConnectionIdle is default timeout of connection idle state. + // If a client state is idle for DefaultMaxConnectionIdle, send a GOAWAY. + DefaultMaxConnectionIdle = 10 * time.Second +) + +// New returns a grpc server instance and register service on grpc server. +func New(svr schedulerv1.SchedulerServer, opts ...grpc.ServerOption) *grpc.Server { + limiter := rpc.NewRateLimiterInterceptor(DefaultQPS, DefaultBurst) + + grpcServer := grpc.NewServer(append([]grpc.ServerOption{ + grpc.KeepaliveParams(keepalive.ServerParameters{ + MaxConnectionIdle: DefaultMaxConnectionIdle, + }), + grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( + grpc_ratelimit.UnaryServerInterceptor(limiter), + rpc.ConvertErrorUnaryServerInterceptor, + otelgrpc.UnaryServerInterceptor(), + grpc_prometheus.UnaryServerInterceptor, + grpc_zap.UnaryServerInterceptor(logger.GrpcLogger.Desugar()), + grpc_validator.UnaryServerInterceptor(), + grpc_recovery.UnaryServerInterceptor(), + )), + grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( + grpc_ratelimit.StreamServerInterceptor(limiter), + rpc.ConvertErrorStreamServerInterceptor, + otelgrpc.StreamServerInterceptor(), + grpc_prometheus.StreamServerInterceptor, + grpc_zap.StreamServerInterceptor(logger.GrpcLogger.Desugar()), + grpc_validator.StreamServerInterceptor(), + grpc_recovery.StreamServerInterceptor(), + )), + }, opts...)...) + + // Register servers on grpc server. + schedulerv1.RegisterSchedulerServer(grpcServer, svr) + + // Register health on grpc server. + healthpb.RegisterHealthServer(grpcServer, health.NewServer()) + return grpcServer +} diff --git a/pkg/rpc/server.go b/pkg/rpc/server.go deleted file mode 100644 index be42e1d0e..000000000 --- a/pkg/rpc/server.go +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright 2020 The Dragonfly Authors - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package rpc - -import ( - "context" - "time" - - grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware" - grpc_zap "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap" - grpc_validator "github.com/grpc-ecosystem/go-grpc-middleware/validator" - grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" - "google.golang.org/grpc" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/keepalive" - "google.golang.org/grpc/status" - - commonv1 "d7y.io/api/pkg/apis/common/v1" - - "d7y.io/dragonfly/v2/internal/dferrors" - logger "d7y.io/dragonfly/v2/internal/dflog" - "d7y.io/dragonfly/v2/pkg/rpc/common" -) - -func DefaultServerOptions() []grpc.ServerOption { - return []grpc.ServerOption{ - grpc.ConnectionTimeout(10 * time.Second), - grpc.InitialConnWindowSize(8 * 1024 * 1024), - grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ - MinTime: 30 * time.Second, - }), - grpc.KeepaliveParams(keepalive.ServerParameters{ - MaxConnectionIdle: 5 * time.Minute, - }), - grpc.MaxConcurrentStreams(100), - grpc.StreamInterceptor(grpc_middleware.ChainStreamServer( - streamServerInterceptor, - grpc_prometheus.StreamServerInterceptor, - grpc_zap.StreamServerInterceptor(logger.GrpcLogger.Desugar()), - grpc_validator.StreamServerInterceptor(), - )), - grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer( - unaryServerInterceptor, - grpc_prometheus.UnaryServerInterceptor, - grpc_zap.UnaryServerInterceptor(logger.GrpcLogger.Desugar()), - grpc_validator.UnaryServerInterceptor(), - )), - } -} - -func streamServerInterceptor(srv any, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error { - err := handler(srv, ss) - if err != nil { - err = convertServerError(err) - logger.GrpcLogger.Errorf("do stream server error: %v for method: %s", err, info.FullMethod) - } - return err -} - -func unaryServerInterceptor(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { - m, err := handler(ctx, req) - if err != nil { - err = convertServerError(err) - logger.GrpcLogger.Errorf("do unary server error: %v for method: %s", err, info.FullMethod) - } - - return m, err -} - -func convertServerError(err error) error { - if status.Code(err) == codes.InvalidArgument { - err = dferrors.New(commonv1.Code_BadRequest, err.Error()) - } - if v, ok := err.(*dferrors.DfError); ok { - logger.GrpcLogger.Errorf(v.Message) - if s, e := status.Convert(err).WithDetails(common.NewGrpcDfError(v.Code, v.Message)); e == nil { - err = s.Err() - } - } - return err -} diff --git a/scheduler/resource/resource_test.go b/scheduler/resource/resource_test.go index aa666f0ab..c07d916d7 100644 --- a/scheduler/resource/resource_test.go +++ b/scheduler/resource/resource_test.go @@ -23,6 +23,7 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" + "google.golang.org/grpc/resolver" "d7y.io/dragonfly/v2/pkg/gc" "d7y.io/dragonfly/v2/scheduler/config" @@ -46,6 +47,8 @@ func TestResource_New(t *testing.T) { SeedPeers: []*config.SeedPeer{{ID: 1}}, }, nil).Times(1), dynconfig.Register(gomock.Any()).Return().Times(1), + dynconfig.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1), + dynconfig.Register(gomock.Any()).Return().Times(1), ) }, expect: func(t *testing.T, resource Resource, err error) { @@ -119,6 +122,8 @@ func TestResource_New(t *testing.T) { SeedPeers: []*config.SeedPeer{}, }, nil).Times(1), dynconfig.Register(gomock.Any()).Return().Times(1), + dynconfig.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1), + dynconfig.Register(gomock.Any()).Return().Times(1), ) }, expect: func(t *testing.T, resource Resource, err error) { diff --git a/scheduler/resource/seed_peer_client.go b/scheduler/resource/seed_peer_client.go index e234405c1..0d7700a26 100644 --- a/scheduler/resource/seed_peer_client.go +++ b/scheduler/resource/seed_peer_client.go @@ -62,7 +62,7 @@ func newSeedPeerClient(dynconfig config.DynconfigInterface, hostManager HostMana logger.Infof("initialize seed peer addresses: %#v", seedPeersToNetAddrs(config.SeedPeers)) // Initialize seed peer grpc client. - client, err := client.GetClient(opts...) + client, err := client.GetClient(dynconfig, opts...) if err != nil { return nil, err } diff --git a/scheduler/resource/seed_peer_client_test.go b/scheduler/resource/seed_peer_client_test.go index 2be14f262..0f45300b6 100644 --- a/scheduler/resource/seed_peer_client_test.go +++ b/scheduler/resource/seed_peer_client_test.go @@ -24,6 +24,7 @@ import ( gomock "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" + "google.golang.org/grpc/resolver" "d7y.io/dragonfly/v2/manager/model" "d7y.io/dragonfly/v2/manager/types" @@ -45,6 +46,8 @@ func TestSeedPeerClient_newSeedPeerClient(t *testing.T) { dynconfig.Get().Return(&config.DynconfigData{ SeedPeers: []*config.SeedPeer{{ID: 1}}, }, nil).Times(1), + dynconfig.Register(gomock.Any()).Return().Times(1), + dynconfig.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1), hostManager.Store(gomock.Any()).Return().Times(1), dynconfig.Register(gomock.Any()).Return().Times(1), ) @@ -72,6 +75,8 @@ func TestSeedPeerClient_newSeedPeerClient(t *testing.T) { SeedPeers: []*config.SeedPeer{}, }, nil).Times(1), dynconfig.Register(gomock.Any()).Return().Times(1), + dynconfig.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1), + dynconfig.Register(gomock.Any()).Return().Times(1), ) }, expect: func(t *testing.T, err error) { @@ -121,6 +126,8 @@ func TestSeedPeerClient_OnNotify(t *testing.T) { Port: 8080, }}, }, nil).Times(1), + dynconfig.Register(gomock.Any()).Return().Times(1), + dynconfig.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1), hostManager.Store(gomock.Any()).Return().Times(1), dynconfig.Register(gomock.Any()).Return().Times(1), ) @@ -145,6 +152,8 @@ func TestSeedPeerClient_OnNotify(t *testing.T) { IP: "127.0.0.1", }}, }, nil).Times(1), + dynconfig.Register(gomock.Any()).Return().Times(1), + dynconfig.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1), hostManager.Store(gomock.Any()).Return().Times(1), dynconfig.Register(gomock.Any()).Return().Times(1), hostManager.Load(gomock.Any()).Return(mockHost, true).Times(1), @@ -171,6 +180,8 @@ func TestSeedPeerClient_OnNotify(t *testing.T) { IP: "127.0.0.1", }}, }, nil).Times(1), + dynconfig.Register(gomock.Any()).Return().Times(1), + dynconfig.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1), hostManager.Store(gomock.Any()).Return().Times(1), dynconfig.Register(gomock.Any()).Return().Times(1), hostManager.Load(gomock.Any()).Return(nil, false).Times(1), @@ -194,6 +205,8 @@ func TestSeedPeerClient_OnNotify(t *testing.T) { IP: "127.0.0.1", }}, }, nil).Times(1), + dynconfig.Register(gomock.Any()).Return().Times(1), + dynconfig.GetResolveSeedPeerAddrs().Return([]resolver.Address{}, nil).Times(1), hostManager.Store(gomock.Any()).Return().Times(1), dynconfig.Register(gomock.Any()).Return().Times(1), ) diff --git a/scheduler/rpcserver/rpcserver.go b/scheduler/rpcserver/rpcserver.go index 1661c3247..267d4805b 100644 --- a/scheduler/rpcserver/rpcserver.go +++ b/scheduler/rpcserver/rpcserver.go @@ -20,14 +20,12 @@ import ( "context" "google.golang.org/grpc" - "google.golang.org/grpc/health" - healthpb "google.golang.org/grpc/health/grpc_health_v1" empty "google.golang.org/protobuf/types/known/emptypb" schedulerv1 "d7y.io/api/pkg/apis/scheduler/v1" "d7y.io/dragonfly/v2/pkg/idgen" - "d7y.io/dragonfly/v2/pkg/rpc" + "d7y.io/dragonfly/v2/pkg/rpc/scheduler/server" "d7y.io/dragonfly/v2/scheduler/metrics" "d7y.io/dragonfly/v2/scheduler/resource" "d7y.io/dragonfly/v2/scheduler/service" @@ -37,20 +35,11 @@ import ( type Server struct { // Service interface. service *service.Service - - // GRPC UnimplementedSchedulerServer interface. - schedulerv1.UnimplementedSchedulerServer } // New returns a new transparent scheduler server from the given options. func New(service *service.Service, opts ...grpc.ServerOption) *grpc.Server { - svr := &Server{service: service} - grpcServer := grpc.NewServer(append(rpc.DefaultServerOptions(), opts...)...) - - // Register servers on grpc server. - schedulerv1.RegisterSchedulerServer(grpcServer, svr) - healthpb.RegisterHealthServer(grpcServer, health.NewServer()) - return grpcServer + return server.New(&Server{service: service}, opts...) } // RegisterPeerTask registers peer and triggers seed peer download task. diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 305e5fc06..ad9098d47 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -23,18 +23,13 @@ import ( "net/http" "time" - "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" - "google.golang.org/grpc/balancer" managerv1 "d7y.io/api/pkg/apis/manager/v1" logger "d7y.io/dragonfly/v2/internal/dflog" - pkgbalancer "d7y.io/dragonfly/v2/pkg/balancer" "d7y.io/dragonfly/v2/pkg/dfpath" "d7y.io/dragonfly/v2/pkg/gc" - "d7y.io/dragonfly/v2/pkg/resolver" - "d7y.io/dragonfly/v2/pkg/rpc" managerclient "d7y.io/dragonfly/v2/pkg/rpc/manager/client" "d7y.io/dragonfly/v2/scheduler/config" "d7y.io/dragonfly/v2/scheduler/job" @@ -82,15 +77,7 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err s := &Server{config: cfg} // Initialize manager client. - var managerClientOptions []grpc.DialOption - if s.config.Options.Telemetry.Jaeger != "" { - managerClientOptions = append(managerClientOptions, - grpc.WithChainUnaryInterceptor(otelgrpc.UnaryClientInterceptor()), - grpc.WithChainStreamInterceptor(otelgrpc.StreamClientInterceptor()), - ) - } - - managerClient, err := managerclient.GetClient(cfg.Manager.Addr, managerClientOptions...) + managerClient, err := managerclient.GetClient(cfg.Manager.Addr) if err != nil { return nil, err } @@ -116,30 +103,11 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err } s.dynconfig = dynconfig - // register resolver and balancer - resolver.RegisterSeedPeer(dynconfig) - balancer.Register(pkgbalancer.NewConsistentHashingBuilder()) - // Initialize GC. s.gc = gc.New(gc.WithLogger(logger.GCLogger)) // Initialize resource. - var seedPeerDialOptions []grpc.DialOption - if s.config.Options.Telemetry.Jaeger != "" { - seedPeerDialOptions = append( - seedPeerDialOptions, - grpc.WithChainUnaryInterceptor( - otelgrpc.UnaryClientInterceptor(), - rpc.RefresherUnaryClientInterceptor(dynconfig), - ), - grpc.WithChainStreamInterceptor( - otelgrpc.StreamClientInterceptor(), - rpc.RefresherStreamClientInterceptor(dynconfig), - ), - ) - } - - resource, err := resource.New(cfg, s.gc, dynconfig, seedPeerDialOptions...) + resource, err := resource.New(cfg, s.gc, dynconfig) if err != nil { return nil, err } @@ -158,16 +126,7 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err service := service.New(cfg, resource, scheduler, dynconfig, s.storage) // Initialize grpc service. - var schedulerServerOptions []grpc.ServerOption - if s.config.Options.Telemetry.Jaeger != "" { - schedulerServerOptions = append( - schedulerServerOptions, - grpc.ChainUnaryInterceptor(otelgrpc.UnaryServerInterceptor()), - grpc.ChainStreamInterceptor(otelgrpc.StreamServerInterceptor()), - ) - } - - svr := rpcserver.New(service, schedulerServerOptions...) + svr := rpcserver.New(service) s.grpcServer = svr // Initialize job service.