feat: enhance daemon health check (#2130)

Signed-off-by: Jim Ma <majinjing3@gmail.com>
This commit is contained in:
Jim Ma 2023-03-02 12:13:42 +08:00 committed by Gaius
parent c672e75ed8
commit f5aff5e27f
No known key found for this signature in database
GPG Key ID: 8B4E5D1290FA2FFB
9 changed files with 52 additions and 15 deletions

View File

@ -310,6 +310,8 @@ func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
// register notify for health check
dynconfig.Register(rpcManager)
proxyManager, err := proxy.NewProxyManager(host, peerTaskManager, opt.Proxy) proxyManager, err := proxy.NewProxyManager(host, peerTaskManager, opt.Proxy)
if err != nil { if err != nil {
@ -722,6 +724,7 @@ func (cd *clientDaemon) Serve() error {
return nil return nil
}) })
// when there is no manager configured, watch schedulers in local config
if cd.managerClient == nil { if cd.managerClient == nil {
watchers = append(watchers, cd.dynconfig.OnNotify) watchers = append(watchers, cd.dynconfig.OnNotify)
} }

View File

@ -41,6 +41,7 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/health"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
commonv1 "d7y.io/api/pkg/apis/common/v1" commonv1 "d7y.io/api/pkg/apis/common/v1"
@ -162,7 +163,8 @@ func setupPeerTaskManagerComponents(ctrl *gomock.Controller, opt componentsOptio
}) })
go func() { go func() {
if err := daemonserver.New(daemon).Serve(ln); err != nil { hs := health.NewServer()
if err := daemonserver.New(daemon, hs).Serve(ln); err != nil {
panic(err) panic(err)
} }
}() }()

View File

@ -36,6 +36,7 @@ import (
"go.uber.org/atomic" "go.uber.org/atomic"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/health"
commonv1 "d7y.io/api/pkg/apis/common/v1" commonv1 "d7y.io/api/pkg/apis/common/v1"
dfdaemonv1 "d7y.io/api/pkg/apis/dfdaemon/v1" dfdaemonv1 "d7y.io/api/pkg/apis/dfdaemon/v1"
@ -142,7 +143,8 @@ func setupBackSourcePartialComponents(ctrl *gomock.Controller, testBytes []byte,
Addr: fmt.Sprintf("0.0.0.0:%d", port), Addr: fmt.Sprintf("0.0.0.0:%d", port),
}) })
go func(daemon *dfdaemonv1mocks.MockDaemonServer, ln net.Listener) { go func(daemon *dfdaemonv1mocks.MockDaemonServer, ln net.Listener) {
if err := daemonserver.New(daemon).Serve(ln); err != nil { hs := health.NewServer()
if err := daemonserver.New(daemon, hs).Serve(ln); err != nil {
log.Fatal(err) log.Fatal(err)
} }
}(daemon, ln) }(daemon, ln)

View File

@ -36,6 +36,7 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/health"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
commonv1 "d7y.io/api/pkg/apis/common/v1" commonv1 "d7y.io/api/pkg/apis/common/v1"
@ -163,7 +164,8 @@ func trafficShaperSetupPeerTaskManagerComponents(ctrl *gomock.Controller, opt tr
}) })
go func() { go func() {
if err := daemonserver.New(daemon).Serve(ln); err != nil { hs := health.NewServer()
if err := daemonserver.New(daemon, hs).Serve(ln); err != nil {
panic(err) panic(err)
} }
}() }()

View File

