xds: serving mode changes outlined in gRFC A36 (#4328)

This commit is contained in:
Easwar Swaminathan 2021-04-26 14:29:06 -07:00 committed by GitHub
parent 9572fd6fae
commit 52a707c0da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 639 additions and 62 deletions

View File

@ -59,6 +59,11 @@ var (
// gRPC server. An xDS-enabled server needs to know what type of credentials
// is configured on the underlying gRPC server. This is set by server.go.
GetServerCredentials interface{} // func (*grpc.Server) credentials.TransportCredentials
// DrainServerTransports initiates a graceful close of existing connections
// on a gRPC server accepted on the provided listener address. An
// xDS-enabled server invokes this method on a grpc.Server when a particular
// listener moves to "not-serving" mode.
DrainServerTransports interface{} // func(*grpc.Server, string)
)
// HealthChecker defines the signature of the client-side LB channel health checking function.

View File

@ -57,12 +57,22 @@ import (
const (
defaultServerMaxReceiveMessageSize = 1024 * 1024 * 4
defaultServerMaxSendMessageSize = math.MaxInt32
// Server transports are tracked in a map which is keyed on listener
// address. For regular gRPC traffic, connections are accepted in Serve()
// through a call to Accept(), and we use the actual listener address as key
// when we add it to the map. But for connections received through
// ServeHTTP(), we do not have a listener and hence use this dummy value.
listenerAddressForServeHTTP = "listenerAddressForServeHTTP"
)
func init() {
internal.GetServerCredentials = func(srv *Server) credentials.TransportCredentials {
return srv.opts.creds
}
internal.DrainServerTransports = func(srv *Server, addr string) {
srv.drainServerTransports(addr)
}
}
var statusOK = status.New(codes.OK, "")
@ -107,9 +117,12 @@ type serverWorkerData struct {
type Server struct {
opts serverOptions
mu sync.Mutex // guards following
lis map[net.Listener]bool
conns map[transport.ServerTransport]bool
mu sync.Mutex // guards following
lis map[net.Listener]bool
// conns contains all active server transports. It is a map keyed on a
// listener address with the value being the set of active transports
// belonging to that listener.
conns map[string]map[transport.ServerTransport]bool
serve bool
drain bool
cv *sync.Cond // signaled when connections close for GracefulStop
@ -519,7 +532,7 @@ func NewServer(opt ...ServerOption) *Server {
s := &Server{
lis: make(map[net.Listener]bool),
opts: opts,
conns: make(map[transport.ServerTransport]bool),
conns: make(map[string]map[transport.ServerTransport]bool),
services: make(map[string]*serviceInfo),
quit: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
@ -778,7 +791,7 @@ func (s *Server) Serve(lis net.Listener) error {
// s.conns before this conn can be added.
s.serveWG.Add(1)
go func() {
s.handleRawConn(rawConn)
s.handleRawConn(lis.Addr().String(), rawConn)
s.serveWG.Done()
}()
}
@ -786,7 +799,7 @@ func (s *Server) Serve(lis net.Listener) error {
// handleRawConn forks a goroutine to handle a just-accepted connection that
// has not had any I/O performed on it yet.
func (s *Server) handleRawConn(rawConn net.Conn) {
func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
if s.quit.HasFired() {
rawConn.Close()
return
@ -814,15 +827,24 @@ func (s *Server) handleRawConn(rawConn net.Conn) {
}
rawConn.SetDeadline(time.Time{})
if !s.addConn(st) {
if !s.addConn(lisAddr, st) {
return
}
go func() {
s.serveStreams(st)
s.removeConn(st)
s.removeConn(lisAddr, st)
}()
}
func (s *Server) drainServerTransports(addr string) {
s.mu.Lock()
conns := s.conns[addr]
for st := range conns {
st.Drain()
}
s.mu.Unlock()
}
// newHTTP2Transport sets up a http/2 transport (using the
// gRPC http2 server transport in transport/http2_server.go).
func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) transport.ServerTransport {
@ -924,10 +946,10 @@ func (s *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
if !s.addConn(st) {
if !s.addConn(listenerAddressForServeHTTP, st) {
return
}
defer s.removeConn(st)
defer s.removeConn(listenerAddressForServeHTTP, st)
s.serveStreams(st)
}
@ -955,7 +977,7 @@ func (s *Server) traceInfo(st transport.ServerTransport, stream *transport.Strea
return trInfo
}
func (s *Server) addConn(st transport.ServerTransport) bool {
func (s *Server) addConn(addr string, st transport.ServerTransport) bool {
s.mu.Lock()
defer s.mu.Unlock()
if s.conns == nil {
@ -967,15 +989,28 @@ func (s *Server) addConn(st transport.ServerTransport) bool {
// immediately.
st.Drain()
}
s.conns[st] = true
if s.conns[addr] == nil {
// Create a map entry if this is the first connection on this listener.
s.conns[addr] = make(map[transport.ServerTransport]bool)
}
s.conns[addr][st] = true
return true
}
func (s *Server) removeConn(st transport.ServerTransport) {
func (s *Server) removeConn(addr string, st transport.ServerTransport) {
s.mu.Lock()
defer s.mu.Unlock()
if s.conns != nil {
delete(s.conns, st)
conns := s.conns[addr]
if conns != nil {
delete(conns, st)
if len(conns) == 0 {
// If the last connection for this address is being removed, also
// remove the map entry corresponding to the address. This is used
// in GracefulStop() when waiting for all connections to be closed.
delete(s.conns, addr)
}
s.cv.Broadcast()
}
}
@ -1639,7 +1674,7 @@ func (s *Server) Stop() {
s.mu.Lock()
listeners := s.lis
s.lis = nil
st := s.conns
conns := s.conns
s.conns = nil
// interrupt GracefulStop if Stop and GracefulStop are called concurrently.
s.cv.Broadcast()
@ -1648,8 +1683,10 @@ func (s *Server) Stop() {
for lis := range listeners {
lis.Close()
}
for c := range st {
c.Close()
for _, cs := range conns {
for st := range cs {
st.Close()
}
}
if s.opts.numServerWorkers > 0 {
s.stopServerWorkers()
@ -1686,8 +1723,10 @@ func (s *Server) GracefulStop() {
}
s.lis = nil
if !s.drain {
for st := range s.conns {
st.Drain()
for _, conns := range s.conns {
for st := range conns {
st.Drain()
}
}
s.drain = true
}

View File

@ -48,6 +48,42 @@ var (
backoffFunc = bs.Backoff
)
// ServingMode indicates the current mode of operation of the server.
//
// This API exactly mirrors the one in the public xds package. We have to
// redefine it here to avoid a cyclic dependency.
type ServingMode int
const (
// ServingModeStarting indicates that the serving is starting up.
ServingModeStarting ServingMode = iota
// ServingModeServing indicates the the server contains all required xDS
// configuration is serving RPCs.
ServingModeServing
// ServingModeNotServing indicates that the server is not accepting new
// connections. Existing connections will be closed gracefully, allowing
// in-progress RPCs to complete. A server enters this mode when it does not
// contain the required xDS configuration to serve RPCs.
ServingModeNotServing
)
func (s ServingMode) String() string {
switch s {
case ServingModeNotServing:
return "not-serving"
case ServingModeServing:
return "serving"
default:
return "starting"
}
}
// ServingModeCallback is the callback that users can register to get notified
// about the server's serving mode changes. The callback is invoked with the
// address of the listener and its new mode. The err parameter is set to a
// non-nil error if the server has transitioned into not-serving mode.
type ServingModeCallback func(addr net.Addr, mode ServingMode, err error)
func prefixLogger(p *listenerWrapper) *internalgrpclog.PrefixLogger {
return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[xds-server-listener %p] ", p))
}
@ -70,6 +106,8 @@ type ListenerWrapperParams struct {
XDSCredsInUse bool
// XDSClient provides the functionality from the xdsClient required here.
XDSClient XDSClientInterface
// ModeCallback is the callback to invoke when the serving mode changes.
ModeCallback ServingModeCallback
}
// NewListenerWrapper creates a new listenerWrapper with params. It returns a
@ -83,6 +121,7 @@ func NewListenerWrapper(params ListenerWrapperParams) (net.Listener, <-chan stru
name: params.ListenerResourceName,
xdsCredsInUse: params.XDSCredsInUse,
xdsC: params.XDSClient,
modeCallback: params.ModeCallback,
isUnspecifiedAddr: params.Listener.Addr().(*net.TCPAddr).IP.IsUnspecified(),
closed: grpcsync.NewEvent(),
@ -111,12 +150,11 @@ type listenerWrapper struct {
net.Listener
logger *internalgrpclog.PrefixLogger
// TODO: Maintain serving state of this listener.
name string
xdsCredsInUse bool
xdsC XDSClientInterface
cancelWatch func()
modeCallback ServingModeCallback
// Set to true if the listener is bound to the IP_ANY address (which is
// "0.0.0.0" for IPv4 and "::" for IPv6).
@ -138,11 +176,14 @@ type listenerWrapper struct {
// updates received in the callback if this event has fired.
closed *grpcsync.Event
// Filter chains received as part of the last good update. The reason for
// using an rw lock here is that this field will be read by all connections
// during their server-side handshake (in the hot path), but writes to this
// happen rarely (when we get a Listener resource update).
mu sync.RWMutex
// mu guards access to the current serving mode and the filter chains. The
// reason for using an rw lock here is that these fields are read in
// Accept() for all incoming connections, but writes happen rarely (when we
// get a Listener resource update).
mu sync.RWMutex
// Current serving mode.
mode ServingMode
// Filter chains received as part of the last good update.
filterChains *xdsclient.FilterChainManager
}
@ -175,8 +216,6 @@ func (l *listenerWrapper) Accept() (net.Conn, error) {
// Reset retries after a successful Accept().
retries = 0
// TODO: Close connections if in "non-serving" state
// Since the net.Conn represents an incoming connection, the source and
// destination address can be retrieved from the local address and
// remote address of the net.Conn respectively.
@ -191,6 +230,17 @@ func (l *listenerWrapper) Accept() (net.Conn, error) {
}
l.mu.RLock()
if l.mode == ServingModeNotServing {
// Close connections as soon as we accept them when we are in
// "not-serving" mode. Since we accept a net.Listener from the user
// in Serve(), we cannot close the listener when we move to
// "not-serving". Closing the connection immediately upon accepting
// is one of the other ways to implement the "not-serving" mode as
// outlined in gRFC A36.
l.mu.RUnlock()
conn.Close()
continue
}
fc, err := l.filterChains.Lookup(xdsclient.FilterChainLookupParams{
IsUnspecifiedListener: l.isUnspecifiedAddr,
DestAddr: destAddr.IP,
@ -236,14 +286,13 @@ func (l *listenerWrapper) handleListenerUpdate(update xdsclient.ListenerUpdate,
return
}
// TODO: Handle resource-not-found errors by moving to not-serving state.
if err != nil {
// We simply log an error here and hope we get a successful update
// in the future. The error could be because of a timeout or an
// actual error, like the requested resource not found. In any case,
// it is fine for the server to hang indefinitely until Stop() is
// called.
l.logger.Warningf("Received error for resource %q: %+v", l.name, err)
if xdsclient.ErrType(err) == xdsclient.ErrorTypeResourceNotFound {
l.switchMode(nil, ServingModeNotServing, err)
}
// For errors which are anything other than "resource-not-found", we
// continue to use the old configuration.
return
}
l.logger.Infof("Received update for resource %q: %+v", l.name, update)
@ -258,18 +307,26 @@ func (l *listenerWrapper) handleListenerUpdate(update xdsclient.ListenerUpdate,
// appropriate context to perform this check.
//
// What this means is that the xdsClient has ACKed a resource which can push
// the server into a "not serving" state. This is not ideal, but this is
// the server into a "not serving" mode. This is not ideal, but this is
// what we have decided to do. See gRPC A36 for more details.
ilc := update.InboundListenerCfg
if ilc.Address != l.addr || ilc.Port != l.port {
// TODO: Switch to "not serving" if the host:port does not match.
l.logger.Warningf("Received host:port (%s:%d) in Listener update does not match local listening address: (%s:%s", ilc.Address, ilc.Port, l.addr, l.port)
l.switchMode(nil, ServingModeNotServing, fmt.Errorf("address (%s:%s) in Listener update does not match listening address: (%s:%s)", ilc.Address, ilc.Port, l.addr, l.port))
return
}
l.mu.Lock()
l.filterChains = ilc.FilterChains
l.mu.Unlock()
l.switchMode(ilc.FilterChains, ServingModeServing, nil)
l.goodUpdate.Fire()
// TODO: Move to serving state on receipt of a good response.
}
func (l *listenerWrapper) switchMode(fcs *xdsclient.FilterChainManager, newMode ServingMode, err error) {
l.mu.Lock()
defer l.mu.Unlock()
l.filterChains = fcs
l.mode = newMode
if l.modeCallback != nil {
l.modeCallback(l.Listener.Addr(), newMode, err)
}
l.logger.Warningf("Listener %q entering mode: %q due to error: %v", l.Addr(), newMode, err)
}

View File

@ -32,7 +32,8 @@ import (
)
const (
defaultTestTimeout = 10 * time.Second
defaultTestTimeout = 10 * time.Second
defaultTestShortTimeout = 100 * time.Millisecond
)
type s struct {

View File

@ -0,0 +1,297 @@
// +build go1.13
// +build !386
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// Package xds_test contains e2e tests for xDS use.
package xds_test
import (
"context"
"fmt"
"net"
"path"
"sync"
"testing"
v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3"
"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
xdscreds "google.golang.org/grpc/credentials/xds"
"google.golang.org/grpc/internal/testutils"
xdsinternal "google.golang.org/grpc/internal/xds"
testpb "google.golang.org/grpc/test/grpc_testing"
"google.golang.org/grpc/xds"
xdstestutils "google.golang.org/grpc/xds/internal/testutils"
"google.golang.org/grpc/xds/internal/testutils/e2e"
)
// A convenience typed used to keep track of mode changes on multiple listeners.
type modeTracker struct {
mu sync.Mutex
modes map[string]xds.ServingMode
updateCh *testutils.Channel
}
func newModeTracker() *modeTracker {
return &modeTracker{
modes: make(map[string]xds.ServingMode),
updateCh: testutils.NewChannel(),
}
}
func (mt *modeTracker) updateMode(addr net.Addr, mode xds.ServingMode) {
mt.mu.Lock()
defer mt.mu.Unlock()
mt.modes[addr.String()] = mode
mt.updateCh.Send(nil)
}
func (mt *modeTracker) getMode(addr net.Addr) xds.ServingMode {
mt.mu.Lock()
defer mt.mu.Unlock()
return mt.modes[addr.String()]
}
func (mt *modeTracker) waitForUpdate(ctx context.Context) error {
_, err := mt.updateCh.Receive(ctx)
if err != nil {
return fmt.Errorf("error when waiting for a mode change update: %v", err)
}
return nil
}
// TestServerSideXDS_ServingModeChanges tests the serving mode functionality in
// xDS enabled gRPC servers. It verifies that appropriate mode changes happen in
// the server, and also verifies behavior of clientConns under these modes.
func (s) TestServerSideXDS_ServingModeChanges(t *testing.T) {
// Spin up a xDS management server on a local port.
nodeID := uuid.New().String()
fs, err := e2e.StartManagementServer()
if err != nil {
t.Fatal(err)
}
defer fs.Stop()
// Create certificate and key files in a temporary directory and generate
// certificate provider configuration for a file_watcher plugin.
tmpdir := createTmpDirWithFiles(t, "testServerSideXDS*", "x509/server1_cert.pem", "x509/server1_key.pem", "x509/client_ca_cert.pem")
cpc := e2e.DefaultFileWatcherConfig(path.Join(tmpdir, certFile), path.Join(tmpdir, keyFile), path.Join(tmpdir, rootFile))
// Create a bootstrap file in a temporary directory.
bsCleanup, err := xdsinternal.SetupBootstrapFile(xdsinternal.BootstrapOptions{
Version: xdsinternal.TransportV3,
NodeID: nodeID,
ServerURI: fs.Address,
CertificateProviders: cpc,
ServerListenerResourceNameTemplate: serverListenerResourceNameTemplate,
})
if err != nil {
t.Fatal(err)
}
defer bsCleanup()
// Configure xDS credentials to be used on the server-side.
creds, err := xdscreds.NewServerCredentials(xdscreds.ServerOptions{
FallbackCreds: insecure.NewCredentials(),
})
if err != nil {
t.Fatal(err)
}
// Create a server option to get notified about serving mode changes.
modeTracker := newModeTracker()
modeChangeOpt := xds.ServingModeCallback(func(addr net.Addr, args xds.ServingModeChangeArgs) {
t.Logf("serving mode for listener %q changed to %q, err: %v", addr.String(), args.Mode, args.Err)
modeTracker.updateMode(addr, args.Mode)
})
// Initialize an xDS-enabled gRPC server and register the stubServer on it.
server := xds.NewGRPCServer(grpc.Creds(creds), modeChangeOpt)
defer server.Stop()
testpb.RegisterTestServiceServer(server, &testService{})
// Create two local listeners and pass it to Serve().
lis1, err := xdstestutils.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
}
lis2, err := xdstestutils.LocalTCPListener()
if err != nil {
t.Fatalf("testutils.LocalTCPListener() failed: %v", err)
}
go func() {
if err := server.Serve(lis1); err != nil {
t.Errorf("Serve() failed: %v", err)
}
}()
go func() {
if err := server.Serve(lis2); err != nil {
t.Errorf("Serve() failed: %v", err)
}
}()
// Setup the fake management server to respond with Listener resources that
// we are interested in.
listener1 := listenerResourceWithoutSecurityConfig(t, lis1)
listener2 := listenerResourceWithoutSecurityConfig(t, lis2)
if err := fs.Update(e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{listener1, listener2},
}); err != nil {
t.Error(err)
}
// Wait for both listeners to move to "serving" mode.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := waitForModeChange(ctx, modeTracker, lis1.Addr(), xds.ServingModeServing); err != nil {
t.Fatal(err)
}
if err := waitForModeChange(ctx, modeTracker, lis2.Addr(), xds.ServingModeServing); err != nil {
t.Fatal(err)
}
// Create a ClientConn to the first listener and make a successful RPCs.
cc1, err := grpc.DialContext(ctx, lis1.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
defer cc1.Close()
client1 := testpb.NewTestServiceClient(cc1)
if _, err := client1.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Fatalf("rpc EmptyCall() failed: %v", err)
}
// Create a ClientConn to the second listener and make a successful RPCs.
cc2, err := grpc.DialContext(ctx, lis2.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
defer cc2.Close()
client2 := testpb.NewTestServiceClient(cc2)
if _, err := client2.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Fatalf("rpc EmptyCall() failed: %v", err)
}
// Update the management server to remove the second listener resource. This should
// push the only the second listener into "not-serving" mode.
if err := fs.Update(e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{listener1},
}); err != nil {
t.Error(err)
}
if err := waitForModeChange(ctx, modeTracker, lis2.Addr(), xds.ServingModeNotServing); err != nil {
t.Fatal(err)
}
// Make sure cc1 is still in READY state, while cc2 has moved out of READY.
if s := cc1.GetState(); s != connectivity.Ready {
t.Fatalf("clientConn1 state is %s, want %s", s, connectivity.Ready)
}
if !cc2.WaitForStateChange(ctx, connectivity.Ready) {
t.Fatal("clientConn2 failed to move out of READY")
}
// Make sure RPCs succeed on cc1 and fail on cc2.
if _, err := client1.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Fatalf("rpc EmptyCall() failed: %v", err)
}
if _, err := client2.EmptyCall(ctx, &testpb.Empty{}); err == nil {
t.Fatal("rpc EmptyCall() succeeded when expected to fail")
}
// Update the management server to remove the first listener resource as
// well. This should push the first listener into "not-serving" mode. Second
// listener is already in "not-serving" mode.
if err := fs.Update(e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{},
}); err != nil {
t.Error(err)
}
if err := waitForModeChange(ctx, modeTracker, lis1.Addr(), xds.ServingModeNotServing); err != nil {
t.Fatal(err)
}
// Make sure cc1 has moved out of READY.
if !cc1.WaitForStateChange(ctx, connectivity.Ready) {
t.Fatal("clientConn1 failed to move out of READY")
}
// Make sure RPCs fail on both.
if _, err := client1.EmptyCall(ctx, &testpb.Empty{}); err == nil {
t.Fatal("rpc EmptyCall() succeeded when expected to fail")
}
if _, err := client2.EmptyCall(ctx, &testpb.Empty{}); err == nil {
t.Fatal("rpc EmptyCall() succeeded when expected to fail")
}
// Make sure new connection attempts to "not-serving" servers fail. We use a
// short timeout since we expect this to fail.
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
_, err = grpc.DialContext(sCtx, lis1.Addr().String(), grpc.WithBlock(), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err == nil {
t.Fatal("successfully created clientConn to a server in \"not-serving\" state")
}
// Update the management server with both listener resources.
if err := fs.Update(e2e.UpdateOptions{
NodeID: nodeID,
Listeners: []*v3listenerpb.Listener{listener1, listener2},
}); err != nil {
t.Error(err)
}
// Wait for both listeners to move to "serving" mode.
if err := waitForModeChange(ctx, modeTracker, lis1.Addr(), xds.ServingModeServing); err != nil {
t.Fatal(err)
}
if err := waitForModeChange(ctx, modeTracker, lis2.Addr(), xds.ServingModeServing); err != nil {
t.Fatal(err)
}
// The clientConns created earlier should be able to make RPCs now.
if _, err := client1.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Fatalf("rpc EmptyCall() failed: %v", err)
}
if _, err := client2.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Fatalf("rpc EmptyCall() failed: %v", err)
}
}
func waitForModeChange(ctx context.Context, modeTracker *modeTracker, addr net.Addr, wantMode xds.ServingMode) error {
for {
if gotMode := modeTracker.getMode(addr); gotMode == wantMode {
return nil
}
if err := modeTracker.waitForUpdate(ctx); err != nil {
return err
}
}
}

View File

@ -30,6 +30,7 @@ import (
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/buffer"
internalgrpclog "google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/grpcsync"
xdsclient "google.golang.org/grpc/xds/internal/client"
@ -48,9 +49,9 @@ var (
return grpc.NewServer(opts...)
}
// Unexported function to retrieve transport credentials from a gRPC server.
grpcGetServerCreds = internal.GetServerCredentials.(func(*grpc.Server) credentials.TransportCredentials)
logger = grpclog.Component("xds")
grpcGetServerCreds = internal.GetServerCredentials.(func(*grpc.Server) credentials.TransportCredentials)
drainServerTransports = internal.DrainServerTransports.(func(*grpc.Server, string))
logger = grpclog.Component("xds")
)
func prefixLogger(p *GRPCServer) *internalgrpclog.PrefixLogger {
@ -78,16 +79,12 @@ type grpcServerInterface interface {
// communication with a management server using xDS APIs. It implements the
// grpc.ServiceRegistrar interface and can be passed to service registration
// functions in IDL generated code.
//
// Experimental
//
// Notice: This type is EXPERIMENTAL and may be changed or removed in a
// later release.
type GRPCServer struct {
gs grpcServerInterface
quit *grpcsync.Event
logger *internalgrpclog.PrefixLogger
xdsCredsInUse bool
opts *serverOptions
// clientMu is used only in initXDSClient(), which is called at the
// beginning of Serve(), where we have to decide if we have to create a
@ -99,11 +96,6 @@ type GRPCServer struct {
// NewGRPCServer creates an xDS-enabled gRPC server using the passed in opts.
// The underlying gRPC server has no service registered and has not started to
// accept requests yet.
//
// Experimental
//
// Notice: This API is EXPERIMENTAL and may be changed or removed in a later
// release.
func NewGRPCServer(opts ...grpc.ServerOption) *GRPCServer {
newOpts := []grpc.ServerOption{
grpc.ChainUnaryInterceptor(xdsUnaryInterceptor),
@ -113,6 +105,7 @@ func NewGRPCServer(opts ...grpc.ServerOption) *GRPCServer {
s := &GRPCServer{
gs: newGRPCServer(newOpts...),
quit: grpcsync.NewEvent(),
opts: handleServerOptions(opts),
}
s.logger = prefixLogger(s)
s.logger.Infof("Created xds.GRPCServer")
@ -133,6 +126,18 @@ func NewGRPCServer(opts ...grpc.ServerOption) *GRPCServer {
return s
}
// handleServerOptions iterates through the list of server options passed in by
// the user, and handles the xDS server specific options.
func handleServerOptions(opts []grpc.ServerOption) *serverOptions {
so := &serverOptions{}
for _, opt := range opts {
if o, ok := opt.(serverOption); ok {
o.applyServerOption(so)
}
}
return so
}
// RegisterService registers a service and its implementation to the underlying
// gRPC server. It is called from the IDL generated code. This must be called
// before invoking Serve.
@ -165,7 +170,6 @@ func (s *GRPCServer) initXDSClient() error {
// initiated here.
//
// Serve will return a non-nil error unless Stop or GracefulStop is called.
// TODO: Support callback to get notified on serving state changes.
func (s *GRPCServer) Serve(lis net.Listener) error {
s.logger.Infof("Serve() passed a net.Listener on %s", lis.Addr().String())
if _, ok := lis.Addr().(*net.TCPAddr); !ok {
@ -207,6 +211,11 @@ func (s *GRPCServer) Serve(lis net.Listener) error {
name = strings.Replace(cfg.ServerListenerResourceNameTemplate, "%s", lis.Addr().String(), -1)
}
modeUpdateCh := buffer.NewUnbounded()
go func() {
s.handleServingModeChanges(modeUpdateCh)
}()
// Create a listenerWrapper which handles all functionality required by
// this particular instance of Serve().
lw, goodUpdateCh := server.NewListenerWrapper(server.ListenerWrapperParams{
@ -214,6 +223,13 @@ func (s *GRPCServer) Serve(lis net.Listener) error {
ListenerResourceName: name,
XDSCredsInUse: s.xdsCredsInUse,
XDSClient: s.xdsC,
ModeCallback: func(addr net.Addr, mode server.ServingMode, err error) {
modeUpdateCh.Put(&modeChangeArgs{
addr: addr,
mode: mode,
err: err,
})
},
})
// Block until a good LDS response is received or the server is stopped.
@ -229,6 +245,47 @@ func (s *GRPCServer) Serve(lis net.Listener) error {
return s.gs.Serve(lw)
}
// modeChangeArgs wraps argument required for invoking mode change callback.
type modeChangeArgs struct {
addr net.Addr
mode server.ServingMode
err error
}
// handleServingModeChanges runs as a separate goroutine, spawned from Serve().
// It reads a channel on to which mode change arguments are pushed, and in turn
// invokes the user registered callback. It also calls an internal method on the
// underlying grpc.Server to gracefully close existing connections, if the
// listener moved to a "not-serving" mode.
func (s *GRPCServer) handleServingModeChanges(updateCh *buffer.Unbounded) {
for {
select {
case <-s.quit.Done():
return
case u := <-updateCh.Get():
updateCh.Load()
args := u.(*modeChangeArgs)
if args.mode == ServingModeNotServing {
// We type assert our underlying gRPC server to the real
// grpc.Server here before trying to initiate the drain
// operation. This approach avoids performing the same type
// assertion in the grpc package which provides the
// implementation for internal.GetServerCredentials, and allows
// us to use a fake gRPC server in tests.
if gs, ok := s.gs.(*grpc.Server); ok {
drainServerTransports(gs, args.addr.String())
}
}
if s.opts.modeCallback != nil {
s.opts.modeCallback(args.addr, ServingModeChangeArgs{
Mode: args.mode,
Err: args.err,
})
}
}
}
}
// Stop stops the underlying gRPC server. It immediately closes all open
// connections. It cancels all active RPCs on the server side and the
// corresponding pending RPCs on the client side will get notified by connection

84
xds/server_options.go Normal file
View File

@ -0,0 +1,84 @@
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package xds
import (
"net"
"google.golang.org/grpc"
iserver "google.golang.org/grpc/xds/internal/server"
)
// ServingModeCallback returns a grpc.ServerOption which allows users to
// register a callback to get notified about serving mode changes.
func ServingModeCallback(cb ServingModeCallbackFunc) grpc.ServerOption {
return &smcOption{cb: cb}
}
type serverOption interface {
applyServerOption(*serverOptions)
}
// smcOption is a server option containing a callback to be invoked when the
// serving mode changes.
type smcOption struct {
// Embedding the empty server option makes it safe to pass it to
// grpc.NewServer().
grpc.EmptyServerOption
cb ServingModeCallbackFunc
}
func (s *smcOption) applyServerOption(o *serverOptions) {
o.modeCallback = s.cb
}
type serverOptions struct {
modeCallback ServingModeCallbackFunc
}
// ServingMode indicates the current mode of operation of the server.
type ServingMode = iserver.ServingMode
const (
// ServingModeServing indicates the the server contains all required xDS
// configuration is serving RPCs.
ServingModeServing = iserver.ServingModeServing
// ServingModeNotServing indicates that the server is not accepting new
// connections. Existing connections will be closed gracefully, allowing
// in-progress RPCs to complete. A server enters this mode when it does not
// contain the required xDS configuration to serve RPCs.
ServingModeNotServing = iserver.ServingModeNotServing
)
// ServingModeCallbackFunc is the callback that users can register to get
// notified about the server's serving mode changes. The callback is invoked
// with the address of the listener and its new mode.
//
// Users must not perform any blocking operations in this callback.
type ServingModeCallbackFunc func(addr net.Addr, args ServingModeChangeArgs)
// ServingModeChangeArgs wraps the arguments passed to the serving mode callback
// function.
type ServingModeChangeArgs struct {
// Mode is the new serving mode of the server listener.
Mode ServingMode
// Err is set to a non-nil error if the server has transitioned into
// not-serving mode.
Err error
}

View File

@ -311,7 +311,14 @@ func (s) TestServeSuccess(t *testing.T) {
fs, clientCh, cleanup := setupOverrides()
defer cleanup()
server := NewGRPCServer()
// Create a new xDS-enabled gRPC server and pass it a server option to get
// notified about serving mode changes.
modeChangeCh := testutils.NewChannel()
modeChangeOption := ServingModeCallback(func(addr net.Addr, args ServingModeChangeArgs) {
t.Logf("server mode change callback invoked for listener %q with mode %q and error %v", addr.String(), args.Mode, args.Err)
modeChangeCh.Send(args.Mode)
})
server := NewGRPCServer(modeChangeOption)
defer server.Stop()
lis, err := xdstestutils.LocalTCPListener()
@ -349,13 +356,22 @@ func (s) TestServeSuccess(t *testing.T) {
// Push an error to the registered listener watch callback and make sure
// that Serve does not return.
client.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{}, errors.New("LDS error"))
client.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{}, xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "LDS resource not found"))
sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer sCancel()
if _, err := serveDone.Receive(sCtx); err != context.DeadlineExceeded {
t.Fatal("Serve() returned after a bad LDS response")
}
// Make sure the serving mode changes appropriately.
v, err := modeChangeCh.Receive(ctx)
if err != nil {
t.Fatalf("error when waiting for serving mode to change: %v", err)
}
if mode := v.(ServingMode); mode != ServingModeNotServing {
t.Fatalf("server mode is %q, want %q", mode, ServingModeNotServing)
}
// Push a good LDS response, and wait for Serve() to be invoked on the
// underlying grpc.Server.
addr, port := splitHostPort(lis.Addr().String())
@ -370,11 +386,18 @@ func (s) TestServeSuccess(t *testing.T) {
t.Fatalf("error when waiting for Serve() to be invoked on the grpc.Server")
}
// Make sure the serving mode changes appropriately.
v, err = modeChangeCh.Receive(ctx)
if err != nil {
t.Fatalf("error when waiting for serving mode to change: %v", err)
}
if mode := v.(ServingMode); mode != ServingModeServing {
t.Fatalf("server mode is %q, want %q", mode, ServingModeServing)
}
// Push an update to the registered listener watch callback with a Listener
// resource whose host:port does not match the actual listening address and
// port. Serve() should not return and should continue to use the old state.
//
// This will change once we add start tracking serving state.
// port. This will push the listener to "not-serving" mode.
client.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{
RouteConfigName: "routeconfig",
InboundListenerCfg: &xdsclient.InboundListenerConfig{
@ -387,6 +410,15 @@ func (s) TestServeSuccess(t *testing.T) {
if _, err := serveDone.Receive(sCtx); err != context.DeadlineExceeded {
t.Fatal("Serve() returned after a bad LDS response")
}
// Make sure the serving mode changes appropriately.
v, err = modeChangeCh.Receive(ctx)
if err != nil {
t.Fatalf("error when waiting for serving mode to change: %v", err)
}
if mode := v.(ServingMode); mode != ServingModeNotServing {
t.Fatalf("server mode is %q, want %q", mode, ServingModeNotServing)
}
}
// TestServeWithStop tests the case where Stop() is called before an LDS update

View File

@ -25,6 +25,11 @@
//
// See https://github.com/grpc/grpc-go/tree/master/examples/features/xds for
// example.
//
// Experimental
//
// Notice: All APIs in this package are experimental and may be removed in a
// later release.
package xds
import (