feat: add grpc health interface (#1195)

* feat: add grpc health interface

Signed-off-by: Gaius <gaius.qi@gmail.com>

* feat: add dfdaemon upload server healthy interface

Signed-off-by: Gaius <gaius.qi@gmail.com>
This commit is contained in:
Gaius 2022-03-25 17:45:04 +08:00
parent d45bffea40
commit 2bcdba6401
No known key found for this signature in database
GPG Key ID: 8B4E5D1290FA2FFB
6 changed files with 30 additions and 1 deletions

View File

@ -26,6 +26,8 @@ import (
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"
"d7y.io/dragonfly/v2/client/clientutil"
@ -66,8 +68,12 @@ func New(peerHost *scheduler.PeerHost, peerTaskManager peer.TaskManager, storage
peerTaskManager: peerTaskManager,
storageManager: storageManager,
}
svr.downloadServer = dfdaemonserver.New(svr, downloadOpts...)
healthpb.RegisterHealthServer(svr.downloadServer, health.NewServer())
svr.peerServer = dfdaemonserver.New(svr, peerOpts...)
healthpb.RegisterHealthServer(svr.peerServer, health.NewServer())
return svr, nil
}

View File

@ -73,6 +73,10 @@ func WithLimiter(limiter *rate.Limiter) func(*uploadManager) {
func (um *uploadManager) initRouter() {
r := mux.NewRouter()
// Health Check
r.HandleFunc("/healthy", um.handleHealth).Methods("GET")
// Peer download task
r.HandleFunc(PeerDownloadHTTPPathPrefix+"{taskPrefix:.*}/"+"{task:.*}", um.handleUpload).Queries("peerId", "{.*}").Methods("GET")
um.Server.Handler = r
}
@ -85,6 +89,11 @@ func (um *uploadManager) Stop() error {
return um.Server.Shutdown(context.Background())
}
// handleHealth uses to check server health.
func (um *uploadManager) handleHealth(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusOK)
}
// handleUpload uses to upload a task file when other peers download from it.
func (um *uploadManager) handleUpload(w http.ResponseWriter, r *http.Request) {
var (

View File

@ -218,7 +218,7 @@ func Init(cfg *config.Config, logDir string, service service.Service, enforcer *
pv1.GET(":id", h.GetV1Preheat)
// Health Check
r.GET("/healthy/*action", h.GetHealth)
r.GET("/healthy", h.GetHealth)
// Swagger
apiSeagger := ginSwagger.URL("/swagger/doc.json")

View File

@ -30,6 +30,8 @@ import (
grpc_prometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"
"gorm.io/gorm"
@ -76,7 +78,9 @@ func New(database *database.Database, cache *cache.Cache, searcher searcher.Sear
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(defaultUnaryMiddleWares...)),
}, opts...)...)
// Register servers on grpc server
manager.RegisterManagerServer(grpcServer, server)
healthpb.RegisterHealthServer(grpcServer, health.NewServer())
return grpcServer
}

View File

@ -23,6 +23,8 @@ import (
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"d7y.io/dragonfly/v2/cdn/metrics"
"d7y.io/dragonfly/v2/internal/dferrors"
@ -52,7 +54,10 @@ type proxy struct {
func New(seederServer SeederServer, opts ...grpc.ServerOption) *grpc.Server {
grpcServer := grpc.NewServer(append(rpc.DefaultServerOptions, opts...)...)
// Register servers on grpc server
cdnsystem.RegisterSeederServer(grpcServer, &proxy{server: seederServer})
healthpb.RegisterHealthServer(grpcServer, health.NewServer())
return grpcServer
}

View File

@ -20,6 +20,8 @@ import (
"context"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
empty "google.golang.org/protobuf/types/known/emptypb"
"d7y.io/dragonfly/v2/pkg/rpc"
@ -42,7 +44,10 @@ type Server struct {
func New(service *service.Service, opts ...grpc.ServerOption) *grpc.Server {
svr := &Server{service: service}
grpcServer := grpc.NewServer(append(rpc.DefaultServerOptions, opts...)...)
// Register servers on grpc server
scheduler.RegisterSchedulerServer(grpcServer, svr)
healthpb.RegisterHealthServer(grpcServer, health.NewServer())
return grpcServer
}