xds: Add support for Dynamic RDS in listener wrapper (#4655)

* Add support for Dynamic RDS in listener wrapper
Zach Reyes 2021-08-11 18:48:24 -04:00, committed by GitHub
parent 88dc96b463
commit ad87ad0098
10 changed files with 914 additions and 63 deletions

View File

@ -169,7 +169,7 @@ func (s) TestServiceWatch(t *testing.T) {
waitForWatchRouteConfig(ctx, t, xdsC, routeStr)
wantUpdate := serviceUpdate{virtualHost: &xdsclient.VirtualHost{Domains: []string{"target"}, Routes: []*xdsclient.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsclient.WeightedCluster{cluster: {Weight: 1}}}}}}
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
xdsC.InvokeWatchRouteConfigCallback("", xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
Domains: []string{targetStr},
@ -187,7 +187,7 @@ func (s) TestServiceWatch(t *testing.T) {
WeightedClusters: map[string]xdsclient.WeightedCluster{cluster: {Weight: 1}},
}},
}}
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
xdsC.InvokeWatchRouteConfigCallback("", xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
Domains: []string{targetStr},
@ -223,7 +223,7 @@ func (s) TestServiceWatchLDSUpdate(t *testing.T) {
waitForWatchRouteConfig(ctx, t, xdsC, routeStr)
wantUpdate := serviceUpdate{virtualHost: &xdsclient.VirtualHost{Domains: []string{"target"}, Routes: []*xdsclient.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsclient.WeightedCluster{cluster: {Weight: 1}}}}}}
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
xdsC.InvokeWatchRouteConfigCallback("", xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
Domains: []string{targetStr},
@ -237,14 +237,14 @@ func (s) TestServiceWatchLDSUpdate(t *testing.T) {
// Another LDS update with a different RDS_name.
xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr + "2"}, nil)
if err := xdsC.WaitForCancelRouteConfigWatch(ctx); err != nil {
if _, err := xdsC.WaitForCancelRouteConfigWatch(ctx); err != nil {
t.Fatalf("wait for cancel route watch failed: %v, want nil", err)
}
waitForWatchRouteConfig(ctx, t, xdsC, routeStr+"2")
// RDS update for the new name.
wantUpdate2 := serviceUpdate{virtualHost: &xdsclient.VirtualHost{Domains: []string{"target"}, Routes: []*xdsclient.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsclient.WeightedCluster{cluster + "2": {Weight: 1}}}}}}
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
xdsC.InvokeWatchRouteConfigCallback(routeStr+"2", xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
Domains: []string{targetStr},
@ -279,7 +279,7 @@ func (s) TestServiceWatchLDSUpdateMaxStreamDuration(t *testing.T) {
WeightedClusters: map[string]xdsclient.WeightedCluster{cluster: {Weight: 1}}}}},
ldsConfig: ldsConfig{maxStreamDuration: time.Second},
}
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
xdsC.InvokeWatchRouteConfigCallback("", xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
Domains: []string{targetStr},
@ -303,7 +303,7 @@ func (s) TestServiceWatchLDSUpdateMaxStreamDuration(t *testing.T) {
Prefix: newStringP(""),
WeightedClusters: map[string]xdsclient.WeightedCluster{cluster + "2": {Weight: 1}}}},
}}
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
xdsC.InvokeWatchRouteConfigCallback("", xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
Domains: []string{targetStr},
@ -337,7 +337,7 @@ func (s) TestServiceNotCancelRDSOnSameLDSUpdate(t *testing.T) {
Prefix: newStringP(""),
WeightedClusters: map[string]xdsclient.WeightedCluster{cluster: {Weight: 1}}}},
}}
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
xdsC.InvokeWatchRouteConfigCallback("", xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
Domains: []string{targetStr},
@ -354,7 +354,7 @@ func (s) TestServiceNotCancelRDSOnSameLDSUpdate(t *testing.T) {
xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil)
sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
defer sCancel()
if err := xdsC.WaitForCancelRouteConfigWatch(sCtx); err != context.DeadlineExceeded {
if _, err := xdsC.WaitForCancelRouteConfigWatch(sCtx); err != context.DeadlineExceeded {
t.Fatalf("wait for cancel route watch failed: %v, want nil", err)
}
}
@ -378,7 +378,7 @@ func (s) TestServiceWatchInlineRDS(t *testing.T) {
xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil)
waitForWatchRouteConfig(ctx, t, xdsC, routeStr)
wantUpdate := serviceUpdate{virtualHost: &xdsclient.VirtualHost{Domains: []string{"target"}, Routes: []*xdsclient.Route{{Prefix: newStringP(""), WeightedClusters: map[string]xdsclient.WeightedCluster{cluster: {Weight: 1}}}}}}
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
xdsC.InvokeWatchRouteConfigCallback("", xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
Domains: []string{targetStr},
@ -402,7 +402,7 @@ func (s) TestServiceWatchInlineRDS(t *testing.T) {
VirtualHosts: []*xdsclient.VirtualHost{wantVirtualHosts2},
}}, nil)
// This inline RDS resource should cause the RDS watch to be canceled.
if err := xdsC.WaitForCancelRouteConfigWatch(ctx); err != nil {
if _, err := xdsC.WaitForCancelRouteConfigWatch(ctx); err != nil {
t.Fatalf("wait for cancel route watch failed: %v, want nil", err)
}
if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate2); err != nil {
@ -412,7 +412,7 @@ func (s) TestServiceWatchInlineRDS(t *testing.T) {
// Switch LDS update back to LDS with RDS name to watch.
xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{RouteConfigName: routeStr}, nil)
waitForWatchRouteConfig(ctx, t, xdsC, routeStr)
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
xdsC.InvokeWatchRouteConfigCallback("", xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
Domains: []string{targetStr},
@ -429,7 +429,7 @@ func (s) TestServiceWatchInlineRDS(t *testing.T) {
VirtualHosts: []*xdsclient.VirtualHost{wantVirtualHosts2},
}}, nil)
// This inline RDS resource should cause the RDS watch to be canceled.
if err := xdsC.WaitForCancelRouteConfigWatch(ctx); err != nil {
if _, err := xdsC.WaitForCancelRouteConfigWatch(ctx); err != nil {
t.Fatalf("wait for cancel route watch failed: %v, want nil", err)
}
if err := verifyServiceUpdate(ctx, serviceUpdateCh, wantUpdate2); err != nil {

View File

@ -270,7 +270,7 @@ func (s) TestXDSResolverWatchCallbackAfterClose(t *testing.T) {
// Call the watchAPI callback after closing the resolver, and make sure no
// update is triggered on the ClientConn.
xdsR.Close()
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
xdsC.InvokeWatchRouteConfigCallback("", xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
Domains: []string{targetStr},
@ -317,7 +317,7 @@ func (s) TestXDSResolverBadServiceUpdate(t *testing.T) {
// Invoke the watchAPI callback with a bad service update and wait for the
// ReportError method to be called on the ClientConn.
suErr := errors.New("bad serviceupdate")
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{}, suErr)
xdsC.InvokeWatchRouteConfigCallback("", xdsclient.RouteConfigUpdate{}, suErr)
if gotErrVal, gotErr := tcc.errorCh.Receive(ctx); gotErr != nil || gotErrVal != suErr {
t.Fatalf("ClientConn.ReportError() received %v, want %v", gotErrVal, suErr)
@ -406,7 +406,7 @@ func (s) TestXDSResolverGoodServiceUpdate(t *testing.T) {
} {
// Invoke the watchAPI callback with a good service update and wait for the
// UpdateState method to be called on the ClientConn.
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
xdsC.InvokeWatchRouteConfigCallback("", xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
Domains: []string{targetStr},
@ -480,7 +480,7 @@ func (s) TestXDSResolverRequestHash(t *testing.T) {
waitForWatchRouteConfig(ctx, t, xdsC, routeStr)
// Invoke watchAPI callback with a good service update (with hash policies
// specified) and wait for UpdateState method to be called on ClientConn.
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
xdsC.InvokeWatchRouteConfigCallback("", xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
Domains: []string{targetStr},
@ -541,7 +541,7 @@ func (s) TestXDSResolverRemovedWithRPCs(t *testing.T) {
// Invoke the watchAPI callback with a good service update and wait for the
// UpdateState method to be called on the ClientConn.
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
xdsC.InvokeWatchRouteConfigCallback("", xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
Domains: []string{targetStr},
@ -572,7 +572,7 @@ func (s) TestXDSResolverRemovedWithRPCs(t *testing.T) {
// Delete the resource
suErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "resource removed error")
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{}, suErr)
xdsC.InvokeWatchRouteConfigCallback("", xdsclient.RouteConfigUpdate{}, suErr)
if _, err = tcc.stateCh.Receive(ctx); err != nil {
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
@ -601,7 +601,7 @@ func (s) TestXDSResolverRemovedResource(t *testing.T) {
// Invoke the watchAPI callback with a good service update and wait for the
// UpdateState method to be called on the ClientConn.
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
xdsC.InvokeWatchRouteConfigCallback("", xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
Domains: []string{targetStr},
@ -651,7 +651,7 @@ func (s) TestXDSResolverRemovedResource(t *testing.T) {
// Delete the resource. The channel should receive a service config with the
// original cluster but with an erroring config selector.
suErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "resource removed error")
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{}, suErr)
xdsC.InvokeWatchRouteConfigCallback("", xdsclient.RouteConfigUpdate{}, suErr)
if gotState, err = tcc.stateCh.Receive(ctx); err != nil {
t.Fatalf("Error waiting for UpdateState to be called: %v", err)
@ -712,7 +712,7 @@ func (s) TestXDSResolverWRR(t *testing.T) {
// Invoke the watchAPI callback with a good service update and wait for the
// UpdateState method to be called on the ClientConn.
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
xdsC.InvokeWatchRouteConfigCallback("", xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
Domains: []string{targetStr},
@ -772,7 +772,7 @@ func (s) TestXDSResolverMaxStreamDuration(t *testing.T) {
// Invoke the watchAPI callback with a good service update and wait for the
// UpdateState method to be called on the ClientConn.
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
xdsC.InvokeWatchRouteConfigCallback("", xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
Domains: []string{targetStr},
@ -862,7 +862,7 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) {
// Invoke the watchAPI callback with a good service update and wait for the
// UpdateState method to be called on the ClientConn.
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
xdsC.InvokeWatchRouteConfigCallback("", xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
Domains: []string{targetStr},
@ -912,7 +912,7 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) {
// Perform TWO updates to ensure the old config selector does not hold a
// reference to test-cluster-1.
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
xdsC.InvokeWatchRouteConfigCallback("", xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
Domains: []string{targetStr},
@ -922,7 +922,7 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) {
}, nil)
tcc.stateCh.Receive(ctx) // Ignore the first update.
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
xdsC.InvokeWatchRouteConfigCallback("", xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
Domains: []string{targetStr},
@ -961,7 +961,7 @@ func (s) TestXDSResolverDelayedOnCommitted(t *testing.T) {
// test-cluster-1.
res.OnCommitted()
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
xdsC.InvokeWatchRouteConfigCallback("", xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
Domains: []string{targetStr},
@ -1012,7 +1012,7 @@ func (s) TestXDSResolverGoodUpdateAfterError(t *testing.T) {
// Invoke the watchAPI callback with a bad service update and wait for the
// ReportError method to be called on the ClientConn.
suErr := errors.New("bad serviceupdate")
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{}, suErr)
xdsC.InvokeWatchRouteConfigCallback("", xdsclient.RouteConfigUpdate{}, suErr)
if gotErrVal, gotErr := tcc.errorCh.Receive(ctx); gotErr != nil || gotErrVal != suErr {
t.Fatalf("ClientConn.ReportError() received %v, want %v", gotErrVal, suErr)
@ -1020,7 +1020,7 @@ func (s) TestXDSResolverGoodUpdateAfterError(t *testing.T) {
// Invoke the watchAPI callback with a good service update and wait for the
// UpdateState method to be called on the ClientConn.
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
xdsC.InvokeWatchRouteConfigCallback("", xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
Domains: []string{targetStr},
@ -1040,7 +1040,7 @@ func (s) TestXDSResolverGoodUpdateAfterError(t *testing.T) {
// Invoke the watchAPI callback with a bad service update and wait for the
// ReportError method to be called on the ClientConn.
suErr2 := errors.New("bad serviceupdate 2")
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{}, suErr2)
xdsC.InvokeWatchRouteConfigCallback("", xdsclient.RouteConfigUpdate{}, suErr2)
if gotErrVal, gotErr := tcc.errorCh.Receive(ctx); gotErr != nil || gotErrVal != suErr2 {
t.Fatalf("ClientConn.ReportError() received %v, want %v", gotErrVal, suErr2)
}
@ -1066,7 +1066,7 @@ func (s) TestXDSResolverResourceNotFoundError(t *testing.T) {
// Invoke the watchAPI callback with a bad service update and wait for the
// ReportError method to be called on the ClientConn.
suErr := xdsclient.NewErrorf(xdsclient.ErrorTypeResourceNotFound, "resource removed error")
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{}, suErr)
xdsC.InvokeWatchRouteConfigCallback("", xdsclient.RouteConfigUpdate{}, suErr)
if gotErrVal, gotErr := tcc.errorCh.Receive(ctx); gotErr != context.DeadlineExceeded {
t.Fatalf("ClientConn.ReportError() received %v, %v, want channel recv timeout", gotErrVal, gotErr)
@ -1295,7 +1295,7 @@ func (s) TestXDSResolverHTTPFilters(t *testing.T) {
// Invoke the watchAPI callback with a good service update and wait for the
// UpdateState method to be called on the ClientConn.
xdsC.InvokeWatchRouteConfigCallback(xdsclient.RouteConfigUpdate{
xdsC.InvokeWatchRouteConfigCallback("", xdsclient.RouteConfigUpdate{
VirtualHosts: []*xdsclient.VirtualHost{
{
Domains: []string{targetStr},

View File

@ -84,6 +84,13 @@ func (s ServingMode) String() string {
// non-nil error if the server has transitioned into not-serving mode.
type ServingModeCallback func(addr net.Addr, mode ServingMode, err error)
// DrainCallback is the callback that an xDS-enabled server registers to get
// notified about updates to the Listener configuration. The server is expected
// to gracefully shut down existing connections, thereby forcing clients to
// reconnect and have the new configuration applied to the newly created
// connections.
type DrainCallback func(addr net.Addr)
func prefixLogger(p *listenerWrapper) *internalgrpclog.PrefixLogger {
return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[xds-server-listener %p] ", p))
}
@ -92,6 +99,7 @@ func prefixLogger(p *listenerWrapper) *internalgrpclog.PrefixLogger {
// the listenerWrapper.
type XDSClient interface {
WatchListener(string, func(xdsclient.ListenerUpdate, error)) func()
WatchRouteConfig(string, func(xdsclient.RouteConfigUpdate, error)) func()
BootstrapConfig() *bootstrap.Config
}
@ -108,6 +116,9 @@ type ListenerWrapperParams struct {
XDSClient XDSClient
// ModeCallback is the callback to invoke when the serving mode changes.
ModeCallback ServingModeCallback
// DrainCallback is the callback to invoke when the Listener gets a LDS
// update.
DrainCallback DrainCallback
}
// NewListenerWrapper creates a new listenerWrapper with params. It returns a
@ -122,10 +133,13 @@ func NewListenerWrapper(params ListenerWrapperParams) (net.Listener, <-chan stru
xdsCredsInUse: params.XDSCredsInUse,
xdsC: params.XDSClient,
modeCallback: params.ModeCallback,
drainCallback: params.DrainCallback,
isUnspecifiedAddr: params.Listener.Addr().(*net.TCPAddr).IP.IsUnspecified(),
closed: grpcsync.NewEvent(),
goodUpdate: grpcsync.NewEvent(),
closed: grpcsync.NewEvent(),
goodUpdate: grpcsync.NewEvent(),
ldsUpdateCh: make(chan ldsUpdateWithError, 1),
rdsUpdateCh: make(chan rdsHandlerUpdate, 1),
}
lw.logger = prefixLogger(lw)
@ -134,15 +148,23 @@ func NewListenerWrapper(params ListenerWrapperParams) (net.Listener, <-chan stru
lisAddr := lw.Listener.Addr().String()
lw.addr, lw.port, _ = net.SplitHostPort(lisAddr)
lw.rdsHandler = newRDSHandler(lw.xdsC, lw.rdsUpdateCh)
cancelWatch := lw.xdsC.WatchListener(lw.name, lw.handleListenerUpdate)
lw.logger.Infof("Watch started on resource name %v", lw.name)
lw.cancelWatch = func() {
cancelWatch()
lw.logger.Infof("Watch cancelled on resource name %v", lw.name)
}
go lw.run()
return lw, lw.goodUpdate.Done()
}
type ldsUpdateWithError struct {
update xdsclient.ListenerUpdate
err error
}
// listenerWrapper wraps the net.Listener associated with the listening address
// passed to Serve(). It also contains all other state associated with this
// particular invocation of Serve().
@ -155,6 +177,7 @@ type listenerWrapper struct {
xdsC XDSClient
cancelWatch func()
modeCallback ServingModeCallback
drainCallback DrainCallback
// Set to true if the listener is bound to the IP_ANY address (which is
// "0.0.0.0" for IPv4 and "::" for IPv6).
@ -185,6 +208,16 @@ type listenerWrapper struct {
mode ServingMode
// Filter chains received as part of the last good update.
filterChains *xdsclient.FilterChainManager
// rdsHandler is used for any dynamic RDS resources specified in a LDS
// update.
rdsHandler *rdsHandler
// rdsUpdates are the RDS resources received from the management
// server, keyed on the RouteName of the RDS resource.
rdsUpdates map[string]xdsclient.RouteConfigUpdate // TODO: if this will be read in accept, this will need a read lock as well.
// ldsUpdateCh is a channel for XDSClient LDS updates.
ldsUpdateCh chan ldsUpdateWithError
// rdsUpdateCh is a channel for XDSClient RDS updates.
rdsUpdateCh chan rdsHandlerUpdate
}
// Accept blocks on an Accept() on the underlying listener, and wraps the
@ -264,6 +297,10 @@ func (l *listenerWrapper) Accept() (net.Conn, error) {
conn.Close()
continue
}
// TODO: once matched an accepted connection to a filter chain,
// instantiate the HTTP filters in the filter chain + the filter
// overrides, pipe filters and route into connection, which will
// eventually be passed to xdsUnary/Stream interceptors.
return &connWrapper{Conn: conn, filterChain: fc, parent: l}, nil
}
}
@ -277,25 +314,76 @@ func (l *listenerWrapper) Close() error {
if l.cancelWatch != nil {
l.cancelWatch()
}
l.rdsHandler.close()
return nil
}
// run is a long running goroutine which handles all xds updates. LDS and RDS
// push updates onto a channel which is read and acted upon from this goroutine.
func (l *listenerWrapper) run() {
for {
select {
case <-l.closed.Done():
return
case u := <-l.ldsUpdateCh:
l.handleLDSUpdate(u)
case u := <-l.rdsUpdateCh:
l.handleRDSUpdate(u)
}
}
}
// handleListenerUpdate is the callback which handles LDS updates. It writes
// the received update to the update channel, which is picked up by the run
// goroutine.
func (l *listenerWrapper) handleListenerUpdate(update xdsclient.ListenerUpdate, err error) {
if l.closed.HasFired() {
l.logger.Warningf("Resource %q received update: %v with error: %v, after listener was closed", l.name, update, err)
return
}
// Remove any existing entry in ldsUpdateCh and replace it with the new one,
// as the only update the listener cares about is the most recent one.
select {
case <-l.ldsUpdateCh:
default:
}
l.ldsUpdateCh <- ldsUpdateWithError{update: update, err: err}
}
if err != nil {
l.logger.Warningf("Received error for resource %q: %+v", l.name, err)
if xdsclient.ErrType(err) == xdsclient.ErrorTypeResourceNotFound {
l.switchMode(nil, ServingModeNotServing, err)
// handleRDSUpdate handles a full rds update from rds handler. On a successful
// update, the server will switch to ServingModeServing as the full
// configuration (both LDS and RDS) has been received.
func (l *listenerWrapper) handleRDSUpdate(update rdsHandlerUpdate) {
if l.closed.HasFired() {
l.logger.Warningf("RDS received update: %v with error: %v, after listener was closed", update.updates, update.err)
return
}
if update.err != nil {
l.logger.Warningf("Received error for rds names specified in resource %q: %+v", l.name, update.err)
if xdsclient.ErrType(update.err) == xdsclient.ErrorTypeResourceNotFound {
l.switchMode(nil, ServingModeNotServing, update.err)
}
// For errors which are anything other than "resource-not-found", we
// continue to use the old configuration.
return
}
l.logger.Infof("Received update for resource %q: %+v", l.name, update)
l.rdsUpdates = update.updates
l.switchMode(l.filterChains, ServingModeServing, nil)
l.goodUpdate.Fire()
}
func (l *listenerWrapper) handleLDSUpdate(update ldsUpdateWithError) {
if update.err != nil {
l.logger.Warningf("Received error for resource %q: %+v", l.name, update.err)
if xdsclient.ErrType(update.err) == xdsclient.ErrorTypeResourceNotFound {
l.switchMode(nil, ServingModeNotServing, update.err)
}
// For errors which are anything other than "resource-not-found", we
// continue to use the old configuration.
return
}
l.logger.Infof("Received update for resource %q: %+v", l.name, update.update)
// Make sure that the socket address on the received Listener resource
// matches the address of the net.Listener passed to us by the user. This
@ -309,14 +397,30 @@ func (l *listenerWrapper) handleListenerUpdate(update xdsclient.ListenerUpdate,
// What this means is that the XDSClient has ACKed a resource which can push
// the server into a "not serving" mode. This is not ideal, but this is
// what we have decided to do. See gRPC A36 for more details.
ilc := update.InboundListenerCfg
ilc := update.update.InboundListenerCfg
if ilc.Address != l.addr || ilc.Port != l.port {
l.switchMode(nil, ServingModeNotServing, fmt.Errorf("address (%s:%s) in Listener update does not match listening address: (%s:%s)", ilc.Address, ilc.Port, l.addr, l.port))
return
}
l.switchMode(ilc.FilterChains, ServingModeServing, nil)
l.goodUpdate.Fire()
// "Updates to a Listener cause all older connections on that Listener to be
// gracefully shut down with a grace period of 10 minutes for long-lived
// RPC's, such that clients will reconnect and have the updated
// configuration apply." - A36 Note that this is not the same as moving the
// Server's state to ServingModeNotServing. That prevents new connections
// from being accepted, whereas here we simply want the clients to reconnect
// to get the updated configuration.
if l.drainCallback != nil {
l.drainCallback(l.Listener.Addr())
}
l.rdsHandler.updateRouteNamesToWatch(ilc.FilterChains.RouteConfigNames)
// If there are no dynamic RDS Configurations still needed to be received
// from the management server, this listener has all the configuration
// needed, and is ready to serve.
if len(ilc.FilterChains.RouteConfigNames) == 0 {
l.switchMode(ilc.FilterChains, ServingModeServing, nil)
l.goodUpdate.Fire()
}
}
func (l *listenerWrapper) switchMode(fcs *xdsclient.FilterChainManager, newMode ServingMode, err error) {
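
The one-slot update channels used above (ldsUpdateCh here, and the rdsHandler's updateChannel introduced below) implement a latest-update-wins handoff: the sender drains any unread value before sending, so the run goroutine never acts on a stale configuration. A minimal, self-contained sketch of the idiom (illustrative names only, not code from this commit):

package main

import "fmt"

func main() {
	ch := make(chan int, 1) // one-slot buffer, like ldsUpdateCh
	push := func(v int) {
		select { // drop any unread update...
		case <-ch:
		default:
		}
		ch <- v // ...so only the latest is pending
	}
	for v := 1; v <= 5; v++ {
		push(v)
	}
	fmt.Println(<-ch) // prints 5: the consumer sees only the newest update
}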

View File

@ -48,6 +48,50 @@ const (
defaultTestShortTimeout = 10 * time.Millisecond
)
var listenerWithRouteConfiguration = &v3listenerpb.Listener{
FilterChains: []*v3listenerpb.FilterChain{
{
FilterChainMatch: &v3listenerpb.FilterChainMatch{
PrefixRanges: []*v3corepb.CidrRange{
{
AddressPrefix: "192.168.0.0",
PrefixLen: &wrapperspb.UInt32Value{
Value: uint32(16),
},
},
},
SourceType: v3listenerpb.FilterChainMatch_SAME_IP_OR_LOOPBACK,
SourcePrefixRanges: []*v3corepb.CidrRange{
{
AddressPrefix: "192.168.0.0",
PrefixLen: &wrapperspb.UInt32Value{
Value: uint32(16),
},
},
},
SourcePorts: []uint32{80},
},
Filters: []*v3listenerpb.Filter{
{
Name: "filter-1",
ConfigType: &v3listenerpb.Filter_TypedConfig{
TypedConfig: testutils.MarshalAny(&v3httppb.HttpConnectionManager{
RouteSpecifier: &v3httppb.HttpConnectionManager_Rds{
Rds: &v3httppb.Rds{
ConfigSource: &v3corepb.ConfigSource{
ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{Ads: &v3corepb.AggregatedConfigSource{}},
},
RouteConfigName: "route-1",
},
},
}),
},
},
},
},
},
}
var listenerWithFilterChains = &v3listenerpb.Listener{
FilterChains: []*v3listenerpb.FilterChain{
{
@ -221,7 +265,7 @@ func (s) TestNewListenerWrapper(t *testing.T) {
t.Fatalf("error when waiting for a watch on a Listener resource: %v", err)
}
if name != testListenerResourceName {
t.Fatalf("listenerWrapper registered a watch on %s, want %s", name, testListenerResourceName)
t.Fatalf("listenerWrapper registered a lds watch on %s, want %s", name, testListenerResourceName)
}
// Push an error to the listener update handler.
@ -234,12 +278,18 @@ func (s) TestNewListenerWrapper(t *testing.T) {
t.Fatalf("ready channel written to after receipt of a bad Listener update")
}
fcm, err := xdsclient.NewFilterChainManager(listenerWithFilterChains)
if err != nil {
t.Fatalf("xdsclient.NewFilterChainManager() failed with error: %v", err)
}
// Push an update whose address does not match the address to which our
// listener is bound, and verify that the ready channel is not written to.
xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{
InboundListenerCfg: &xdsclient.InboundListenerConfig{
Address: "10.0.0.1",
Port: "50051",
Address: "10.0.0.1",
Port: "50051",
FilterChains: fcm,
}}, nil)
timer = time.NewTimer(defaultTestShortTimeout)
select {
@ -250,11 +300,16 @@ func (s) TestNewListenerWrapper(t *testing.T) {
}
// Push a good update, and verify that the ready channel is written to.
// Since there are no dynamic RDS updates needed to be received, the
// ListenerWrapper does not have to wait for anything else before telling
// that it is ready.
xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{
InboundListenerCfg: &xdsclient.InboundListenerConfig{
Address: fakeListenerHost,
Port: strconv.Itoa(fakeListenerPort),
Address: fakeListenerHost,
Port: strconv.Itoa(fakeListenerPort),
FilterChains: fcm,
}}, nil)
select {
case <-ctx.Done():
t.Fatalf("timeout waiting for the ready channel to be written to after receipt of a good Listener update")
@ -262,6 +317,75 @@ func (s) TestNewListenerWrapper(t *testing.T) {
}
}
// TestNewListenerWrapperWithRouteUpdate tests the scenario where the listener
// gets built and starts a watch, the watch returns a list of route names to
// watch, and the wrapper then receives an update from the rds handler. Only
// after receiving the update from the rds handler should it move the server
// to ServingModeServing.
func (s) TestNewListenerWrapperWithRouteUpdate(t *testing.T) {
_, readyCh, xdsC, _, cleanup := newListenerWrapper(t)
defer cleanup()
// Verify that the listener wrapper registers a listener watch for the
// expected Listener resource name.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
name, err := xdsC.WaitForWatchListener(ctx)
if err != nil {
t.Fatalf("error when waiting for a watch on a Listener resource: %v", err)
}
if name != testListenerResourceName {
t.Fatalf("listenerWrapper registered a lds watch on %s, want %s", name, testListenerResourceName)
}
fcm, err := xdsclient.NewFilterChainManager(listenerWithRouteConfiguration)
if err != nil {
t.Fatalf("xdsclient.NewFilterChainManager() failed with error: %v", err)
}
// Push a good update which contains a filter chain specifying dynamic RDS
// resources that need to be received. This should notify the rds handler
// about which rds names to watch, which will eventually start a watch on
// the xds client for rds name "route-1".
xdsC.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{
InboundListenerCfg: &xdsclient.InboundListenerConfig{
Address: fakeListenerHost,
Port: strconv.Itoa(fakeListenerPort),
FilterChains: fcm,
}}, nil)
// This should start a watch on xds client for rds name "route-1".
routeName, err := xdsC.WaitForWatchRouteConfig(ctx)
if err != nil {
t.Fatalf("error when waiting for a watch on a Route resource: %v", err)
}
if routeName != "route-1" {
t.Fatalf("listenerWrapper registered a lds watch on %s, want %s", routeName, "route-1")
}
// This shouldn't write to the ready channel, as the wrapper has not received its rds configuration yet.
timer := time.NewTimer(defaultTestShortTimeout)
select {
case <-timer.C:
timer.Stop()
case <-readyCh:
t.Fatalf("ready channel written to without rds configuration specified")
}
// Invoke the rds callback for the started rds watch. This valid rds update
// should trigger the listener wrapper to fire goodUpdate, as it has now
// received both its LDS configuration and the RDS configuration specified
// by that LDS configuration.
xdsC.InvokeWatchRouteConfigCallback("route-1", xdsclient.RouteConfigUpdate{}, nil)
// All of the xDS updates have been received, so the ready channel should be
// written to.
select {
case <-ctx.Done():
t.Fatalf("timeout waiting for the ready channel to be written to after receipt of a good rds update")
case <-readyCh:
}
}
func (s) TestListenerWrapper_Accept(t *testing.T) {
boCh := testutils.NewChannel()
origBackoffFunc := backoffFunc

View File

@ -0,0 +1,133 @@
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package server
import (
"sync"
"google.golang.org/grpc/xds/internal/xdsclient"
)
// rdsHandlerUpdate wraps the full set of RouteConfigUpdates that are
// dynamically queried for a given server-side listener.
type rdsHandlerUpdate struct {
updates map[string]xdsclient.RouteConfigUpdate
err error
}
// rdsHandler handles any RDS queries that need to be started for a given
// server-side listener's filter chains (i.e. not inline).
type rdsHandler struct {
xdsC XDSClient
mu sync.Mutex
updates map[string]xdsclient.RouteConfigUpdate
cancels map[string]func()
// The only update the wrapped listener cares about is the most recent
// one, so this channel is opportunistically drained before any new
// update is sent.
updateChannel chan rdsHandlerUpdate
}
// newRDSHandler creates a new rdsHandler to watch for RDS resources.
// listenerWrapper updates the list of route names to watch by calling
// updateRouteNamesToWatch() upon receipt of new Listener configuration.
func newRDSHandler(xdsC XDSClient, ch chan rdsHandlerUpdate) *rdsHandler {
return &rdsHandler{
xdsC: xdsC,
updateChannel: ch,
updates: make(map[string]xdsclient.RouteConfigUpdate),
cancels: make(map[string]func()),
}
}
// updateRouteNamesToWatch handles a list of route names to watch for a given
// server side listener (if a filter chain specifies dynamic RDS configuration).
// This function handles all the logic with respect to any routes that may have
// been added or deleted as compared to what was previously present.
func (rh *rdsHandler) updateRouteNamesToWatch(routeNamesToWatch map[string]bool) {
rh.mu.Lock()
defer rh.mu.Unlock()
// Start watches for any new route names in routeNamesToWatch.
for routeName := range routeNamesToWatch {
if _, ok := rh.cancels[routeName]; !ok {
func(routeName string) {
rh.cancels[routeName] = rh.xdsC.WatchRouteConfig(routeName, func(update xdsclient.RouteConfigUpdate, err error) {
rh.handleRouteUpdate(routeName, update, err)
})
}(routeName)
}
}
// Cancel watches for any routes that were previously watched but are no
// longer present in routeNamesToWatch.
for routeName := range rh.cancels {
if _, ok := routeNamesToWatch[routeName]; !ok {
rh.cancels[routeName]()
delete(rh.cancels, routeName)
delete(rh.updates, routeName)
}
}
// If the handler now has (as determined by length) an update for every
// watched route, the listener is ready to be updated.
if len(rh.updates) == len(rh.cancels) && len(routeNamesToWatch) != 0 {
drainAndPush(rh.updateChannel, rdsHandlerUpdate{updates: rh.updates})
}
}
// handleRouteUpdate persists the route config for a given route name, and
// sends an update to the listener wrapper if an error was received or if the
// rds handler now has a full collection of updates.
func (rh *rdsHandler) handleRouteUpdate(routeName string, update xdsclient.RouteConfigUpdate, err error) {
if err != nil {
drainAndPush(rh.updateChannel, rdsHandlerUpdate{err: err})
return
}
rh.mu.Lock()
defer rh.mu.Unlock()
rh.updates[routeName] = update
// If the handler now has (as determined by length) an update for every
// watched route, the listener is ready to be updated.
if len(rh.updates) == len(rh.cancels) {
drainAndPush(rh.updateChannel, rdsHandlerUpdate{updates: rh.updates})
}
}
func drainAndPush(ch chan rdsHandlerUpdate, update rdsHandlerUpdate) {
select {
case <-ch:
default:
}
ch <- update
}
// close() is meant to be called by the wrapped listener when it is closed,
// and cleans up resources by canceling all the active RDS watches.
func (rh *rdsHandler) close() {
rh.mu.Lock()
defer rh.mu.Unlock()
for _, cancel := range rh.cancels {
cancel()
}
}
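
Taken together: handleRouteUpdate and updateRouteNamesToWatch push to the listener only when every watched route name has a config (len(updates) == len(cancels)). A minimal, self-contained model of that counting rule, with hypothetical names standing in for this package's types:

package main

import "fmt"

// miniHandler models rdsHandler's rule: push to the listener only when
// len(updates) == len(cancels), i.e. every watched route has a config.
type miniHandler struct {
	updates map[string]string // route name -> stand-in for RouteConfigUpdate
	cancels map[string]bool   // route name -> watch active
	ch      chan map[string]string
}

func (h *miniHandler) onRouteUpdate(name, cfg string) {
	h.updates[name] = cfg
	if len(h.updates) == len(h.cancels) { // full set received
		select { // drainAndPush-style send: keep only the latest
		case <-h.ch:
		default:
		}
		h.ch <- h.updates
	}
}

func main() {
	h := &miniHandler{
		updates: map[string]string{},
		cancels: map[string]bool{"route-1": true, "route-2": true},
		ch:      make(chan map[string]string, 1),
	}
	h.onRouteUpdate("route-1", "cfg1") // 1 of 2: nothing pushed yet
	h.onRouteUpdate("route-2", "cfg2") // 2 of 2: full set pushed
	fmt.Println(<-h.ch)                // map[route-1:cfg1 route-2:cfg2]
}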

View File

@ -0,0 +1,403 @@
// +build go1.12
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package server
import (
"context"
"errors"
"fmt"
"testing"
"github.com/google/go-cmp/cmp"
"google.golang.org/grpc/xds/internal/testutils/fakeclient"
"google.golang.org/grpc/xds/internal/xdsclient"
)
const (
route1 = "route1"
route2 = "route2"
route3 = "route3"
)
// setupTests creates a rds handler with a fake xds client for control over the
// xds client.
func setupTests() (*rdsHandler, *fakeclient.Client, chan rdsHandlerUpdate) {
xdsC := fakeclient.NewClient()
ch := make(chan rdsHandlerUpdate, 1)
rh := newRDSHandler(xdsC, ch)
return rh, xdsC, ch
}
// waitForFuncWithNames makes sure that a blocking function returns the correct
// set of names, where order doesn't matter. This takes away nondeterminism from
// ranging through a map.
func waitForFuncWithNames(ctx context.Context, f func(context.Context) (string, error), names ...string) error {
wantNames := make(map[string]bool, len(names))
for _, name := range names {
wantNames[name] = true
}
gotNames := make(map[string]bool, len(names))
for range wantNames {
name, err := f(ctx)
if err != nil {
return err
}
gotNames[name] = true
}
if !cmp.Equal(gotNames, wantNames) {
return fmt.Errorf("got routeNames %v, want %v", gotNames, wantNames)
}
return nil
}
// TestSuccessCaseOneRDSWatch tests the simplest scenario: the rds handler
// receives a single route name, starts a watch for that route name, gets a
// successful update, and then writes an update to the update channel for
// the listener to pick up.
func (s) TestSuccessCaseOneRDSWatch(t *testing.T) {
rh, fakeClient, ch := setupTests()
// The first update to the rds handler contains a single route name that
// needs dynamic RDS configuration. This route name has not been seen
// before, so the rds handler should start a watch on it.
rh.updateRouteNamesToWatch(map[string]bool{route1: true})
// The RDS Handler should start a watch for that routeName.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
gotRoute, err := fakeClient.WaitForWatchRouteConfig(ctx)
if err != nil {
t.Fatalf("xdsClient.WatchRDS failed with error: %v", err)
}
if gotRoute != route1 {
t.Fatalf("xdsClient.WatchRDS called for route: %v, want %v", gotRoute, route1)
}
rdsUpdate := xdsclient.RouteConfigUpdate{}
// Invoke the xds client callback with a route update. Since this update
// covers every route name the rds handler handles, the handler should
// write to the update channel to send to the listener.
fakeClient.InvokeWatchRouteConfigCallback(route1, rdsUpdate, nil)
rhuWant := map[string]xdsclient.RouteConfigUpdate{route1: rdsUpdate}
select {
case rhu := <-ch:
if diff := cmp.Diff(rhu.updates, rhuWant); diff != "" {
t.Fatalf("got unexpected route update, diff (-got, +want): %v", diff)
}
case <-ctx.Done():
t.Fatal("Timed out waiting for update from update channel.")
}
// Close the rds handler. This is meant to be called when the lis wrapper is
// closed, and the call should cancel all the watches present (for this
// test, a single watch).
rh.close()
routeNameDeleted, err := fakeClient.WaitForCancelRouteConfigWatch(ctx)
if err != nil {
t.Fatalf("xdsClient.CancelRDS failed with error: %v", err)
}
if routeNameDeleted != route1 {
t.Fatalf("xdsClient.CancelRDS called for route %v, want %v", routeNameDeleted, route1)
}
}
// TestSuccessCaseTwoUpdates tests the case where the rds handler receives an
// update with a single route, then receives a second update with two routes.
// The handler should start a watch for the added route, and once it receives
// an RDS update for that route it should send an update with both RDS
// updates present.
func (s) TestSuccessCaseTwoUpdates(t *testing.T) {
rh, fakeClient, ch := setupTests()
rh.updateRouteNamesToWatch(map[string]bool{route1: true})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
gotRoute, err := fakeClient.WaitForWatchRouteConfig(ctx)
if err != nil {
t.Fatalf("xdsClient.WatchRDS failed with error: %v", err)
}
if gotRoute != route1 {
t.Fatalf("xdsClient.WatchRDS called for route: %v, want %v", gotRoute, route1)
}
// Update the RDSHandler with a set of route names that adds a route name.
// This should trigger the RDSHandler to start a watch for the added route
// name.
rh.updateRouteNamesToWatch(map[string]bool{route1: true, route2: true})
gotRoute, err = fakeClient.WaitForWatchRouteConfig(ctx)
if err != nil {
t.Fatalf("xdsClient.WatchRDS failed with error: %v", err)
}
if gotRoute != route2 {
t.Fatalf("xdsClient.WatchRDS called for route: %v, want %v", gotRoute, route2)
}
// Invoke the callback with an update for route 1. This shouldn't cause the
// handler to write an update, as it has not received RouteConfigurations
// for every RouteName.
rdsUpdate1 := xdsclient.RouteConfigUpdate{}
fakeClient.InvokeWatchRouteConfigCallback(route1, rdsUpdate1, nil)
// The RDS Handler should not send an update.
sCtx, sCtxCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer sCtxCancel()
select {
case <-ch:
t.Fatal("RDS Handler wrote an update to updateChannel when it shouldn't have, as each route name has not received an update yet")
case <-sCtx.Done():
}
// Invoke the callback with an update for route 2. This should cause the
// handler to write an update, as it has received RouteConfigurations for
// every RouteName.
rdsUpdate2 := xdsclient.RouteConfigUpdate{}
fakeClient.InvokeWatchRouteConfigCallback(route2, rdsUpdate2, nil)
// The RDS Handler should then send the listener wrapper an update containing
// both route configurations, as every route name it handles has received an
// update.
rhuWant := map[string]xdsclient.RouteConfigUpdate{route1: rdsUpdate1, route2: rdsUpdate2}
select {
case rhu := <-ch:
if diff := cmp.Diff(rhu.updates, rhuWant); diff != "" {
t.Fatalf("got unexpected route update, diff (-got, +want): %v", diff)
}
case <-ctx.Done():
t.Fatal("Timed out waiting for the rds handler update to be written to the update buffer.")
}
// Close the rds handler. This is meant to be called when the lis wrapper is
// closed, and the call should cancel all the watches present (for this
// test, two watches on route1 and route2).
rh.close()
if err = waitForFuncWithNames(ctx, fakeClient.WaitForCancelRouteConfigWatch, route1, route2); err != nil {
t.Fatalf("Error while waiting for names: %v", err)
}
}
// TestSuccessCaseDeletedRoute tests the case where the rds handler receives an
// update with two routes, then receives an update with only one route. The RDS
// Handler is expected to cancel the watch for the route no longer present.
func (s) TestSuccessCaseDeletedRoute(t *testing.T) {
rh, fakeClient, ch := setupTests()
rh.updateRouteNamesToWatch(map[string]bool{route1: true, route2: true})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// Will start two watches.
if err := waitForFuncWithNames(ctx, fakeClient.WaitForWatchRouteConfig, route1, route2); err != nil {
t.Fatalf("Error while waiting for names: %v", err)
}
// Update the RDSHandler with a set of route names that drops one of the
// watched routes. This should trigger the RDSHandler to cancel the watch
// for the deleted route name.
rh.updateRouteNamesToWatch(map[string]bool{route1: true})
// This should delete the watch for route2.
routeNameDeleted, err := fakeClient.WaitForCancelRouteConfigWatch(ctx)
if err != nil {
t.Fatalf("xdsClient.CancelRDS failed with error %v", err)
}
if routeNameDeleted != route2 {
t.Fatalf("xdsClient.CancelRDS called for route %v, want %v", routeNameDeleted, route2)
}
rdsUpdate := xdsclient.RouteConfigUpdate{}
// Invoke the xds client callback with a route update. Since this update
// covers every route name the rds handler handles, the handler should
// write to the update channel to send to the listener.
fakeClient.InvokeWatchRouteConfigCallback(route1, rdsUpdate, nil)
rhuWant := map[string]xdsclient.RouteConfigUpdate{route1: rdsUpdate}
select {
case rhu := <-ch:
if diff := cmp.Diff(rhu.updates, rhuWant); diff != "" {
t.Fatalf("got unexpected route update, diff (-got, +want): %v", diff)
}
case <-ctx.Done():
t.Fatal("Timed out waiting for update from update channel.")
}
rh.close()
routeNameDeleted, err = fakeClient.WaitForCancelRouteConfigWatch(ctx)
if err != nil {
t.Fatalf("xdsClient.CancelRDS failed with error: %v", err)
}
if routeNameDeleted != route1 {
t.Fatalf("xdsClient.CancelRDS called for route %v, want %v", routeNameDeleted, route1)
}
}
// TestSuccessCaseTwoUpdatesAddAndDeleteRoute tests the case where the rds
// handler receives an update with two routes, and then receives an update
// with two routes, one previously present and one added (i.e. {route1,
// route2} -> {route2, route3}). The watch for the route that is no longer
// present should be cancelled, and a watch should be started for the added
// route.
func (s) TestSuccessCaseTwoUpdatesAddAndDeleteRoute(t *testing.T) {
rh, fakeClient, ch := setupTests()
rh.updateRouteNamesToWatch(map[string]bool{route1: true, route2: true})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := waitForFuncWithNames(ctx, fakeClient.WaitForWatchRouteConfig, route1, route2); err != nil {
t.Fatalf("Error while waiting for names: %v", err)
}
// Update the rds handler with two routes, one which was already there and a new route.
// This should cause the rds handler to delete/cancel watch for route 1 and start a watch
// for route 3.
rh.updateRouteNamesToWatch(map[string]bool{route2: true, route3: true})
// The watch start comes first, and should be for route3, which was just added.
gotRoute, err := fakeClient.WaitForWatchRouteConfig(ctx)
if err != nil {
t.Fatalf("xdsClient.WatchRDS failed with error: %v", err)
}
if gotRoute != route3 {
t.Fatalf("xdsClient.WatchRDS called for route: %v, want %v", gotRoute, route3)
}
// Then the watch for route 1 should be cancelled, as it is no longer
// present in the new set of route names to watch.
routeNameDeleted, err := fakeClient.WaitForCancelRouteConfigWatch(ctx)
if err != nil {
t.Fatalf("xdsClient.CancelRDS failed with error: %v", err)
}
if routeNameDeleted != route1 {
t.Fatalf("xdsClient.CancelRDS called for route %v, want %v", routeNameDeleted, route1)
}
// Invoke the callback with an update for route 2. This shouldn't cause the
// handler to write an update, as it has not received RouteConfigurations
// for every RouteName.
rdsUpdate2 := xdsclient.RouteConfigUpdate{}
fakeClient.InvokeWatchRouteConfigCallback(route2, rdsUpdate2, nil)
// The RDS Handler should not send an update.
sCtx, sCtxCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer sCtxCancel()
select {
case <-ch:
t.Fatalf("RDS Handler wrote an update to updateChannel when it shouldn't have, as each route name has not received an update yet")
case <-sCtx.Done():
}
// Invoke the callback with an update for route 3. This should cause the
// handler to write an update, as it has received RouteConfigurations for
// every RouteName.
rdsUpdate3 := xdsclient.RouteConfigUpdate{}
fakeClient.InvokeWatchRouteConfigCallback(route3, rdsUpdate3, nil)
// The RDS Handler should then send the listener wrapper an update containing
// both route configurations, as every route name it handles has received an
// update.
rhuWant := map[string]xdsclient.RouteConfigUpdate{route2: rdsUpdate2, route3: rdsUpdate3}
select {
case rhu := <-rh.updateChannel:
if diff := cmp.Diff(rhu.updates, rhuWant); diff != "" {
t.Fatalf("got unexpected route update, diff (-got, +want): %v", diff)
}
case <-ctx.Done():
t.Fatal("Timed out waiting for the rds handler update to be written to the update buffer.")
}
// Close the rds handler. This is meant to be called when the lis wrapper is
// closed, and the call should cancel all the watches present (for this
// test, two watches on route2 and route3).
rh.close()
if err = waitForFuncWithNames(ctx, fakeClient.WaitForCancelRouteConfigWatch, route2, route3); err != nil {
t.Fatalf("Error while waiting for names: %v", err)
}
}
// TestSuccessCaseSecondUpdateMakesRouteFull tests the scenario where the rds
// handler is told to watch three rds configurations, gets two successful
// updates, and is then told to watch only those two. The rds handler should
// then write an update to the update buffer.
func (s) TestSuccessCaseSecondUpdateMakesRouteFull(t *testing.T) {
rh, fakeClient, ch := setupTests()
rh.updateRouteNamesToWatch(map[string]bool{route1: true, route2: true, route3: true})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := waitForFuncWithNames(ctx, fakeClient.WaitForWatchRouteConfig, route1, route2, route3); err != nil {
t.Fatalf("Error while waiting for names: %v", err)
}
// Invoke the callbacks for two of the three watches. Since the rds
// configuration is not yet full, this shouldn't trigger the rds handler to
// write an update to the update buffer.
fakeClient.InvokeWatchRouteConfigCallback(route1, xdsclient.RouteConfigUpdate{}, nil)
fakeClient.InvokeWatchRouteConfigCallback(route2, xdsclient.RouteConfigUpdate{}, nil)
// The RDS Handler should not send an update.
sCtx, sCtxCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
defer sCtxCancel()
select {
case <-rh.updateChannel:
t.Fatalf("RDS Handler wrote an update to updateChannel when it shouldn't have, as each route name has not received an update yet")
case <-sCtx.Done():
}
// Tell the rds handler to now only watch Route 1 and Route 2. This should
// trigger the rds handler to write an update to the update buffer, as it
// now has a full rds configuration.
rh.updateRouteNamesToWatch(map[string]bool{route1: true, route2: true})
// The watch for route 3 should be cancelled, as it is no longer present
// in the new set of route names to watch.
routeNameDeleted, err := fakeClient.WaitForCancelRouteConfigWatch(ctx)
if err != nil {
t.Fatalf("xdsClient.CancelRDS failed with error: %v", err)
}
if routeNameDeleted != route3 {
t.Fatalf("xdsClient.CancelRDS called for route %v, want %v", routeNameDeleted, route1)
}
rhuWant := map[string]xdsclient.RouteConfigUpdate{route1: {}, route2: {}}
select {
case rhu := <-ch:
if diff := cmp.Diff(rhu.updates, rhuWant); diff != "" {
t.Fatalf("got unexpected route update, diff (-got, +want): %v", diff)
}
case <-ctx.Done():
t.Fatal("Timed out waiting for the rds handler update to be written to the update buffer.")
}
}
// TestErrorReceived tests the case where the rds handler receives a route
// name to watch, then receives an update with an error. This error should
// then be written to the update channel.
func (s) TestErrorReceived(t *testing.T) {
rh, fakeClient, ch := setupTests()
rh.updateRouteNamesToWatch(map[string]bool{route1: true})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
gotRoute, err := fakeClient.WaitForWatchRouteConfig(ctx)
if err != nil {
t.Fatalf("xdsClient.WatchRDS failed with error %v", err)
}
if gotRoute != route1 {
t.Fatalf("xdsClient.WatchRDS called for route: %v, want %v", gotRoute, route1)
}
rdsErr := errors.New("some error")
fakeClient.InvokeWatchRouteConfigCallback(route1, xdsclient.RouteConfigUpdate{}, rdsErr)
select {
case rhu := <-ch:
if rhu.err.Error() != "some error" {
t.Fatalf("Did not receive the expected error, instead received: %v", rhu.err.Error())
}
case <-ctx.Done():
t.Fatal("Timed out waiting for update from update channel")
}
}

View File

@ -52,7 +52,7 @@ type Client struct {
bootstrapCfg *bootstrap.Config
ldsCb func(xdsclient.ListenerUpdate, error)
rdsCb func(xdsclient.RouteConfigUpdate, error)
rdsCbs map[string]func(xdsclient.RouteConfigUpdate, error)
cdsCbs map[string]func(xdsclient.ClusterUpdate, error)
edsCbs map[string]func(xdsclient.EndpointsUpdate, error)
@ -95,10 +95,10 @@ func (xdsC *Client) WaitForCancelListenerWatch(ctx context.Context) error {
// WatchRouteConfig registers a RDS watch.
func (xdsC *Client) WatchRouteConfig(routeName string, callback func(xdsclient.RouteConfigUpdate, error)) func() {
xdsC.rdsCb = callback
xdsC.rdsCbs[routeName] = callback
xdsC.rdsWatchCh.Send(routeName)
return func() {
xdsC.rdsCancelCh.Send(nil)
xdsC.rdsCancelCh.Send(routeName)
}
}
@ -116,15 +116,28 @@ func (xdsC *Client) WaitForWatchRouteConfig(ctx context.Context) (string, error)
//
// Not thread safe with WatchRouteConfig. Only call this after
// WaitForWatchRouteConfig.
func (xdsC *Client) InvokeWatchRouteConfigCallback(update xdsclient.RouteConfigUpdate, err error) {
xdsC.rdsCb(update, err)
func (xdsC *Client) InvokeWatchRouteConfigCallback(name string, update xdsclient.RouteConfigUpdate, err error) {
if len(xdsC.rdsCbs) != 1 {
xdsC.rdsCbs[name](update, err)
return
}
// Preserves the behavior of earlier client-side usage: if there is only a
// single registered callback, invoke it regardless of the name passed in.
var routeName string
for route := range xdsC.rdsCbs {
routeName = route
}
xdsC.rdsCbs[routeName](update, err)
}
// WaitForCancelRouteConfigWatch waits for an RDS watch to be cancelled and
// returns the name of the cancelled route, or context.DeadlineExceeded if no
// watch is cancelled before the deadline.
func (xdsC *Client) WaitForCancelRouteConfigWatch(ctx context.Context) error {
_, err := xdsC.rdsCancelCh.Receive(ctx)
return err
func (xdsC *Client) WaitForCancelRouteConfigWatch(ctx context.Context) (string, error) {
val, err := xdsC.rdsCancelCh.Receive(ctx)
if err != nil {
return "", err
}
return val.(string), err
}
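
With the name-keyed callbacks and cancel channel, tests can assert exactly which route's watch was started or cancelled. A sketch of such a test, assuming only the fakeclient API shown above (this test is illustrative, not part of the commit):

package fakeclient_test

import (
	"context"
	"testing"
	"time"

	"google.golang.org/grpc/xds/internal/testutils/fakeclient"
	"google.golang.org/grpc/xds/internal/xdsclient"
)

// TestNameKeyedRouteWatch (hypothetical) registers a watch for one route,
// then cancels it, verifying the reported names at each step.
func TestNameKeyedRouteWatch(t *testing.T) {
	xdsC := fakeclient.NewClient()
	cancel := xdsC.WatchRouteConfig("route-1", func(xdsclient.RouteConfigUpdate, error) {})

	ctx, ctxCancel := context.WithTimeout(context.Background(), time.Second)
	defer ctxCancel()
	if name, err := xdsC.WaitForWatchRouteConfig(ctx); err != nil || name != "route-1" {
		t.Fatalf("WaitForWatchRouteConfig() = %v, %v; want route-1, nil", name, err)
	}

	cancel()
	if name, err := xdsC.WaitForCancelRouteConfigWatch(ctx); err != nil || name != "route-1" {
		t.Fatalf("WaitForCancelRouteConfigWatch() = %v, %v; want route-1, nil", name, err)
	}
}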
// WatchCluster registers a CDS watch.
@ -293,16 +306,17 @@ func NewClientWithName(name string) *Client {
return &Client{
name: name,
ldsWatchCh: testutils.NewChannel(),
rdsWatchCh: testutils.NewChannel(),
rdsWatchCh: testutils.NewChannelWithSize(10),
cdsWatchCh: testutils.NewChannelWithSize(10),
edsWatchCh: testutils.NewChannelWithSize(10),
ldsCancelCh: testutils.NewChannel(),
rdsCancelCh: testutils.NewChannel(),
rdsCancelCh: testutils.NewChannelWithSize(10),
cdsCancelCh: testutils.NewChannelWithSize(10),
edsCancelCh: testutils.NewChannelWithSize(10),
loadReportCh: testutils.NewChannel(),
lrsCancelCh: testutils.NewChannel(),
loadStore: load.NewStore(),
rdsCbs: make(map[string]func(xdsclient.RouteConfigUpdate, error)),
cdsCbs: make(map[string]func(xdsclient.ClusterUpdate, error)),
edsCbs: make(map[string]func(xdsclient.EndpointsUpdate, error)),
Closed: grpcsync.NewEvent(),

View File

@ -251,7 +251,6 @@ type InboundListenerConfig struct {
// of interest to the registered RDS watcher.
type RouteConfigUpdate struct {
VirtualHosts []*VirtualHost
// Raw is the resource from the xds response.
Raw *anypb.Any
}

View File

@ -233,6 +233,11 @@ func (s *GRPCServer) Serve(lis net.Listener) error {
err: err,
})
},
DrainCallback: func(addr net.Addr) {
if gs, ok := s.gs.(*grpc.Server); ok {
drainServerTransports(gs, addr.String())
}
},
})
// Block until a good LDS response is received or the server is stopped.

View File

@ -35,6 +35,7 @@ import (
v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
v3httppb "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/network/http_connection_manager/v3"
v3tlspb "github.com/envoyproxy/go-control-plane/envoy/extensions/transport_sockets/tls/v3"
wrapperspb "github.com/golang/protobuf/ptypes/wrappers"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/credentials/tls/certprovider"
@ -53,6 +54,67 @@ const (
testServerListenerResourceNameTemplate = "/path/to/resource/%s/%s"
)
var listenerWithFilterChains = &v3listenerpb.Listener{
FilterChains: []*v3listenerpb.FilterChain{
{
FilterChainMatch: &v3listenerpb.FilterChainMatch{
PrefixRanges: []*v3corepb.CidrRange{
{
AddressPrefix: "192.168.0.0",
PrefixLen: &wrapperspb.UInt32Value{
Value: uint32(16),
},
},
},
SourceType: v3listenerpb.FilterChainMatch_SAME_IP_OR_LOOPBACK,
SourcePrefixRanges: []*v3corepb.CidrRange{
{
AddressPrefix: "192.168.0.0",
PrefixLen: &wrapperspb.UInt32Value{
Value: uint32(16),
},
},
},
SourcePorts: []uint32{80},
},
TransportSocket: &v3corepb.TransportSocket{
Name: "envoy.transport_sockets.tls",
ConfigType: &v3corepb.TransportSocket_TypedConfig{
TypedConfig: testutils.MarshalAny(&v3tlspb.DownstreamTlsContext{
CommonTlsContext: &v3tlspb.CommonTlsContext{
TlsCertificateCertificateProviderInstance: &v3tlspb.CommonTlsContext_CertificateProviderInstance{
InstanceName: "identityPluginInstance",
CertificateName: "identityCertName",
},
},
}),
},
},
Filters: []*v3listenerpb.Filter{
{
Name: "filter-1",
ConfigType: &v3listenerpb.Filter_TypedConfig{
TypedConfig: testutils.MarshalAny(&v3httppb.HttpConnectionManager{
RouteSpecifier: &v3httppb.HttpConnectionManager_RouteConfig{
RouteConfig: &v3routepb.RouteConfiguration{
Name: "routeName",
VirtualHosts: []*v3routepb.VirtualHost{{
Domains: []string{"lds.target.good:3333"},
Routes: []*v3routepb.Route{{
Match: &v3routepb.RouteMatch{
PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"},
},
Action: &v3routepb.Route_NonForwardingAction{},
}}}}},
},
}),
},
},
},
},
},
}
type s struct {
grpctest.Tester
}
@ -73,9 +135,10 @@ func (f *fakeGRPCServer) RegisterService(*grpc.ServiceDesc, interface{}) {
f.registerServiceCh.Send(nil)
}
func (f *fakeGRPCServer) Serve(net.Listener) error {
func (f *fakeGRPCServer) Serve(lis net.Listener) error {
f.serveCh.Send(nil)
<-f.done
lis.Close()
return nil
}
@ -377,12 +440,17 @@ func (s) TestServeSuccess(t *testing.T) {
// Push a good LDS response, and wait for Serve() to be invoked on the
// underlying grpc.Server.
fcm, err := xdsclient.NewFilterChainManager(listenerWithFilterChains)
if err != nil {
t.Fatalf("xdsclient.NewFilterChainManager() failed with error: %v", err)
}
addr, port := splitHostPort(lis.Addr().String())
client.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{
RouteConfigName: "routeconfig",
InboundListenerCfg: &xdsclient.InboundListenerConfig{
Address: addr,
Port: port,
Address: addr,
Port: port,
FilterChains: fcm,
},
}, nil)
if _, err := fs.serveCh.Receive(ctx); err != nil {
@ -404,8 +472,9 @@ func (s) TestServeSuccess(t *testing.T) {
client.InvokeWatchListenerCallback(xdsclient.ListenerUpdate{
RouteConfigName: "routeconfig",
InboundListenerCfg: &xdsclient.InboundListenerConfig{
Address: "10.20.30.40",
Port: "666",
Address: "10.20.30.40",
Port: "666",
FilterChains: fcm,
},
}, nil)
sCtx, sCancel = context.WithTimeout(context.Background(), defaultTestShortTimeout)