diff --git a/client/daemon/daemon.go b/client/daemon/daemon.go index f44fc808d..34da647c0 100644 --- a/client/daemon/daemon.go +++ b/client/daemon/daemon.go @@ -310,6 +310,8 @@ func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) { if err != nil { return nil, err } + // register notify for health check + dynconfig.Register(rpcManager) proxyManager, err := proxy.NewProxyManager(host, peerTaskManager, opt.Proxy) if err != nil { @@ -722,6 +724,7 @@ func (cd *clientDaemon) Serve() error { return nil }) + // when there is no manager configured, watch schedulers in local config if cd.managerClient == nil { watchers = append(watchers, cd.dynconfig.OnNotify) } diff --git a/client/daemon/peer/peertask_manager_test.go b/client/daemon/peer/peertask_manager_test.go index 6c7b47a3a..54cfc5a05 100644 --- a/client/daemon/peer/peertask_manager_test.go +++ b/client/daemon/peer/peertask_manager_test.go @@ -41,6 +41,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/health" "google.golang.org/grpc/status" commonv1 "d7y.io/api/pkg/apis/common/v1" @@ -162,7 +163,8 @@ func setupPeerTaskManagerComponents(ctrl *gomock.Controller, opt componentsOptio }) go func() { - if err := daemonserver.New(daemon).Serve(ln); err != nil { + hs := health.NewServer() + if err := daemonserver.New(daemon, hs).Serve(ln); err != nil { panic(err) } }() diff --git a/client/daemon/peer/peertask_stream_backsource_partial_test.go b/client/daemon/peer/peertask_stream_backsource_partial_test.go index c069728ae..dc886375b 100644 --- a/client/daemon/peer/peertask_stream_backsource_partial_test.go +++ b/client/daemon/peer/peertask_stream_backsource_partial_test.go @@ -36,6 +36,7 @@ import ( "go.uber.org/atomic" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/health" commonv1 "d7y.io/api/pkg/apis/common/v1" dfdaemonv1 "d7y.io/api/pkg/apis/dfdaemon/v1" @@ -142,7 +143,8 @@ func setupBackSourcePartialComponents(ctrl *gomock.Controller, testBytes []byte, Addr: fmt.Sprintf("0.0.0.0:%d", port), }) go func(daemon *dfdaemonv1mocks.MockDaemonServer, ln net.Listener) { - if err := daemonserver.New(daemon).Serve(ln); err != nil { + hs := health.NewServer() + if err := daemonserver.New(daemon, hs).Serve(ln); err != nil { log.Fatal(err) } }(daemon, ln) diff --git a/client/daemon/peer/traffic_shaper_test.go b/client/daemon/peer/traffic_shaper_test.go index acd3b01ee..0e8d62365 100644 --- a/client/daemon/peer/traffic_shaper_test.go +++ b/client/daemon/peer/traffic_shaper_test.go @@ -36,6 +36,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/health" "google.golang.org/grpc/status" commonv1 "d7y.io/api/pkg/apis/common/v1" @@ -163,7 +164,8 @@ func trafficShaperSetupPeerTaskManagerComponents(ctrl *gomock.Controller, opt tr }) go func() { - if err := daemonserver.New(daemon).Serve(ln); err != nil { + hs := health.NewServer() + if err := daemonserver.New(daemon, hs).Serve(ln); err != nil { panic(err) } }() diff --git a/client/daemon/rpcserver/rpcserver.go b/client/daemon/rpcserver/rpcserver.go index b852ab09b..48e3a20a7 100644 --- a/client/daemon/rpcserver/rpcserver.go +++ b/client/daemon/rpcserver/rpcserver.go @@ -43,6 +43,8 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/health" + healthpb "google.golang.org/grpc/health/grpc_health_v1" grpcpeer "google.golang.org/grpc/peer" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" @@ -71,6 +73,7 @@ type Server interface { util.KeepAlive ServeDownload(listener net.Listener) error ServePeer(listener net.Listener) error + OnNotify(*config.DynconfigData) Stop() } @@ -80,6 +83,7 @@ type server struct { peerTaskManager peer.TaskManager storageManager storage.Manager + healthServer *health.Server downloadServer *grpc.Server peerServer *grpc.Server uploadAddr string @@ -105,14 +109,19 @@ func New(peerHost *schedulerv1.PeerHost, peerTaskManager peer.TaskManager, recursiveConcurrent: recursiveConcurrent, cacheRecursiveMetadata: cacheRecursiveMetadata, + + healthServer: health.NewServer(), } sd := &seeder{ server: s, } - s.downloadServer = dfdaemonserver.New(s, downloadOpts...) - s.peerServer = dfdaemonserver.New(s, peerOpts...) + // set not serving by default + s.healthServer.SetServingStatus("", healthpb.HealthCheckResponse_NOT_SERVING) + + s.downloadServer = dfdaemonserver.New(s, s.healthServer, downloadOpts...) + s.peerServer = dfdaemonserver.New(s, s.healthServer, peerOpts...) cdnsystemv1.RegisterSeederServer(s.peerServer, sd) return s, nil } @@ -126,6 +135,14 @@ func (s *server) ServePeer(listener net.Listener) error { return s.peerServer.Serve(listener) } +func (s *server) OnNotify(data *config.DynconfigData) { + if len(data.Schedulers) > 0 { + s.healthServer.SetServingStatus("", healthpb.HealthCheckResponse_SERVING) + } else { + s.healthServer.SetServingStatus("", healthpb.HealthCheckResponse_NOT_SERVING) + } +} + func (s *server) Stop() { s.peerServer.GracefulStop() s.downloadServer.GracefulStop() diff --git a/client/daemon/rpcserver/rpcserver_test.go b/client/daemon/rpcserver/rpcserver_test.go index 32f0c0003..a0df64727 100644 --- a/client/daemon/rpcserver/rpcserver_test.go +++ b/client/daemon/rpcserver/rpcserver_test.go @@ -29,6 +29,7 @@ import ( "github.com/phayes/freeport" testifyassert "github.com/stretchr/testify/assert" "google.golang.org/grpc" + "google.golang.org/grpc/health" commonv1 "d7y.io/api/pkg/apis/common/v1" dfdaemonv1 "d7y.io/api/pkg/apis/dfdaemon/v1" @@ -673,13 +674,12 @@ func TestServer_ServeDownload(t *testing.T) { }() return ch, false, nil }) - m := &server{ + s := &server{ KeepAlive: util.NewKeepAlive("test"), peerHost: &schedulerv1.PeerHost{}, peerTaskManager: mockPeerTaskManager, } - m.downloadServer = dfdaemonserver.New(m) - client := setupPeerServerAndClient(t, m, assert, m.ServeDownload) + client := setupPeerServerAndClient(t, s, assert, s.ServeDownload) request := &dfdaemonv1.DownRequest{ Uuid: uuid.Generate().String(), Url: "http://localhost/test", @@ -745,7 +745,6 @@ func TestServer_ServePeer(t *testing.T) { peerHost: &schedulerv1.PeerHost{}, storageManager: mockStorageManger, } - s.peerServer = dfdaemonserver.New(s) client := setupPeerServerAndClient(t, s, assert, s.ServePeer) defer s.peerServer.GracefulStop() @@ -1143,7 +1142,11 @@ func TestServer_SyncPieceTasks(t *testing.T) { } func setupPeerServerAndClient(t *testing.T, srv *server, assert *testifyassert.Assertions, serveFunc func(listener net.Listener) error) dfdaemonclient.V1 { - srv.peerServer = dfdaemonserver.New(srv) + if srv.healthServer == nil { + srv.healthServer = health.NewServer() + } + srv.downloadServer = dfdaemonserver.New(srv, srv.healthServer) + srv.peerServer = dfdaemonserver.New(srv, srv.healthServer) port, err := freeport.GetFreePort() if err != nil { t.Fatal(err) diff --git a/client/daemon/rpcserver/seeder_test.go b/client/daemon/rpcserver/seeder_test.go index ab03ee8f2..cc26db97c 100644 --- a/client/daemon/rpcserver/seeder_test.go +++ b/client/daemon/rpcserver/seeder_test.go @@ -31,6 +31,7 @@ import ( "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/health" cdnsystemv1 "d7y.io/api/pkg/apis/cdnsystem/v1" commonv1 "d7y.io/api/pkg/apis/common/v1" @@ -365,7 +366,11 @@ func Test_ObtainSeeds(t *testing.T) { } func setupSeederServerAndClient(t *testing.T, srv *server, sd *seeder, assert *testifyassert.Assertions, serveFunc func(listener net.Listener) error) (int, client.Client) { - srv.peerServer = dfdaemonserver.New(srv) + if srv.healthServer == nil { + srv.healthServer = health.NewServer() + } + + srv.peerServer = dfdaemonserver.New(srv, srv.healthServer) cdnsystemv1.RegisterSeederServer(srv.peerServer, sd) port, err := freeport.GetFreePort() diff --git a/pkg/resolver/scheduler_resolver.go b/pkg/resolver/scheduler_resolver.go index db3e511ff..de4f92cb8 100644 --- a/pkg/resolver/scheduler_resolver.go +++ b/pkg/resolver/scheduler_resolver.go @@ -44,7 +44,7 @@ type SchedulerResolver struct { dynconfig config.Dynconfig } -// SchedulerRegister register the dragonfly resovler builder to the grpc with custom schema. +// RegisterScheduler registers the dragonfly resolver builder to the grpc with custom schema. func RegisterScheduler(dynconfig config.Dynconfig) { resolver.Register(&SchedulerResolver{dynconfig: dynconfig}) } diff --git a/pkg/rpc/dfdaemon/server/server.go b/pkg/rpc/dfdaemon/server/server.go index aff72ab45..56886bba6 100644 --- a/pkg/rpc/dfdaemon/server/server.go +++ b/pkg/rpc/dfdaemon/server/server.go @@ -27,7 +27,6 @@ import ( grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" - "google.golang.org/grpc/health" healthpb "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/reflection" @@ -55,8 +54,12 @@ const ( DefaultMaxConnectionAgeGrace = 5 * time.Minute ) +type DaemonServer interface { + dfdaemonv1.DaemonServer +} + // New returns a grpc server instance and register service on grpc server. -func New(svr dfdaemonv1.DaemonServer, opts ...grpc.ServerOption) *grpc.Server { +func New(svr dfdaemonv1.DaemonServer, healthServer healthpb.HealthServer, opts ...grpc.ServerOption) *grpc.Server { limiter := rpc.NewRateLimiterInterceptor(DefaultQPS, DefaultBurst) grpcServer := grpc.NewServer(append([]grpc.ServerOption{ @@ -89,7 +92,7 @@ func New(svr dfdaemonv1.DaemonServer, opts ...grpc.ServerOption) *grpc.Server { dfdaemonv1.RegisterDaemonServer(grpcServer, svr) // Register health on grpc server. - healthpb.RegisterHealthServer(grpcServer, health.NewServer()) + healthpb.RegisterHealthServer(grpcServer, healthServer) // Register reflection on grpc server. reflection.Register(grpcServer)