@ -43,6 +43,8 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
grpcpeer "google.golang.org/grpc/peer" grpcpeer "google.golang.org/grpc/peer"
"google.golang.org/grpc/status" "google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb" "google.golang.org/protobuf/types/known/emptypb"
@ -71,6 +73,7 @@ type Server interface {
util.KeepAlive util.KeepAlive
ServeDownload(listener net.Listener) error ServeDownload(listener net.Listener) error
ServePeer(listener net.Listener) error ServePeer(listener net.Listener) error
OnNotify(*config.DynconfigData)
Stop() Stop()
} }
@ -80,6 +83,7 @@ type server struct {
peerTaskManager peer.TaskManager peerTaskManager peer.TaskManager
storageManager storage.Manager storageManager storage.Manager
healthServer *health.Server
downloadServer *grpc.Server downloadServer *grpc.Server
peerServer *grpc.Server peerServer *grpc.Server
uploadAddr string uploadAddr string
@ -105,14 +109,19 @@ func New(peerHost *schedulerv1.PeerHost, peerTaskManager peer.TaskManager,
recursiveConcurrent: recursiveConcurrent, recursiveConcurrent: recursiveConcurrent,
cacheRecursiveMetadata: cacheRecursiveMetadata, cacheRecursiveMetadata: cacheRecursiveMetadata,
healthServer: health.NewServer(),
} }
sd := &seeder{ sd := &seeder{
server: s, server: s,
} }
s.downloadServer = dfdaemonserver.New(s, downloadOpts...) // set not serving by default
s.peerServer = dfdaemonserver.New(s, peerOpts...) s.healthServer.SetServingStatus("", healthpb.HealthCheckResponse_NOT_SERVING)
s.downloadServer = dfdaemonserver.New(s, s.healthServer, downloadOpts...)
s.peerServer = dfdaemonserver.New(s, s.healthServer, peerOpts...)
cdnsystemv1.RegisterSeederServer(s.peerServer, sd) cdnsystemv1.RegisterSeederServer(s.peerServer, sd)
return s, nil return s, nil
} }
@ -126,6 +135,14 @@ func (s *server) ServePeer(listener net.Listener) error {
return s.peerServer.Serve(listener) return s.peerServer.Serve(listener)
} }
func (s *server) OnNotify(data *config.DynconfigData) {
if len(data.Schedulers) > 0 {
s.healthServer.SetServingStatus("", healthpb.HealthCheckResponse_SERVING)
} else {
s.healthServer.SetServingStatus("", healthpb.HealthCheckResponse_NOT_SERVING)
}
}
func (s *server) Stop() { func (s *server) Stop() {
s.peerServer.GracefulStop() s.peerServer.GracefulStop()
s.downloadServer.GracefulStop() s.downloadServer.GracefulStop()

View File

@ -29,6 +29,7 @@ import (
"github.com/phayes/freeport" "github.com/phayes/freeport"
testifyassert "github.com/stretchr/testify/assert" testifyassert "github.com/stretchr/testify/assert"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/health"
commonv1 "d7y.io/api/pkg/apis/common/v1" commonv1 "d7y.io/api/pkg/apis/common/v1"
dfdaemonv1 "d7y.io/api/pkg/apis/dfdaemon/v1" dfdaemonv1 "d7y.io/api/pkg/apis/dfdaemon/v1"
@ -673,13 +674,12 @@ func TestServer_ServeDownload(t *testing.T) {
}() }()
return ch, false, nil return ch, false, nil
}) })
m := &server{ s := &server{
KeepAlive: util.NewKeepAlive("test"), KeepAlive: util.NewKeepAlive("test"),
peerHost: &schedulerv1.PeerHost{}, peerHost: &schedulerv1.PeerHost{},
peerTaskManager: mockPeerTaskManager, peerTaskManager: mockPeerTaskManager,
} }
m.downloadServer = dfdaemonserver.New(m) client := setupPeerServerAndClient(t, s, assert, s.ServeDownload)
client := setupPeerServerAndClient(t, m, assert, m.ServeDownload)
request := &dfdaemonv1.DownRequest{ request := &dfdaemonv1.DownRequest{
Uuid: uuid.Generate().String(), Uuid: uuid.Generate().String(),
Url: "http://localhost/test", Url: "http://localhost/test",
@ -745,7 +745,6 @@ func TestServer_ServePeer(t *testing.T) {
peerHost: &schedulerv1.PeerHost{}, peerHost: &schedulerv1.PeerHost{},
storageManager: mockStorageManger, storageManager: mockStorageManger,
} }
s.peerServer = dfdaemonserver.New(s)
client := setupPeerServerAndClient(t, s, assert, s.ServePeer) client := setupPeerServerAndClient(t, s, assert, s.ServePeer)
defer s.peerServer.GracefulStop() defer s.peerServer.GracefulStop()
@ -1143,7 +1142,11 @@ func TestServer_SyncPieceTasks(t *testing.T) {
} }
func setupPeerServerAndClient(t *testing.T, srv *server, assert *testifyassert.Assertions, serveFunc func(listener net.Listener) error) dfdaemonclient.V1 { func setupPeerServerAndClient(t *testing.T, srv *server, assert *testifyassert.Assertions, serveFunc func(listener net.Listener) error) dfdaemonclient.V1 {
srv.peerServer = dfdaemonserver.New(srv) if srv.healthServer == nil {
srv.healthServer = health.NewServer()
}
srv.downloadServer = dfdaemonserver.New(srv, srv.healthServer)
srv.peerServer = dfdaemonserver.New(srv, srv.healthServer)
port, err := freeport.GetFreePort() port, err := freeport.GetFreePort()
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)

View File

@ -31,6 +31,7 @@ import (
"go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/health"
cdnsystemv1 "d7y.io/api/pkg/apis/cdnsystem/v1" cdnsystemv1 "d7y.io/api/pkg/apis/cdnsystem/v1"
commonv1 "d7y.io/api/pkg/apis/common/v1" commonv1 "d7y.io/api/pkg/apis/common/v1"
@ -365,7 +366,11 @@ func Test_ObtainSeeds(t *testing.T) {
} }
func setupSeederServerAndClient(t *testing.T, srv *server, sd *seeder, assert *testifyassert.Assertions, serveFunc func(listener net.Listener) error) (int, client.Client) { func setupSeederServerAndClient(t *testing.T, srv *server, sd *seeder, assert *testifyassert.Assertions, serveFunc func(listener net.Listener) error) (int, client.Client) {
srv.peerServer = dfdaemonserver.New(srv) if srv.healthServer == nil {
srv.healthServer = health.NewServer()
}
srv.peerServer = dfdaemonserver.New(srv, srv.healthServer)
cdnsystemv1.RegisterSeederServer(srv.peerServer, sd) cdnsystemv1.RegisterSeederServer(srv.peerServer, sd)
port, err := freeport.GetFreePort() port, err := freeport.GetFreePort()

View File

@ -44,7 +44,7 @@ type SchedulerResolver struct {
dynconfig config.Dynconfig dynconfig config.Dynconfig
} }
// SchedulerRegister register the dragonfly resovler builder to the grpc with custom schema. // RegisterScheduler registers the dragonfly resolver builder to the grpc with custom schema.
func RegisterScheduler(dynconfig config.Dynconfig) { func RegisterScheduler(dynconfig config.Dynconfig) {
resolver.Register(&SchedulerResolver{dynconfig: dynconfig}) resolver.Register(&SchedulerResolver{dynconfig: dynconfig})
} }

View File

@ -27,7 +27,6 @@ import (
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus" grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1" healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/keepalive" "google.golang.org/grpc/keepalive"
"google.golang.org/grpc/reflection" "google.golang.org/grpc/reflection"
@ -55,8 +54,12 @@ const (
DefaultMaxConnectionAgeGrace = 5 * time.Minute DefaultMaxConnectionAgeGrace = 5 * time.Minute
) )
type DaemonServer interface {
dfdaemonv1.DaemonServer
}
// New returns a grpc server instance and register service on grpc server. // New returns a grpc server instance and register service on grpc server.
func New(svr dfdaemonv1.DaemonServer, opts ...grpc.ServerOption) *grpc.Server { func New(svr dfdaemonv1.DaemonServer, healthServer healthpb.HealthServer, opts ...grpc.ServerOption) *grpc.Server {
limiter := rpc.NewRateLimiterInterceptor(DefaultQPS, DefaultBurst) limiter := rpc.NewRateLimiterInterceptor(DefaultQPS, DefaultBurst)
grpcServer := grpc.NewServer(append([]grpc.ServerOption{ grpcServer := grpc.NewServer(append([]grpc.ServerOption{
@ -89,7 +92,7 @@ func New(svr dfdaemonv1.DaemonServer, opts ...grpc.ServerOption) *grpc.Server {
dfdaemonv1.RegisterDaemonServer(grpcServer, svr) dfdaemonv1.RegisterDaemonServer(grpcServer, svr)
// Register health on grpc server. // Register health on grpc server.
healthpb.RegisterHealthServer(grpcServer, health.NewServer()) healthpb.RegisterHealthServer(grpcServer, healthServer)
// Register reflection on grpc server. // Register reflection on grpc server.
reflection.Register(grpcServer) reflection.Register(grpcServer)