From cfd74830c9246d21f705019272b6defe979ecca0 Mon Sep 17 00:00:00 2001 From: "Alessandro (Ale) Segala" <43508+ItalyPaleAle@users.noreply.github.com> Date: Fri, 30 Sep 2022 15:56:05 -0700 Subject: [PATCH] Expose underlying gRPC client and server objects (#311) * Expose underlying gRPC client and server objects Fixes #204 Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> * Fixed reported race conditions Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> * Support for Go 1.17 Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com> --- client/client.go | 9 +++++--- service/grpc/service.go | 47 ++++++++++++++++++++++++++++++----------- 2 files changed, 41 insertions(+), 15 deletions(-) diff --git a/client/client.go b/client/client.go index 6de53cb..0b5b23d 100644 --- a/client/client.go +++ b/client/client.go @@ -300,7 +300,6 @@ type GRPCClient struct { ctxCancelFunc context.CancelFunc protoClient pb.DaprClient authToken string - mux sync.Mutex } // Close cleans up all resources created by the client. @@ -308,15 +307,14 @@ func (c *GRPCClient) Close() { c.ctxCancelFunc() if c.connection != nil { c.connection.Close() + c.connection = nil } } // WithAuthToken sets Dapr API token on the instantiated client. // Allows empty string to reset token on existing client. func (c *GRPCClient) WithAuthToken(token string) { - c.mux.Lock() c.authToken = token - c.mux.Unlock() } // WithTraceID adds existing trace ID to the outgoing context. @@ -349,3 +347,8 @@ func (c *GRPCClient) Shutdown(ctx context.Context) error { func (c *GRPCClient) GrpcClient() pb.DaprClient { return c.protoClient } + +// GrpcClientConn returns the grpc.ClientConn object used by this client. +func (c *GRPCClient) GrpcClientConn() *grpc.ClientConn { + return c.connection +} diff --git a/service/grpc/service.go b/service/grpc/service.go index f879eb9..8b101f4 100644 --- a/service/grpc/service.go +++ b/service/grpc/service.go @@ -16,6 +16,7 @@ package grpc import ( "net" "os" + "sync/atomic" "github.com/pkg/errors" "google.golang.org/grpc" @@ -48,13 +49,20 @@ func NewServiceWithListener(lis net.Listener) common.Service { } func newService(lis net.Listener) *Server { - return &Server{ + s := &Server{ listener: lis, invokeHandlers: make(map[string]common.ServiceInvocationHandler), topicRegistrar: make(internal.TopicRegistrar), bindingHandlers: make(map[string]common.BindingInvocationHandler), authToken: os.Getenv(common.AppAPITokenEnvVar), } + + gs := grpc.NewServer() + pb.RegisterAppCallbackServer(gs, s) + pb.RegisterAppCallbackHealthCheckServer(gs, s) + s.grpcServer = gs + + return s } // Server is the gRPC service implementation for Dapr. @@ -68,6 +76,7 @@ type Server struct { healthCheckHandler common.HealthCheckHandler authToken string grpcServer *grpc.Server + started uint32 } func (s *Server) RegisterActorImplFactory(f actor.Factory, opts ...config.Option) { @@ -76,19 +85,33 @@ func (s *Server) RegisterActorImplFactory(f actor.Factory, opts ...config.Option // Start registers the server and starts it. func (s *Server) Start() error { - gs := grpc.NewServer() - pb.RegisterAppCallbackServer(gs, s) - pb.RegisterAppCallbackHealthCheckServer(gs, s) - s.grpcServer = gs - return gs.Serve(s.listener) + if !atomic.CompareAndSwapUint32(&s.started, 0, 1) { + return errors.New("a gRPC server can only be started once") + } + return s.grpcServer.Serve(s.listener) } -// Stop stops the previously started service. +// Stop stops the previously-started service. func (s *Server) Stop() error { - return s.listener.Close() -} - -func (s *Server) GracefulStop() error { - s.grpcServer.GracefulStop() + if atomic.LoadUint32(&s.started) == 0 { + return nil + } + s.grpcServer.Stop() + s.grpcServer = nil return nil } + +// GrecefulStop stops the previously-started service gracefully. +func (s *Server) GracefulStop() error { + if atomic.LoadUint32(&s.started) == 0 { + return nil + } + s.grpcServer.GracefulStop() + s.grpcServer = nil + return nil +} + +// GrpcServer returns the grpc.Server object managed by the server. +func (s *Server) GrpcServer() *grpc.Server { + return s.grpcServer +}