stubserver: support xds-enabled grpc server (#7613)

This commit is contained in:
Easwar Swaminathan 2024-09-12 15:00:20 -07:00 committed by GitHub
parent b6fde8cdd1
commit cf5d5411d5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 29 additions and 9 deletions

View File

@ -138,7 +138,7 @@ func startServer(t *testing.T, r reportType) *testServer {
MinReportingInterval: 10 * time.Millisecond,
}
internal.ORCAAllowAnyMinReportingInterval.(func(so *orca.ServiceOptions))(&oso)
sopts = append(sopts, stubserver.RegisterServiceServerOption(func(s *grpc.Server) {
sopts = append(sopts, stubserver.RegisterServiceServerOption(func(s grpc.ServiceRegistrar) {
if err := orca.Register(s, oso); err != nil {
t.Fatalf("Failed to register orca service: %v", err)
}

View File

@ -24,6 +24,7 @@ import (
"context"
"fmt"
"net"
"net/http"
"testing"
"time"
@ -39,6 +40,15 @@ import (
testpb "google.golang.org/grpc/interop/grpc_testing"
)
// GRPCServer is an interface that groups methods implemented by a grpc.Server
// or an xds.GRPCServer that are used by the StubServer.
type GRPCServer interface {
grpc.ServiceRegistrar
Stop()
GracefulStop()
Serve(net.Listener) error
}
// StubServer is a server that is easy to customize within individual test
// cases.
type StubServer struct {
@ -53,7 +63,12 @@ type StubServer struct {
// A client connected to this service the test may use. Created in Start().
Client testgrpc.TestServiceClient
CC *grpc.ClientConn
S *grpc.Server
// Server to serve this service from.
//
// If nil, a new grpc.Server is created, listening on the provided Network
// and Address fields, or listening using the provided Listener.
S GRPCServer
// Parameters for Listen and Dial. Defaults will be used if these are empty
// before Start.
@ -100,14 +115,14 @@ func (ss *StubServer) Start(sopts []grpc.ServerOption, dopts ...grpc.DialOption)
type registerServiceServerOption struct {
grpc.EmptyServerOption
f func(*grpc.Server)
f func(grpc.ServiceRegistrar)
}
// RegisterServiceServerOption returns a ServerOption that will run f() in
// Start or StartServer with the grpc.Server created before serving. This
// allows other services to be registered on the test server (e.g. ORCA,
// health, or reflection).
func RegisterServiceServerOption(f func(*grpc.Server)) grpc.ServerOption {
func RegisterServiceServerOption(f func(grpc.ServiceRegistrar)) grpc.ServerOption {
return &registerServiceServerOption{f: f}
}
@ -132,7 +147,9 @@ func (ss *StubServer) setupServer(sopts ...grpc.ServerOption) (net.Listener, err
}
ss.Address = lis.Addr().String()
ss.S = grpc.NewServer(sopts...)
if ss.S == nil {
ss.S = grpc.NewServer(sopts...)
}
for _, so := range sopts {
switch x := so.(type) {
case *registerServiceServerOption:
@ -154,9 +171,14 @@ func (ss *StubServer) StartHandlerServer(sopts ...grpc.ServerOption) error {
return err
}
handler, ok := ss.S.(interface{ http.Handler })
if !ok {
panic(fmt.Sprintf("server of type %T does not implement http.Handler", ss.S))
}
go func() {
hs := &http2.Server{}
opts := &http2.ServeConnOpts{Handler: ss.S}
opts := &http2.ServeConnOpts{Handler: handler}
for {
conn, err := lis.Accept()
if err != nil {

View File

@ -111,9 +111,7 @@ func NewService(opts ServiceOptions) (*Service, error) {
// Register creates a new ORCA service implementation configured using the
// provided options and registers the same on the provided grpc Server.
func Register(s *grpc.Server, opts ServiceOptions) error {
// TODO(https://github.com/cncf/xds/issues/41): replace *grpc.Server with
// grpc.ServiceRegistrar when possible.
func Register(s grpc.ServiceRegistrar, opts ServiceOptions) error {
service, err := NewService(opts)
if err != nil {
return err