Expose underlying gRPC client and server objects (#311)

* Expose underlying gRPC client and server objects

Fixes #204

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Fixed reported race conditions

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

* Support for Go 1.17

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>

Signed-off-by: ItalyPaleAle <43508+ItalyPaleAle@users.noreply.github.com>
This commit is contained in:
Alessandro (Ale) Segala 2022-09-30 15:56:05 -07:00 committed by GitHub
parent f182441168
commit cfd74830c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 41 additions and 15 deletions

View File

@ -300,7 +300,6 @@ type GRPCClient struct {
ctxCancelFunc context.CancelFunc
protoClient pb.DaprClient
authToken string
mux sync.Mutex
}
// Close cleans up all resources created by the client.
@ -308,15 +307,14 @@ func (c *GRPCClient) Close() {
c.ctxCancelFunc()
if c.connection != nil {
c.connection.Close()
c.connection = nil
}
}
// WithAuthToken sets Dapr API token on the instantiated client.
// Allows empty string to reset token on existing client.
func (c *GRPCClient) WithAuthToken(token string) {
c.mux.Lock()
c.authToken = token
c.mux.Unlock()
}
// WithTraceID adds existing trace ID to the outgoing context.
@ -349,3 +347,8 @@ func (c *GRPCClient) Shutdown(ctx context.Context) error {
func (c *GRPCClient) GrpcClient() pb.DaprClient {
return c.protoClient
}
// GrpcClientConn returns the grpc.ClientConn object used by this client.
func (c *GRPCClient) GrpcClientConn() *grpc.ClientConn {
return c.connection
}

View File

@ -16,6 +16,7 @@ package grpc
import (
"net"
"os"
"sync/atomic"
"github.com/pkg/errors"
"google.golang.org/grpc"
@ -48,13 +49,20 @@ func NewServiceWithListener(lis net.Listener) common.Service {
}
func newService(lis net.Listener) *Server {
return &Server{
s := &Server{
listener: lis,
invokeHandlers: make(map[string]common.ServiceInvocationHandler),
topicRegistrar: make(internal.TopicRegistrar),
bindingHandlers: make(map[string]common.BindingInvocationHandler),
authToken: os.Getenv(common.AppAPITokenEnvVar),
}
gs := grpc.NewServer()
pb.RegisterAppCallbackServer(gs, s)
pb.RegisterAppCallbackHealthCheckServer(gs, s)
s.grpcServer = gs
return s
}
// Server is the gRPC service implementation for Dapr.
@ -68,6 +76,7 @@ type Server struct {
healthCheckHandler common.HealthCheckHandler
authToken string
grpcServer *grpc.Server
started uint32
}
func (s *Server) RegisterActorImplFactory(f actor.Factory, opts ...config.Option) {
@ -76,19 +85,33 @@ func (s *Server) RegisterActorImplFactory(f actor.Factory, opts ...config.Option
// Start registers the server and starts it.
func (s *Server) Start() error {
gs := grpc.NewServer()
pb.RegisterAppCallbackServer(gs, s)
pb.RegisterAppCallbackHealthCheckServer(gs, s)
s.grpcServer = gs
return gs.Serve(s.listener)
if !atomic.CompareAndSwapUint32(&s.started, 0, 1) {
return errors.New("a gRPC server can only be started once")
}
return s.grpcServer.Serve(s.listener)
}
// Stop stops the previously started service.
// Stop stops the previously-started service.
func (s *Server) Stop() error {
return s.listener.Close()
}
func (s *Server) GracefulStop() error {
s.grpcServer.GracefulStop()
if atomic.LoadUint32(&s.started) == 0 {
return nil
}
s.grpcServer.Stop()
s.grpcServer = nil
return nil
}
// GrecefulStop stops the previously-started service gracefully.
func (s *Server) GracefulStop() error {
if atomic.LoadUint32(&s.started) == 0 {
return nil
}
s.grpcServer.GracefulStop()
s.grpcServer = nil
return nil
}
// GrpcServer returns the grpc.Server object managed by the server.
func (s *Server) GrpcServer() *grpc.Server {
return s.grpcServer
}