diff --git a/xds/internal/server/listener_wrapper.go b/xds/internal/server/listener_wrapper.go index 9f5b2ecaf..94598df80 100644 --- a/xds/internal/server/listener_wrapper.go +++ b/xds/internal/server/listener_wrapper.go @@ -69,8 +69,7 @@ type DrainCallback func(addr net.Addr) // XDSClient wraps the methods on the XDSClient which are required by // the listenerWrapper. type XDSClient interface { - WatchListener(string, func(xdsresource.ListenerUpdate, error)) func() - WatchRouteConfig(string, func(xdsresource.RouteConfigUpdate, error)) func() + WatchResource(rType xdsresource.Type, resourceName string, watcher xdsresource.ResourceWatcher) (cancel func()) BootstrapConfig() *bootstrap.Config } @@ -110,7 +109,6 @@ func NewListenerWrapper(params ListenerWrapperParams) (net.Listener, <-chan stru mode: connectivity.ServingModeStarting, closed: grpcsync.NewEvent(), goodUpdate: grpcsync.NewEvent(), - ldsUpdateCh: make(chan ldsUpdateWithError, 1), rdsUpdateCh: make(chan rdsHandlerUpdate, 1), } lw.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[xds-server-listener %p] ", lw)) @@ -120,17 +118,16 @@ func NewListenerWrapper(params ListenerWrapperParams) (net.Listener, <-chan stru lisAddr := lw.Listener.Addr().String() lw.addr, lw.port, _ = net.SplitHostPort(lisAddr) - lw.rdsHandler = newRDSHandler(lw.xdsC, lw.rdsUpdateCh) - lw.cancelWatch = lw.xdsC.WatchListener(lw.name, lw.handleListenerUpdate) + lw.rdsHandler = newRDSHandler(lw.xdsC, lw.logger, lw.rdsUpdateCh) + lw.cancelWatch = xdsresource.WatchListener(lw.xdsC, lw.name, &ldsWatcher{ + parent: lw, + logger: lw.logger, + name: lw.name, + }) go lw.run() return lw, lw.goodUpdate.Done() } -type ldsUpdateWithError struct { - update xdsresource.ListenerUpdate - err error -} - // listenerWrapper wraps the net.Listener associated with the listening address // passed to Serve(). It also contains all other state associated with this // particular invocation of Serve(). @@ -181,8 +178,6 @@ type listenerWrapper struct { // rdsUpdates are the RDS resources received from the management // server, keyed on the RouteName of the RDS resource. rdsUpdates unsafe.Pointer // map[string]xdsclient.RouteConfigUpdate - // ldsUpdateCh is a channel for XDSClient LDS updates. - ldsUpdateCh chan ldsUpdateWithError // rdsUpdateCh is a channel for XDSClient RDS updates. rdsUpdateCh chan rdsHandlerUpdate } @@ -320,31 +315,12 @@ func (l *listenerWrapper) run() { select { case <-l.closed.Done(): return - case u := <-l.ldsUpdateCh: - l.handleLDSUpdate(u) case u := <-l.rdsUpdateCh: l.handleRDSUpdate(u) } } } -// handleLDSUpdate is the callback which handles LDS Updates. It writes the -// received update to the update channel, which is picked up by the run -// goroutine. -func (l *listenerWrapper) handleListenerUpdate(update xdsresource.ListenerUpdate, err error) { - if l.closed.HasFired() { - l.logger.Warningf("Resource %q received update: %v with error: %v, after listener was closed", l.name, update, err) - return - } - // Remove any existing entry in ldsUpdateCh and replace with the new one, as the only update - // listener cares about is most recent update. - select { - case <-l.ldsUpdateCh: - default: - } - l.ldsUpdateCh <- ldsUpdateWithError{update: update, err: err} -} - // handleRDSUpdate handles a full rds update from rds handler. On a successful // update, the server will switch to ServingModeServing as the full // configuration (both LDS and RDS) has been received. @@ -354,7 +330,6 @@ func (l *listenerWrapper) handleRDSUpdate(update rdsHandlerUpdate) { return } if update.err != nil { - l.logger.Warningf("Received error for rds names specified in resource %q: %+v", l.name, update.err) if xdsresource.ErrType(update.err) == xdsresource.ErrorTypeResourceNotFound { l.switchMode(nil, connectivity.ServingModeNotServing, update.err) } @@ -368,17 +343,7 @@ func (l *listenerWrapper) handleRDSUpdate(update rdsHandlerUpdate) { l.goodUpdate.Fire() } -func (l *listenerWrapper) handleLDSUpdate(update ldsUpdateWithError) { - if update.err != nil { - l.logger.Warningf("Received error for resource %q: %+v", l.name, update.err) - if xdsresource.ErrType(update.err) == xdsresource.ErrorTypeResourceNotFound { - l.switchMode(nil, connectivity.ServingModeNotServing, update.err) - } - // For errors which are anything other than "resource-not-found", we - // continue to use the old configuration. - return - } - +func (l *listenerWrapper) handleLDSUpdate(update xdsresource.ListenerUpdate) { // Make sure that the socket address on the received Listener resource // matches the address of the net.Listener passed to us by the user. This // check is done here instead of at the XDSClient layer because of the @@ -391,7 +356,7 @@ func (l *listenerWrapper) handleLDSUpdate(update ldsUpdateWithError) { // What this means is that the XDSClient has ACKed a resource which can push // the server into a "not serving" mode. This is not ideal, but this is // what we have decided to do. See gRPC A36 for more details. - ilc := update.update.InboundListenerCfg + ilc := update.InboundListenerCfg if ilc.Address != l.addr || ilc.Port != l.port { l.switchMode(nil, connectivity.ServingModeNotServing, fmt.Errorf("address (%s:%s) in Listener update does not match listening address: (%s:%s)", ilc.Address, ilc.Port, l.addr, l.port)) return @@ -440,3 +405,46 @@ func (l *listenerWrapper) switchMode(fcs *xdsresource.FilterChainManager, newMod l.modeCallback(l.Listener.Addr(), newMode, err) } } + +// ldsWatcher implements the xdsresource.ListenerWatcher interface and is +// passed to the WatchListener API. +type ldsWatcher struct { + parent *listenerWrapper + logger *internalgrpclog.PrefixLogger + name string +} + +func (lw *ldsWatcher) OnUpdate(update *xdsresource.ListenerResourceData) { + if lw.parent.closed.HasFired() { + lw.logger.Warningf("Resource %q received update: %#v after listener was closed", lw.name, update) + return + } + if lw.logger.V(2) { + lw.logger.Infof("LDS watch for resource %q received update: %#v", lw.name, update.Resource) + } + lw.parent.handleLDSUpdate(update.Resource) +} + +func (lw *ldsWatcher) OnError(err error) { + if lw.parent.closed.HasFired() { + lw.logger.Warningf("Resource %q received error: %v after listener was closed", lw.name, err) + return + } + if lw.logger.V(2) { + lw.logger.Infof("LDS watch for resource %q reported error: %#v", lw.name, err) + } + // For errors which are anything other than "resource-not-found", we + // continue to use the old configuration. +} + +func (lw *ldsWatcher) OnResourceDoesNotExist() { + if lw.parent.closed.HasFired() { + lw.logger.Warningf("Resource %q received resource-not-found-error after listener was closed", lw.name) + return + } + if lw.logger.V(2) { + lw.logger.Infof("LDS watch for resource %q reported resource-does-not-exist error: %v", lw.name) + } + err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type Listener not found in received response", lw.name) + lw.parent.switchMode(nil, connectivity.ServingModeNotServing, err) +} diff --git a/xds/internal/server/rds_handler.go b/xds/internal/server/rds_handler.go index 722748cbd..3dcc2096a 100644 --- a/xds/internal/server/rds_handler.go +++ b/xds/internal/server/rds_handler.go @@ -21,6 +21,7 @@ package server import ( "sync" + igrpclog "google.golang.org/grpc/internal/grpclog" "google.golang.org/grpc/xds/internal/xdsclient/xdsresource" ) @@ -34,7 +35,8 @@ type rdsHandlerUpdate struct { // rdsHandler handles any RDS queries that need to be started for a given server // side listeners Filter Chains (i.e. not inline). type rdsHandler struct { - xdsC XDSClient + xdsC XDSClient + logger *igrpclog.PrefixLogger mu sync.Mutex updates map[string]xdsresource.RouteConfigUpdate @@ -49,9 +51,10 @@ type rdsHandler struct { // newRDSHandler creates a new rdsHandler to watch for RDS resources. // listenerWrapper updates the list of route names to watch by calling // updateRouteNamesToWatch() upon receipt of new Listener configuration. -func newRDSHandler(xdsC XDSClient, ch chan rdsHandlerUpdate) *rdsHandler { +func newRDSHandler(xdsC XDSClient, logger *igrpclog.PrefixLogger, ch chan rdsHandlerUpdate) *rdsHandler { return &rdsHandler{ xdsC: xdsC, + logger: logger, updateChannel: ch, updates: make(map[string]xdsresource.RouteConfigUpdate), cancels: make(map[string]func()), @@ -69,11 +72,11 @@ func (rh *rdsHandler) updateRouteNamesToWatch(routeNamesToWatch map[string]bool) // routeNamesToWatch. for routeName := range routeNamesToWatch { if _, ok := rh.cancels[routeName]; !ok { - func(routeName string) { - rh.cancels[routeName] = rh.xdsC.WatchRouteConfig(routeName, func(update xdsresource.RouteConfigUpdate, err error) { - rh.handleRouteUpdate(routeName, update, err) - }) - }(routeName) + // The xDS client keeps a reference to the watcher until the cancel + // func is invoked. So, we don't need to keep a reference for fear + // of it being garbage collected. + w := &rdsWatcher{parent: rh, routeName: routeName} + rh.cancels[routeName] = xdsresource.WatchRouteConfig(rh.xdsC, routeName, w) } } @@ -97,11 +100,7 @@ func (rh *rdsHandler) updateRouteNamesToWatch(routeNamesToWatch map[string]bool) // handleRouteUpdate persists the route config for a given route name, and also // sends an update to the Listener Wrapper on an error received or if the rds // handler has a full collection of updates. -func (rh *rdsHandler) handleRouteUpdate(routeName string, update xdsresource.RouteConfigUpdate, err error) { - if err != nil { - drainAndPush(rh.updateChannel, rdsHandlerUpdate{err: err}) - return - } +func (rh *rdsHandler) handleRouteUpdate(routeName string, update xdsresource.RouteConfigUpdate) { rh.mu.Lock() defer rh.mu.Unlock() rh.updates[routeName] = update @@ -131,3 +130,33 @@ func (rh *rdsHandler) close() { cancel() } } + +// rdsWatcher implements the xdsresource.RouteConfigWatcher interface and is +// passed to the WatchRouteConfig API. +type rdsWatcher struct { + parent *rdsHandler + logger *igrpclog.PrefixLogger + routeName string +} + +func (rw *rdsWatcher) OnUpdate(update *xdsresource.RouteConfigResourceData) { + if rw.logger.V(2) { + rw.logger.Infof("RDS watch for resource %q received update: %#v", rw.routeName, update.Resource) + } + rw.parent.handleRouteUpdate(rw.routeName, update.Resource) +} + +func (rw *rdsWatcher) OnError(err error) { + if rw.logger.V(2) { + rw.logger.Infof("RDS watch for resource %q reported error: %v", rw.routeName, err) + } + drainAndPush(rw.parent.updateChannel, rdsHandlerUpdate{err: err}) +} + +func (rw *rdsWatcher) OnResourceDoesNotExist() { + if rw.logger.V(2) { + rw.logger.Infof("RDS watch for resource %q reported resource-does-not-exist error: %v", rw.routeName) + } + err := xdsresource.NewErrorf(xdsresource.ErrorTypeResourceNotFound, "resource name %q of type RouteConfiguration not found in received response", rw.routeName) + drainAndPush(rw.parent.updateChannel, rdsHandlerUpdate{err: err}) +} diff --git a/xds/internal/server/rds_handler_test.go b/xds/internal/server/rds_handler_test.go index b8b2c4ebc..e7fec1953 100644 --- a/xds/internal/server/rds_handler_test.go +++ b/xds/internal/server/rds_handler_test.go @@ -187,7 +187,7 @@ func (s) TestRDSHandler_SuccessCaseOneRDSWatch(t *testing.T) { // Create an rds handler and give it a single route to watch. updateCh := make(chan rdsHandlerUpdate, 1) - rh := newRDSHandler(xdsC, updateCh) + rh := newRDSHandler(xdsC, nil, updateCh) rh.updateRouteNamesToWatch(map[string]bool{route1: true}) // Verify that the given route is requested. @@ -211,7 +211,7 @@ func (s) TestRDSHandler_SuccessCaseTwoUpdates(t *testing.T) { // Create an rds handler and give it a single route to watch. updateCh := make(chan rdsHandlerUpdate, 1) - rh := newRDSHandler(xdsC, updateCh) + rh := newRDSHandler(xdsC, nil, updateCh) rh.updateRouteNamesToWatch(map[string]bool{route1: true}) // Verify that the given route is requested. @@ -273,7 +273,7 @@ func (s) TestRDSHandler_SuccessCaseDeletedRoute(t *testing.T) { // Create an rds handler and give it two routes to watch. updateCh := make(chan rdsHandlerUpdate, 1) - rh := newRDSHandler(xdsC, updateCh) + rh := newRDSHandler(xdsC, nil, updateCh) rh.updateRouteNamesToWatch(map[string]bool{route1: true, route2: true}) // Verify that the given routes are requested. @@ -329,7 +329,7 @@ func (s) TestRDSHandler_SuccessCaseTwoUpdatesAddAndDeleteRoute(t *testing.T) { // Create an rds handler and give it two routes to watch. updateCh := make(chan rdsHandlerUpdate, 1) - rh := newRDSHandler(xdsC, updateCh) + rh := newRDSHandler(xdsC, nil, updateCh) rh.updateRouteNamesToWatch(map[string]bool{route1: true, route2: true}) // Verify that the given routes are requested. @@ -400,7 +400,7 @@ func (s) TestRDSHandler_SuccessCaseSecondUpdateMakesRouteFull(t *testing.T) { // Create an rds handler and give it three routes to watch. updateCh := make(chan rdsHandlerUpdate, 1) - rh := newRDSHandler(xdsC, updateCh) + rh := newRDSHandler(xdsC, nil, updateCh) rh.updateRouteNamesToWatch(map[string]bool{route1: true, route2: true, route3: true}) // Verify that the given routes are requested. @@ -455,7 +455,7 @@ func (s) TestErrorReceived(t *testing.T) { // Create an rds handler and give it a single route to watch. updateCh := make(chan rdsHandlerUpdate, 1) - rh := newRDSHandler(xdsC, updateCh) + rh := newRDSHandler(xdsC, nil, updateCh) rh.updateRouteNamesToWatch(map[string]bool{route1: true}) // Verify that the given route is requested.