grpc: Minor refactor in server code. (#3779)

This commit is contained in:
Easwar Swaminathan 2020-08-06 13:10:09 -07:00 committed by GitHub
parent b830b5f361
commit a5514c9e50
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 44 additions and 43 deletions

View File

@ -80,13 +80,14 @@ type ServiceDesc struct {
Metadata interface{} Metadata interface{}
} }
// service consists of the information of the server serving this service and // serviceInfo wraps information about a service. It is very similar to
// the methods in this service. // ServiceDesc and is constructed from it for internal purposes.
type service struct { type serviceInfo struct {
server interface{} // the server for service methods // Contains the implementation for the methods in this service.
md map[string]*MethodDesc serviceImpl interface{}
sd map[string]*StreamDesc methods map[string]*MethodDesc
mdata interface{} streams map[string]*StreamDesc
mdata interface{}
} }
type serverWorkerData struct { type serverWorkerData struct {
@ -99,14 +100,14 @@ type serverWorkerData struct {
type Server struct { type Server struct {
opts serverOptions opts serverOptions
mu sync.Mutex // guards following mu sync.Mutex // guards following
lis map[net.Listener]bool lis map[net.Listener]bool
conns map[transport.ServerTransport]bool conns map[transport.ServerTransport]bool
serve bool serve bool
drain bool drain bool
cv *sync.Cond // signaled when connections close for GracefulStop cv *sync.Cond // signaled when connections close for GracefulStop
m map[string]*service // service name -> service info services map[string]*serviceInfo // service name -> service info
events trace.EventLog events trace.EventLog
quit *grpcsync.Event quit *grpcsync.Event
done *grpcsync.Event done *grpcsync.Event
@ -497,13 +498,13 @@ func NewServer(opt ...ServerOption) *Server {
o.apply(&opts) o.apply(&opts)
} }
s := &Server{ s := &Server{
lis: make(map[net.Listener]bool), lis: make(map[net.Listener]bool),
opts: opts, opts: opts,
conns: make(map[transport.ServerTransport]bool), conns: make(map[transport.ServerTransport]bool),
m: make(map[string]*service), services: make(map[string]*serviceInfo),
quit: grpcsync.NewEvent(), quit: grpcsync.NewEvent(),
done: grpcsync.NewEvent(), done: grpcsync.NewEvent(),
czData: new(channelzData), czData: new(channelzData),
} }
chainUnaryServerInterceptors(s) chainUnaryServerInterceptors(s)
chainStreamServerInterceptors(s) chainStreamServerInterceptors(s)
@ -558,24 +559,24 @@ func (s *Server) register(sd *ServiceDesc, ss interface{}) {
if s.serve { if s.serve {
logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName) logger.Fatalf("grpc: Server.RegisterService after Server.Serve for %q", sd.ServiceName)
} }
if _, ok := s.m[sd.ServiceName]; ok { if _, ok := s.services[sd.ServiceName]; ok {
logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName) logger.Fatalf("grpc: Server.RegisterService found duplicate service registration for %q", sd.ServiceName)
} }
srv := &service{ info := &serviceInfo{
server: ss, serviceImpl: ss,
md: make(map[string]*MethodDesc), methods: make(map[string]*MethodDesc),
sd: make(map[string]*StreamDesc), streams: make(map[string]*StreamDesc),
mdata: sd.Metadata, mdata: sd.Metadata,
} }
for i := range sd.Methods { for i := range sd.Methods {
d := &sd.Methods[i] d := &sd.Methods[i]
srv.md[d.MethodName] = d info.methods[d.MethodName] = d
} }
for i := range sd.Streams { for i := range sd.Streams {
d := &sd.Streams[i] d := &sd.Streams[i]
srv.sd[d.StreamName] = d info.streams[d.StreamName] = d
} }
s.m[sd.ServiceName] = srv s.services[sd.ServiceName] = info
} }
// MethodInfo contains the information of an RPC including its method name and type. // MethodInfo contains the information of an RPC including its method name and type.
@ -599,16 +600,16 @@ type ServiceInfo struct {
// Service names include the package names, in the form of <package>.<service>. // Service names include the package names, in the form of <package>.<service>.
func (s *Server) GetServiceInfo() map[string]ServiceInfo { func (s *Server) GetServiceInfo() map[string]ServiceInfo {
ret := make(map[string]ServiceInfo) ret := make(map[string]ServiceInfo)
for n, srv := range s.m { for n, srv := range s.services {
methods := make([]MethodInfo, 0, len(srv.md)+len(srv.sd)) methods := make([]MethodInfo, 0, len(srv.methods)+len(srv.streams))
for m := range srv.md { for m := range srv.methods {
methods = append(methods, MethodInfo{ methods = append(methods, MethodInfo{
Name: m, Name: m,
IsClientStream: false, IsClientStream: false,
IsServerStream: false, IsServerStream: false,
}) })
} }
for m, d := range srv.sd { for m, d := range srv.streams {
methods = append(methods, MethodInfo{ methods = append(methods, MethodInfo{
Name: m, Name: m,
IsClientStream: d.ClientStreams, IsClientStream: d.ClientStreams,
@ -1020,7 +1021,7 @@ func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info
} }
} }
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) { func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {
sh := s.opts.statsHandler sh := s.opts.statsHandler
if sh != nil || trInfo != nil || channelz.IsOn() { if sh != nil || trInfo != nil || channelz.IsOn() {
if channelz.IsOn() { if channelz.IsOn() {
@ -1177,7 +1178,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
return nil return nil
} }
ctx := NewContextWithServerTransportStream(stream.Context(), stream) ctx := NewContextWithServerTransportStream(stream.Context(), stream)
reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt) reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt)
if appErr != nil { if appErr != nil {
appStatus, ok := status.FromError(appErr) appStatus, ok := status.FromError(appErr)
if !ok { if !ok {
@ -1303,7 +1304,7 @@ func getChainStreamHandler(interceptors []StreamServerInterceptor, curr int, inf
} }
} }
func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) { func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, sd *StreamDesc, trInfo *traceInfo) (err error) {
if channelz.IsOn() { if channelz.IsOn() {
s.incrCallsStarted() s.incrCallsStarted()
} }
@ -1420,8 +1421,8 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
} }
var appErr error var appErr error
var server interface{} var server interface{}
if srv != nil { if info != nil {
server = srv.server server = info.serviceImpl
} }
if s.opts.streamInt == nil { if s.opts.streamInt == nil {
appErr = sd.Handler(server, ss) appErr = sd.Handler(server, ss)
@ -1497,13 +1498,13 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
service := sm[:pos] service := sm[:pos]
method := sm[pos+1:] method := sm[pos+1:]
srv, knownService := s.m[service] srv, knownService := s.services[service]
if knownService { if knownService {
if md, ok := srv.md[method]; ok { if md, ok := srv.methods[method]; ok {
s.processUnaryRPC(t, stream, srv, md, trInfo) s.processUnaryRPC(t, stream, srv, md, trInfo)
return return
} }
if sd, ok := srv.sd[method]; ok { if sd, ok := srv.streams[method]; ok {
s.processStreamingRPC(t, stream, srv, sd, trInfo) s.processStreamingRPC(t, stream, srv, sd, trInfo)
return return
} }