diff --git a/xds/internal/clients/internal/testutils/channel.go b/xds/internal/clients/internal/testutils/channel.go index 509adeae0..ac77d559f 100644 --- a/xds/internal/clients/internal/testutils/channel.go +++ b/xds/internal/clients/internal/testutils/channel.go @@ -59,6 +59,28 @@ func (c *Channel) Replace(value any) { } } +// SendContext sends value on the underlying channel, or returns an error if +// the context expires. +func (c *Channel) SendContext(ctx context.Context, value any) error { + select { + case c.C <- value: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +// Drain drains the channel by repeatedly reading from it until it is empty. +func (c *Channel) Drain() { + for { + select { + case <-c.C: + default: + return + } + } +} + // NewChannelWithSize returns a new Channel with a buffer of bufSize. func NewChannelWithSize(bufSize int) *Channel { return &Channel{C: make(chan any, bufSize)} diff --git a/xds/internal/clients/xdsclient/ads_stream.go b/xds/internal/clients/xdsclient/ads_stream.go index eb371287d..774f8ab24 100644 --- a/xds/internal/clients/xdsclient/ads_stream.go +++ b/xds/internal/clients/xdsclient/ads_stream.go @@ -73,39 +73,13 @@ type adsStreamEventHandler interface { onResponse(response, func()) ([]string, error) // Called when a response is received on the ADS stream. } -// watchState is a enum that describes the watch state of a particular -// resource. -type watchState int - -const ( - // resourceWatchStateStarted is the state where a watch for a resource was - // started, but a request asking for that resource is yet to be sent to the - // management server. - resourceWatchStateStarted watchState = iota - // resourceWatchStateRequested is the state when a request has been sent for - // the resource being watched. - resourceWatchStateRequested - // ResourceWatchStateReceived is the state when a response has been received - // for the resource being watched. 
- resourceWatchStateReceived - // resourceWatchStateTimeout is the state when the watch timer associated - // with the resource expired because no response was received. - resourceWatchStateTimeout -) - -// resourceWatchState is the state corresponding to a resource being watched. -type resourceWatchState struct { - State watchState // Watch state of the resource. - ExpiryTimer *time.Timer // Timer for the expiry of the watch. -} - // state corresponding to a resource type. type resourceTypeState struct { - version string // Last acked version. Should not be reset when the stream breaks. - nonce string // Last received nonce. Should be reset when the stream breaks. - bufferedRequests chan struct{} // Channel to buffer requests when writing is blocked. - subscribedResources map[string]*resourceWatchState // Map of subscribed resource names to their state. - pendingWrite bool // True if there is a pending write for this resource type. + version string // Last acked version. Should not be reset when the stream breaks. + nonce string // Last received nonce. Should be reset when the stream breaks. + bufferedRequests chan struct{} // Channel to buffer requests when writing is blocked. + subscribedResources map[string]*xdsresource.ResourceWatchState // Map of subscribed resource names to their state. + pendingWrite bool // True if there is a pending write for this resource type. } // adsStreamImpl provides the functionality associated with an ADS (Aggregated @@ -198,7 +172,7 @@ func (s *adsStreamImpl) subscribe(typ ResourceType, name string) { // An entry in the type state map is created as part of the first // subscription request for this type. 
state = &resourceTypeState{ - subscribedResources: make(map[string]*resourceWatchState), + subscribedResources: make(map[string]*xdsresource.ResourceWatchState), bufferedRequests: make(chan struct{}, 1), } s.resourceTypeState[typ] = state @@ -206,7 +180,7 @@ func (s *adsStreamImpl) subscribe(typ ResourceType, name string) { // Create state for the newly subscribed resource. The watch timer will // be started when a request for this resource is actually sent out. - state.subscribedResources[name] = &resourceWatchState{State: resourceWatchStateStarted} + state.subscribedResources[name] = &xdsresource.ResourceWatchState{State: xdsresource.ResourceWatchStateStarted} state.pendingWrite = true // Send a request for the resource type with updated subscriptions. @@ -616,8 +590,8 @@ func (s *adsStreamImpl) onRecv(stream clients.Stream, names []string, url, versi s.logger.Warningf("ADS stream received a response for resource %q, but no state exists for it", name) continue } - if ws := rs.State; ws == resourceWatchStateStarted || ws == resourceWatchStateRequested { - rs.State = resourceWatchStateReceived + if ws := rs.State; ws == xdsresource.ResourceWatchStateStarted || ws == xdsresource.ResourceWatchStateRequested { + rs.State = xdsresource.ResourceWatchStateReceived if rs.ExpiryTimer != nil { rs.ExpiryTimer.Stop() rs.ExpiryTimer = nil @@ -652,14 +626,14 @@ func (s *adsStreamImpl) onError(err error, msgReceived bool) { s.mu.Lock() for _, state := range s.resourceTypeState { for _, rs := range state.subscribedResources { - if rs.State != resourceWatchStateRequested { + if rs.State != xdsresource.ResourceWatchStateRequested { continue } if rs.ExpiryTimer != nil { rs.ExpiryTimer.Stop() rs.ExpiryTimer = nil } - rs.State = resourceWatchStateStarted + rs.State = xdsresource.ResourceWatchStateStarted } } s.mu.Unlock() @@ -691,15 +665,15 @@ func (s *adsStreamImpl) startWatchTimersLocked(typ ResourceType, names []string) if !ok { continue } - if resourceState.State != 
resourceWatchStateStarted { + if resourceState.State != xdsresource.ResourceWatchStateStarted { continue } - resourceState.State = resourceWatchStateRequested + resourceState.State = xdsresource.ResourceWatchStateRequested rs := resourceState resourceState.ExpiryTimer = time.AfterFunc(s.watchExpiryTimeout, func() { s.mu.Lock() - rs.State = resourceWatchStateTimeout + rs.State = xdsresource.ResourceWatchStateTimeout rs.ExpiryTimer = nil s.mu.Unlock() s.eventHandler.onWatchExpiry(typ, name) @@ -707,7 +681,22 @@ func (s *adsStreamImpl) startWatchTimersLocked(typ ResourceType, names []string) } } -func resourceNames(m map[string]*resourceWatchState) []string { +func (s *adsStreamImpl) adsResourceWatchStateForTesting(rType ResourceType, resourceName string) (xdsresource.ResourceWatchState, error) { + s.mu.Lock() + defer s.mu.Unlock() + + state, ok := s.resourceTypeState[rType] + if !ok { + return xdsresource.ResourceWatchState{}, fmt.Errorf("unknown resource type: %v", rType) + } + resourceState, ok := state.subscribedResources[resourceName] + if !ok { + return xdsresource.ResourceWatchState{}, fmt.Errorf("unknown resource name: %v", resourceName) + } + return *resourceState, nil +} + +func resourceNames(m map[string]*xdsresource.ResourceWatchState) []string { ret := make([]string, len(m)) idx := 0 for name := range m { diff --git a/xds/internal/clients/xdsclient/internal/internal.go b/xds/internal/clients/xdsclient/internal/internal.go new file mode 100644 index 000000000..7adb67190 --- /dev/null +++ b/xds/internal/clients/xdsclient/internal/internal.go @@ -0,0 +1,36 @@ +/* + * + * Copyright 2025 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// Package internal contains functionality internal to the xdsclient package. +package internal + +import "time" + +var ( + // WatchExpiryTimeout is the watch expiry timeout for xDS client. It can be + // overridden by tests to change the default watch expiry timeout. + WatchExpiryTimeout time.Duration + + // StreamBackoff is the stream backoff for xDS client. It can be overridden + // by tests to change the default backoff strategy. + StreamBackoff func(int) time.Duration + + // ResourceWatchStateForTesting gets the watch state for the resource + // identified by the given resource type and resource name. Returns a + // non-nil error if there is no such resource being watched. + ResourceWatchStateForTesting any // func(*xdsclient.XDSClient, xdsclient.ResourceType, string) (xdsresource.ResourceWatchState, error) +) diff --git a/xds/internal/clients/xdsclient/internal/xdsresource/ads_stream.go b/xds/internal/clients/xdsclient/internal/xdsresource/ads_stream.go new file mode 100644 index 000000000..87fe9ac86 --- /dev/null +++ b/xds/internal/clients/xdsclient/internal/xdsresource/ads_stream.go @@ -0,0 +1,46 @@ +/* + * + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package xdsresource + +import "time" + +// WatchState is an enum that describes the watch state of a particular +// resource. +type WatchState int + +const ( + // ResourceWatchStateStarted is the state where a watch for a resource was + // started, but a request asking for that resource is yet to be sent to the + // management server. + ResourceWatchStateStarted WatchState = iota + // ResourceWatchStateRequested is the state when a request has been sent for + // the resource being watched. + ResourceWatchStateRequested + // ResourceWatchStateReceived is the state when a response has been received + // for the resource being watched. + ResourceWatchStateReceived + // ResourceWatchStateTimeout is the state when the watch timer associated + // with the resource expired because no response was received. + ResourceWatchStateTimeout +) + +// ResourceWatchState is the state corresponding to a resource being watched. +type ResourceWatchState struct { + State WatchState // Watch state of the resource. + ExpiryTimer *time.Timer // Timer for the expiry of the watch. +} diff --git a/xds/internal/clients/xdsclient/test/ads_stream_ack_nack_test.go b/xds/internal/clients/xdsclient/test/ads_stream_ack_nack_test.go new file mode 100644 index 000000000..467365d26 --- /dev/null +++ b/xds/internal/clients/xdsclient/test/ads_stream_ack_nack_test.go @@ -0,0 +1,508 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package xdsclient_test + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/uuid" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/xds/internal/clients" + "google.golang.org/grpc/xds/internal/clients/grpctransport" + "google.golang.org/grpc/xds/internal/clients/internal/testutils" + "google.golang.org/grpc/xds/internal/clients/internal/testutils/e2e" + "google.golang.org/grpc/xds/internal/clients/xdsclient" + "google.golang.org/grpc/xds/internal/clients/xdsclient/internal/xdsresource" + "google.golang.org/protobuf/proto" + "google.golang.org/protobuf/testing/protocmp" + + v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" +) + +// Creates an xDS client with the given management server address, node ID +// and transport builder. 
+func createXDSClient(t *testing.T, mgmtServerAddress string, nodeID string, transportBuilder clients.TransportBuilder) *xdsclient.XDSClient { + t.Helper() + + resourceTypes := map[string]xdsclient.ResourceType{xdsresource.V3ListenerURL: listenerType} + si := clients.ServerIdentifier{ + ServerURI: mgmtServerAddress, + Extensions: grpctransport.ServerIdentifierExtension{ConfigName: "insecure"}, + } + + xdsClientConfig := xdsclient.Config{ + Servers: []xdsclient.ServerConfig{{ServerIdentifier: si}}, + Node: clients.Node{ID: nodeID, UserAgentName: "user-agent", UserAgentVersion: "0.0.0.0"}, + TransportBuilder: transportBuilder, + ResourceTypes: resourceTypes, + // Xdstp resource names used in this test do not specify an + // authority. These will end up looking up an entry with the + // empty key in the authorities map. Having an entry with an + // empty key and empty configuration results in these + // resources also using the top-level configuration. + Authorities: map[string]xdsclient.Authority{ + "": {XDSServers: []xdsclient.ServerConfig{}}, + }, + } + + // Create an xDS client with the above config. + client, err := xdsclient.New(xdsClientConfig) + if err != nil { + t.Fatalf("Failed to create xDS client: %v", err) + } + t.Cleanup(func() { client.Close() }) + return client +} + +// Tests simple ACK and NACK scenarios on the ADS stream: +// 1. When a good response is received, i.e. one that is expected to be ACKed, +// the test verifies that an ACK is sent matching the version and nonce from +// the response. +// 2. When a subsequent bad response is received, i.e. one that is expected to be +// NACKed, the test verifies that a NACK is sent matching the previously +// ACKed version and current nonce from the response. +// 3. When a subsequent good response is received, the test verifies that an +// ACK is sent matching the version and nonce from the current response. 
+func (s) TestADS_ACK_NACK_Simple(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Create an xDS management server listening on a local port. Configure the + // request and response handlers to push on channels that are inspected by + // the test goroutine to verify ACK version and nonce. + streamRequestCh := testutils.NewChannelWithSize(1) + streamResponseCh := testutils.NewChannelWithSize(1) + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ + OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error { + streamRequestCh.SendContext(ctx, req) + return nil + }, + OnStreamResponse: func(_ context.Context, _ int64, _ *v3discoverypb.DiscoveryRequest, resp *v3discoverypb.DiscoveryResponse) { + streamResponseCh.SendContext(ctx, resp) + }, + }) + + // Create a listener resource on the management server. + const listenerName = "listener" + const routeConfigName = "route-config" + nodeID := uuid.New().String() + listenerResource := e2e.DefaultClientListener(listenerName, routeConfigName) + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{listenerResource}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Create an xDS client pointing to the above server. + configs := map[string]grpctransport.Config{"insecure": {Credentials: insecure.NewBundle()}} + client := createXDSClient(t, mgmtServer.Address, nodeID, grpctransport.NewBuilder(configs)) + + // Register a watch for a listener resource. + lw := newListenerWatcher() + ldsCancel := client.WatchResource(xdsresource.V3ListenerURL, listenerName, lw) + defer ldsCancel() + + // Verify that the initial discovery request matches expectation. 
+ r, err := streamRequestCh.Receive(ctx) + if err != nil { + t.Fatal("Timeout when waiting for the initial discovery request") + } + gotReq := r.(*v3discoverypb.DiscoveryRequest) + wantReq := &v3discoverypb.DiscoveryRequest{ + VersionInfo: "", + Node: &v3corepb.Node{ + Id: nodeID, + UserAgentName: "user-agent", + UserAgentVersionType: &v3corepb.Node_UserAgentVersion{UserAgentVersion: "0.0.0.0"}, + ClientFeatures: []string{"envoy.lb.does_not_support_overprovisioning", "xds.config.resource-in-sotw"}, + }, + ResourceNames: []string{listenerName}, + TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", + ResponseNonce: "", + } + if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { + t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff) + } + + // Capture the version and nonce from the response. + r, err = streamResponseCh.Receive(ctx) + if err != nil { + t.Fatal("Timeout when waiting for a discovery response from the server") + } + gotResp := r.(*v3discoverypb.DiscoveryResponse) + + // Verify that the ACK contains the appropriate version and nonce. + r, err = streamRequestCh.Receive(ctx) + if err != nil { + t.Fatal("Timeout when waiting for ACK") + } + gotReq = r.(*v3discoverypb.DiscoveryRequest) + wantReq.VersionInfo = gotResp.GetVersionInfo() + wantReq.ResponseNonce = gotResp.GetNonce() + if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { + t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff) + } + + // Verify the update received by the watcher. + wantUpdate := listenerUpdateErrTuple{ + update: listenerUpdate{RouteConfigName: routeConfigName}, + } + if err := verifyListenerUpdate(ctx, lw.updateCh, wantUpdate); err != nil { + t.Fatal(err) + } + + // Update the management server with a listener resource that contains an + // empty HTTP connection manager within the apiListener, which will cause + // the resource to be NACKed. 
+ badListener := proto.Clone(listenerResource).(*v3listenerpb.Listener) + badListener.ApiListener.ApiListener = nil + mgmtServer.Update(ctx, e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{badListener}, + SkipValidation: true, + }) + + r, err = streamResponseCh.Receive(ctx) + if err != nil { + t.Fatal("Timeout when waiting for a discovery response from the server") + } + gotResp = r.(*v3discoverypb.DiscoveryResponse) + + wantNackErr := xdsresource.NewError(xdsresource.ErrorTypeNACKed, "unexpected http connection manager resource type") + if err := verifyListenerUpdate(ctx, lw.ambientErrCh, listenerUpdateErrTuple{ambientErr: wantNackErr}); err != nil { + t.Fatal(err) + } + + // Verify that the NACK contains the appropriate version, nonce and error. + // We expect the version to not change as this is a NACK. + r, err = streamRequestCh.Receive(ctx) + if err != nil { + t.Fatal("Timeout when waiting for NACK") + } + gotReq = r.(*v3discoverypb.DiscoveryRequest) + if gotNonce, wantNonce := gotReq.GetResponseNonce(), gotResp.GetNonce(); gotNonce != wantNonce { + t.Errorf("Unexpected nonce in discovery request, got: %v, want: %v", gotNonce, wantNonce) + } + if gotErr := gotReq.GetErrorDetail(); gotErr == nil || !strings.Contains(gotErr.GetMessage(), wantNackErr.Error()) { + t.Fatalf("Unexpected error in discovery request, got: %v, want: %v", gotErr.GetMessage(), wantNackErr) + } + + // Update the management server to send a good resource again. + mgmtServer.Update(ctx, e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{listenerResource}, + SkipValidation: true, + }) + + // The envoy-go-control-plane management server keeps resending the same + // resource as long as we keep NACK'ing it. So, we will see the bad resource + // sent to us a few times here, before receiving the good resource. + var lastErr error + for { + if ctx.Err() != nil { + t.Fatalf("Timeout when waiting for an ACK from the xDS client. 
Last seen error: %v", lastErr) + } + + r, err = streamResponseCh.Receive(ctx) + if err != nil { + t.Fatal("Timeout when waiting for a discovery response from the server") + } + gotResp = r.(*v3discoverypb.DiscoveryResponse) + + // Verify that the ACK contains the appropriate version and nonce. + r, err = streamRequestCh.Receive(ctx) + if err != nil { + t.Fatal("Timeout when waiting for ACK") + } + gotReq = r.(*v3discoverypb.DiscoveryRequest) + wantReq.VersionInfo = gotResp.GetVersionInfo() + wantReq.ResponseNonce = gotResp.GetNonce() + wantReq.ErrorDetail = nil + diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()) + if diff == "" { + lastErr = nil + break + } + lastErr = fmt.Errorf("unexpected diff in discovery request, diff (-got, +want):\n%s", diff) + } + + // Verify the update received by the watcher. + for ; ctx.Err() == nil; <-time.After(100 * time.Millisecond) { + if err := verifyListenerUpdate(ctx, lw.updateCh, wantUpdate); err != nil { + lastErr = err + continue + } + break + } + if ctx.Err() != nil { + t.Fatalf("Timeout when waiting for listener update. Last seen error: %v", lastErr) + } +} + +// Tests the case where the first response is invalid. The test verifies that +// the NACK contains an empty version string. +func (s) TestADS_NACK_InvalidFirstResponse(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Create an xDS management server listening on a local port. Configure the + // request and response handlers to push on channels that are inspected by + // the test goroutine to verify ACK version and nonce. 
+ streamRequestCh := testutils.NewChannelWithSize(1) + streamResponseCh := testutils.NewChannelWithSize(1) + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ + OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error { + streamRequestCh.SendContext(ctx, req) + return nil + }, + OnStreamResponse: func(_ context.Context, _ int64, _ *v3discoverypb.DiscoveryRequest, resp *v3discoverypb.DiscoveryResponse) { + streamResponseCh.SendContext(ctx, resp) + }, + }) + + // Create a listener resource on the management server that is expected to + // be NACKed by the xDS client. + const listenerName = "listener" + const routeConfigName = "route-config" + nodeID := uuid.New().String() + listenerResource := e2e.DefaultClientListener(listenerName, routeConfigName) + listenerResource.ApiListener.ApiListener = nil + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{listenerResource}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Create an xDS client pointing to the above server. + configs := map[string]grpctransport.Config{"insecure": {Credentials: insecure.NewBundle()}} + client := createXDSClient(t, mgmtServer.Address, nodeID, grpctransport.NewBuilder(configs)) + + // Register a watch for a listener resource. + lw := newListenerWatcher() + ldsCancel := client.WatchResource(xdsresource.V3ListenerURL, listenerName, lw) + defer ldsCancel() + + // Verify that the initial discovery request matches expectation. 
+ r, err := streamRequestCh.Receive(ctx) + if err != nil { + t.Fatal("Timeout when waiting for the initial discovery request") + } + gotReq := r.(*v3discoverypb.DiscoveryRequest) + wantReq := &v3discoverypb.DiscoveryRequest{ + VersionInfo: "", + Node: &v3corepb.Node{ + Id: nodeID, + UserAgentName: "user-agent", + UserAgentVersionType: &v3corepb.Node_UserAgentVersion{UserAgentVersion: "0.0.0.0"}, + ClientFeatures: []string{"envoy.lb.does_not_support_overprovisioning", "xds.config.resource-in-sotw"}, + }, + ResourceNames: []string{listenerName}, + TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", + ResponseNonce: "", + } + if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { + t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff) + } + + // Capture the version and nonce from the response. + r, err = streamResponseCh.Receive(ctx) + if err != nil { + t.Fatal("Timeout when waiting for the discovery response from client") + } + gotResp := r.(*v3discoverypb.DiscoveryResponse) + + // Verify that the error is propagated to the watcher. + var wantNackErr = xdsresource.NewError(xdsresource.ErrorTypeNACKed, "unexpected http connection manager resource type") + if err := verifyListenerUpdate(ctx, lw.resourceErrCh, listenerUpdateErrTuple{resourceErr: wantNackErr}); err != nil { + t.Fatal(err) + } + + // NACK should contain the appropriate error, nonce, but empty version. 
+ r, err = streamRequestCh.Receive(ctx) + if err != nil { + t.Fatal("Timeout when waiting for ACK") + } + gotReq = r.(*v3discoverypb.DiscoveryRequest) + if gotVersion, wantVersion := gotReq.GetVersionInfo(), ""; gotVersion != wantVersion { + t.Errorf("Unexpected version in discovery request, got: %v, want: %v", gotVersion, wantVersion) + } + if gotNonce, wantNonce := gotReq.GetResponseNonce(), gotResp.GetNonce(); gotNonce != wantNonce { + t.Errorf("Unexpected nonce in discovery request, got: %v, want: %v", gotNonce, wantNonce) + } + if gotErr := gotReq.GetErrorDetail(); gotErr == nil || !strings.Contains(gotErr.GetMessage(), wantNackErr.Error()) { + t.Fatalf("Unexpected error in discovery request, got: %v, want: %v", gotErr.GetMessage(), wantNackErr) + } +} + +// Tests the scenario where the xDS client is no longer interested in a +// resource. The following sequence of events are tested: +// 1. A resource is requested and a good response is received. The test verifies +// that an ACK is sent for this resource. +// 2. The previously requested resource is no longer requested. The test +// verifies that the connection to the management server is closed. +// 3. The same resource is requested again. The test verifies that a new +// request is sent with an empty version string, which corresponds to the +// first request on a new connection. +func (s) TestADS_ACK_NACK_ResourceIsNotRequestedAnymore(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Create an xDS management server listening on a local port. Configure the + // request and response handlers to push on channels that are inspected by + // the test goroutine to verify ACK version and nonce. 
+ streamRequestCh := testutils.NewChannelWithSize(1) + streamResponseCh := testutils.NewChannelWithSize(1) + streamCloseCh := testutils.NewChannelWithSize(1) + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ + OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error { + streamRequestCh.SendContext(ctx, req) + return nil + }, + OnStreamResponse: func(_ context.Context, _ int64, _ *v3discoverypb.DiscoveryRequest, resp *v3discoverypb.DiscoveryResponse) { + streamResponseCh.SendContext(ctx, resp) + }, + OnStreamClosed: func(int64, *v3corepb.Node) { + streamCloseCh.SendContext(ctx, struct{}{}) + }, + }) + + // Create a listener resource on the management server. + const listenerName = "listener" + const routeConfigName = "route-config" + nodeID := uuid.New().String() + listenerResource := e2e.DefaultClientListener(listenerName, routeConfigName) + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{listenerResource}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Create an xDS client pointing to the above server. + configs := map[string]grpctransport.Config{"insecure": {Credentials: insecure.NewBundle()}} + client := createXDSClient(t, mgmtServer.Address, nodeID, grpctransport.NewBuilder(configs)) + + // Register a watch for a listener resource. + lw := newListenerWatcher() + ldsCancel := client.WatchResource(xdsresource.V3ListenerURL, listenerName, lw) + defer ldsCancel() + + // Verify that the initial discovery request matches expectation. 
+ r, err := streamRequestCh.Receive(ctx) + if err != nil { + t.Fatal("Timeout when waiting for the initial discovery request") + } + gotReq := r.(*v3discoverypb.DiscoveryRequest) + wantReq := &v3discoverypb.DiscoveryRequest{ + VersionInfo: "", + Node: &v3corepb.Node{ + Id: nodeID, + UserAgentName: "user-agent", + UserAgentVersionType: &v3corepb.Node_UserAgentVersion{UserAgentVersion: "0.0.0.0"}, + ClientFeatures: []string{"envoy.lb.does_not_support_overprovisioning", "xds.config.resource-in-sotw"}, + }, + ResourceNames: []string{listenerName}, + TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", + ResponseNonce: "", + } + if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { + t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff) + } + + // Capture the version and nonce from the response. + r, err = streamResponseCh.Receive(ctx) + if err != nil { + t.Fatal("Timeout when waiting for the discovery response from client") + } + gotResp := r.(*v3discoverypb.DiscoveryResponse) + + // Verify that the ACK contains the appropriate version and nonce. + r, err = streamRequestCh.Receive(ctx) + if err != nil { + t.Fatal("Timeout when waiting for ACK") + } + gotReq = r.(*v3discoverypb.DiscoveryRequest) + wantACKReq := proto.Clone(wantReq).(*v3discoverypb.DiscoveryRequest) + wantACKReq.VersionInfo = gotResp.GetVersionInfo() + wantACKReq.ResponseNonce = gotResp.GetNonce() + if diff := cmp.Diff(gotReq, wantACKReq, protocmp.Transform()); diff != "" { + t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff) + } + + // Verify the update received by the watcher. + wantUpdate := listenerUpdateErrTuple{ + update: listenerUpdate{RouteConfigName: routeConfigName}, + } + if err := verifyListenerUpdate(ctx, lw.updateCh, wantUpdate); err != nil { + t.Fatal(err) + } + + // Cancel the watch on the listener resource. 
This should result in the + // existing connection to the management server getting closed. + ldsCancel() + if _, err := streamCloseCh.Receive(ctx); err != nil { + t.Fatalf("Timeout when expecting existing connection to be closed: %v", err) + } + + // There is a race between two events when the last watch on an xdsChannel + // is canceled: + // - an empty discovery request being sent out + // - the ADS stream being closed + // To handle this race, we drain the request channel here so that if an + // empty discovery request was received, it is pulled out of the request + // channel, thereby guaranteeing a clean slate for the next watch + // registered below. + streamRequestCh.Drain() + + // Register a watch for the same listener resource. + lw = newListenerWatcher() + ldsCancel = client.WatchResource(xdsresource.V3ListenerURL, listenerName, lw) + defer ldsCancel() + + // Verify that the discovery request is identical to the first one sent out + // to the management server. + r, err = streamRequestCh.Receive(ctx) + if err != nil { + t.Fatal("Timeout when waiting for discovery request") + } + gotReq = r.(*v3discoverypb.DiscoveryRequest) + if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { + t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff) + } + + // Verify the update received by the watcher. + if err := verifyListenerUpdate(ctx, lw.updateCh, wantUpdate); err != nil { + t.Fatal(err) + } +} diff --git a/xds/internal/clients/xdsclient/test/ads_stream_backoff_test.go b/xds/internal/clients/xdsclient/test/ads_stream_backoff_test.go new file mode 100644 index 000000000..de2a3515b --- /dev/null +++ b/xds/internal/clients/xdsclient/test/ads_stream_backoff_test.go @@ -0,0 +1,444 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package xdsclient_test + +import ( + "context" + "errors" + "fmt" + "net" + "testing" + "time" + + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/xds/internal/clients/grpctransport" + "google.golang.org/grpc/xds/internal/clients/internal/testutils" + "google.golang.org/grpc/xds/internal/clients/internal/testutils/e2e" + "google.golang.org/grpc/xds/internal/clients/xdsclient" + xdsclientinternal "google.golang.org/grpc/xds/internal/clients/xdsclient/internal" + "google.golang.org/grpc/xds/internal/clients/xdsclient/internal/xdsresource" + "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version" + "google.golang.org/protobuf/testing/protocmp" + + v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + "github.com/google/uuid" +) + +func overrideStreamBackOff(t *testing.T, streamBackOff func(int) time.Duration) { + originalStreamBackoff := xdsclientinternal.StreamBackoff + xdsclientinternal.StreamBackoff = streamBackOff + t.Cleanup(func() { xdsclientinternal.StreamBackoff = originalStreamBackoff }) +} + +// Creates an xDS client with the given management server address, nodeID and backoff function. 
+func createXDSClientWithBackoff(t *testing.T, mgmtServerAddress string, nodeID string, streamBackoff func(int) time.Duration) *xdsclient.XDSClient { + t.Helper() + overrideStreamBackOff(t, streamBackoff) + configs := map[string]grpctransport.Config{"insecure": {Credentials: insecure.NewBundle()}} + return createXDSClient(t, mgmtServerAddress, nodeID, grpctransport.NewBuilder(configs)) +} + +// Tests the case where the management server returns an error in the ADS +// streaming RPC. Verifies that the ADS stream is restarted after a backoff +// period, and that the previously requested resources are re-requested on the +// new stream. +func (s) TestADS_BackoffAfterStreamFailure(t *testing.T) { + // Channels used for verifying different events in the test. + streamCloseCh := make(chan struct{}, 1) // ADS stream is closed. + ldsResourcesCh := make(chan []string, 1) // Listener resource names in the discovery request. + backoffCh := make(chan struct{}, 1) // Backoff after stream failure. + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Create an xDS management server that returns RPC errors. + streamErr := errors.New("ADS stream error") + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ + OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error { + // Push the requested resource names on to a channel. + if req.GetTypeUrl() == version.V3ListenerURL { + t.Logf("Received LDS request for resources: %v", req.GetResourceNames()) + select { + case ldsResourcesCh <- req.GetResourceNames(): + case <-ctx.Done(): + } + } + // Return an error everytime a request is sent on the stream. This + // should cause the transport to backoff before attempting to + // recreate the stream. + return streamErr + }, + // Push on a channel whenever the stream is closed. 
+ OnStreamClosed: func(int64, *v3corepb.Node) { + select { + case streamCloseCh <- struct{}{}: + case <-ctx.Done(): + } + }, + }) + + // Override the backoff implementation to push on a channel that is read by + // the test goroutine. + backoffCtx, backoffCancel := context.WithCancel(ctx) + streamBackoff := func(v int) time.Duration { + select { + case backoffCh <- struct{}{}: + case <-backoffCtx.Done(): + } + return 0 + } + defer backoffCancel() + + // Create an xDS client with bootstrap pointing to the above server. + nodeID := uuid.New().String() + client := createXDSClientWithBackoff(t, mgmtServer.Address, nodeID, streamBackoff) + + // Register a watch for a listener resource. + const listenerName = "listener" + lw := newListenerWatcher() + ldsCancel := client.WatchResource(xdsresource.V3ListenerURL, listenerName, lw) + defer ldsCancel() + + // Verify that an ADS stream is created and an LDS request with the above + // resource name is sent. + if err := waitForResourceNames(ctx, t, ldsResourcesCh, []string{listenerName}); err != nil { + t.Fatal(err) + } + + // Verify that the received stream error is reported to the watcher. + if err := verifyListenerResourceError(ctx, lw.resourceErrCh, streamErr.Error(), nodeID); err != nil { + t.Fatal(err) + } + + // Verify that the stream is closed. + select { + case <-streamCloseCh: + case <-ctx.Done(): + t.Fatalf("Timeout waiting for stream to be closed after an error") + } + + // Verify that the ADS stream backs off before recreating the stream. + select { + case <-backoffCh: + case <-ctx.Done(): + t.Fatalf("Timeout waiting for ADS stream to backoff after stream failure") + } + + // Verify that the same resource name is re-requested on the new stream. 
+ if err := waitForResourceNames(ctx, t, ldsResourcesCh, []string{listenerName}); err != nil { + t.Fatal(err) + } + + // To prevent indefinite blocking during xDS client close, which is caused + // by a blocking backoff channel write, cancel the backoff context early + // given that the test is complete. + backoffCancel() + +} + +// Tests the case where a stream breaks because the server goes down. Verifies +// that when the server comes back up, the same resources are re-requested, this +// time with the previously acked version and an empty nonce. +func (s) TestADS_RetriesAfterBrokenStream(t *testing.T) { + // Channels used for verifying different events in the test. + streamRequestCh := make(chan *v3discoverypb.DiscoveryRequest, 1) // Discovery request is received. + streamResponseCh := make(chan *v3discoverypb.DiscoveryResponse, 1) // Discovery response is received. + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Create an xDS management server listening on a local port. + l, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("net.Listen() failed: %v", err) + } + lis := testutils.NewRestartableListener(l) + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ + Listener: lis, + // Push the received request on to a channel for the test goroutine to + // verify that it matches expectations. + OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error { + select { + case streamRequestCh <- req: + case <-ctx.Done(): + } + return nil + }, + // Push the response that the management server is about to send on to a + // channel. The test goroutine uses this to extract the version and + // nonce expected on subsequent requests.
+ OnStreamResponse: func(_ context.Context, _ int64, _ *v3discoverypb.DiscoveryRequest, resp *v3discoverypb.DiscoveryResponse) { + select { + case streamResponseCh <- resp: + case <-ctx.Done(): + } + }, + }) + + // Create a listener resource on the management server. + const listenerName = "listener" + const routeConfigName = "route-config" + nodeID := uuid.New().String() + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(listenerName, routeConfigName)}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Override the backoff implementation to always return 0, to reduce test + // run time. Instead control when the backoff returns by blocking on an + // unbuffered channel that the test reads from. + backoffCh := make(chan struct{}) + streamBackoff := func(v int) time.Duration { + select { + case backoffCh <- struct{}{}: + case <-ctx.Done(): + } + return 0 + } + + // Create an xDS client pointing to the above server. + client := createXDSClientWithBackoff(t, mgmtServer.Address, nodeID, streamBackoff) + + // Register a watch for a listener resource. + lw := newListenerWatcher() + ldsCancel := client.WatchResource(xdsresource.V3ListenerURL, listenerName, lw) + defer ldsCancel() + + // Verify that the initial discovery request matches expectation.
+ var gotReq *v3discoverypb.DiscoveryRequest + select { + case gotReq = <-streamRequestCh: + case <-ctx.Done(): + t.Fatalf("Timeout waiting for discovery request on the stream") + } + wantReq := &v3discoverypb.DiscoveryRequest{ + VersionInfo: "", + Node: &v3corepb.Node{ + Id: nodeID, + UserAgentName: "user-agent", + UserAgentVersionType: &v3corepb.Node_UserAgentVersion{UserAgentVersion: "0.0.0.0"}, + ClientFeatures: []string{"envoy.lb.does_not_support_overprovisioning", "xds.config.resource-in-sotw"}, + }, + ResourceNames: []string{listenerName}, + TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", + ResponseNonce: "", + } + if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { + t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff) + } + + // Capture the version and nonce from the response. + var gotResp *v3discoverypb.DiscoveryResponse + select { + case gotResp = <-streamResponseCh: + case <-ctx.Done(): + t.Fatalf("Timeout waiting for discovery response on the stream") + } + version := gotResp.GetVersionInfo() + nonce := gotResp.GetNonce() + + // Verify that the ACK contains the appropriate version and nonce. + wantReq.VersionInfo = version + wantReq.ResponseNonce = nonce + select { + case gotReq = <-streamRequestCh: + case <-ctx.Done(): + t.Fatalf("Timeout waiting for the discovery request ACK on the stream") + } + if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { + t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff) + } + + // Verify the update received by the watcher. + wantUpdate := listenerUpdateErrTuple{ + update: listenerUpdate{ + RouteConfigName: routeConfigName}, + } + if err := verifyListenerUpdate(ctx, lw.updateCh, wantUpdate); err != nil { + t.Fatal(err) + } + + // Bring down the management server to simulate a broken stream. + lis.Stop() + + // Verify that the error callback on the watcher is not invoked. 
+ verifyNoListenerUpdate(ctx, lw.updateCh) + + // Wait for backoff to kick in, and unblock the first backoff attempt. + select { + case <-backoffCh: + case <-ctx.Done(): + t.Fatal("Timeout waiting for stream backoff") + } + + // Bring up the management server. The test does not have precise control + // over when new streams to the management server will start succeeding. The + // ADS stream implementation will backoff as many times as required before + // it can successfully create a new stream. Therefore, we need to receive on + // the backoffCh as many times as required, and unblock the backoff + // implementation. + lis.Restart() + go func() { + for { + select { + case <-backoffCh: + case <-ctx.Done(): + return + } + } + }() + + // Verify that the transport creates a new stream and sends out a new + // request which contains the previously acked version, but an empty nonce. + wantReq.ResponseNonce = "" + select { + case gotReq = <-streamRequestCh: + case <-ctx.Done(): + t.Fatalf("Timeout waiting for the discovery request on the new stream") + } + if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { + t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff) + } +} + +// Tests the case where a resource is requested before a valid ADS stream +// exists. Verifies that a discovery request is sent out for the previously +// requested resource once a valid stream is created. +func (s) TestADS_ResourceRequestedBeforeStreamCreation(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Channels used for verifying different events in the test. + streamRequestCh := make(chan *v3discoverypb.DiscoveryRequest, 1) // Discovery request is received. + + // Create an xDS management server listening on a local port.
+ l, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("net.Listen() failed: %v", err) + } + lis := testutils.NewRestartableListener(l) + streamErr := errors.New("ADS stream error") + + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ + Listener: lis, + + // Return an error everytime a request is sent on the stream. This + // should cause the transport to backoff before attempting to recreate + // the stream. + OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error { + select { + case streamRequestCh <- req: + default: + } + return streamErr + }, + }) + + // Bring down the management server before creating the transport. This + // allows us to test the case where SendRequest() is called when there is no + // stream to the management server. + lis.Stop() + + // Override the backoff implementation to always return 0, to reduce test + // run time. Instead control when the backoff returns by blocking on a + // channel, that the test closes. + backoffCh := make(chan struct{}, 1) + unblockBackoffCh := make(chan struct{}) + streamBackoff := func(v int) time.Duration { + select { + case backoffCh <- struct{}{}: + default: + } + // Also return once the test context is done, so that a test failure + // before unblockBackoffCh is closed cannot block the backoff forever. + select { + case <-unblockBackoffCh: + case <-ctx.Done(): + } + return 0 + } + + // Create an xDS client with bootstrap pointing to the above server. + nodeID := uuid.New().String() + client := createXDSClientWithBackoff(t, mgmtServer.Address, nodeID, streamBackoff) + + // Register a watch for a listener resource. + const listenerName = "listener" + lw := newListenerWatcher() + ldsCancel := client.WatchResource(xdsresource.V3ListenerURL, listenerName, lw) + defer ldsCancel() + + // The above watch results in an attempt to create a new stream, which will + // fail, and will result in backoff. Wait for backoff to kick in. + select { + case <-backoffCh: + case <-ctx.Done(): + t.Fatal("Timeout waiting for stream backoff") + } + + // Bring up the connection to the management server, and unblock the backoff + // implementation.
+ lis.Restart() + close(unblockBackoffCh) + + // Verify that the initial discovery request matches expectation. + var gotReq *v3discoverypb.DiscoveryRequest + select { + case gotReq = <-streamRequestCh: + case <-ctx.Done(): + t.Fatalf("Timeout waiting for discovery request on the stream") + } + wantReq := &v3discoverypb.DiscoveryRequest{ + VersionInfo: "", + Node: &v3corepb.Node{ + Id: nodeID, + UserAgentName: "user-agent", + UserAgentVersionType: &v3corepb.Node_UserAgentVersion{UserAgentVersion: "0.0.0.0"}, + ClientFeatures: []string{"envoy.lb.does_not_support_overprovisioning", "xds.config.resource-in-sotw"}, + }, + ResourceNames: []string{listenerName}, + TypeUrl: "type.googleapis.com/envoy.config.listener.v3.Listener", + ResponseNonce: "", + } + if diff := cmp.Diff(gotReq, wantReq, protocmp.Transform()); diff != "" { + t.Fatalf("Unexpected diff in received discovery request, diff (-got, +want):\n%s", diff) + } +} + +// waitForResourceNames waits for the wantNames to be received on namesCh. +// Returns a non-nil error if the context expires before that. +func waitForResourceNames(ctx context.Context, t *testing.T, namesCh chan []string, wantNames []string) error { + t.Helper() + + var lastRequestedNames []string + for ; ; <-time.After(defaultTestShortTimeout) { + select { + case <-ctx.Done(): + return fmt.Errorf("timeout waiting for resources %v to be requested from the management server. 
Last requested resources: %v", wantNames, lastRequestedNames) + case gotNames := <-namesCh: + if cmp.Equal(gotNames, wantNames, cmpopts.EquateEmpty(), cmpopts.SortSlices(func(s1, s2 string) bool { return s1 < s2 })) { + return nil + } + lastRequestedNames = gotNames + } + } +} diff --git a/xds/internal/clients/xdsclient/test/ads_stream_flow_control_test.go b/xds/internal/clients/xdsclient/test/ads_stream_flow_control_test.go new file mode 100644 index 000000000..606043d9c --- /dev/null +++ b/xds/internal/clients/xdsclient/test/ads_stream_flow_control_test.go @@ -0,0 +1,629 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package xdsclient_test + +import ( + "context" + "errors" + "fmt" + "slices" + "sort" + "testing" + "time" + + v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" + v3discoverypb "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3" + "github.com/google/uuid" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/xds/internal/clients" + "google.golang.org/grpc/xds/internal/clients/xdsclient" + "google.golang.org/grpc/xds/internal/clients/xdsclient/internal/xdsresource" + "google.golang.org/grpc/xds/internal/xdsclient/xdsresource/version" +) + +// blockingListenerWatcher implements xdsresource.ListenerWatcher. 
It writes to +// a channel when it receives a callback from the watch. It also makes the +// DoneNotifier passed to the callback available to the test, thereby enabling +// the test to block this watcher for as long as required. +type blockingListenerWatcher struct { + doneNotifierCh chan func() // DoneNotifier passed to the callback. + updateCh chan struct{} // Written to when an update is received. + ambientErrCh chan struct{} // Written to when an ambient error is received. + resourceErrCh chan struct{} // Written to when a resource error is received. +} + +func newBLockingListenerWatcher() *blockingListenerWatcher { + return &blockingListenerWatcher{ + doneNotifierCh: make(chan func(), 1), + updateCh: make(chan struct{}, 1), + ambientErrCh: make(chan struct{}, 1), + resourceErrCh: make(chan struct{}, 1), + } +} + +func (lw *blockingListenerWatcher) ResourceChanged(update xdsclient.ResourceData, done func()) { + // Notify receipt of the update. + select { + case lw.updateCh <- struct{}{}: + default: + } + + select { + case lw.doneNotifierCh <- done: + default: + } +} + +func (lw *blockingListenerWatcher) ResourceError(err error, done func()) { + // Notify receipt of an error. + select { + case lw.resourceErrCh <- struct{}{}: + default: + } + + select { + case lw.doneNotifierCh <- done: + default: + } +} + +func (lw *blockingListenerWatcher) AmbientError(err error, done func()) { + // Notify receipt of an error. 
+ select { + case lw.ambientErrCh <- struct{}{}: + default: + } + + select { + case lw.doneNotifierCh <- done: + default: + } +} + +type transportBuilder struct { + adsStreamCh chan *stream +} + +func (b *transportBuilder) Build(si clients.ServerIdentifier) (clients.Transport, error) { + cc, err := grpc.NewClient(si.ServerURI, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithDefaultCallOptions(grpc.ForceCodec(&byteCodec{}))) + if err != nil { + return nil, err + } + + return &transport{cc: cc, adsStreamCh: b.adsStreamCh}, nil +} + +type transport struct { + cc *grpc.ClientConn + adsStreamCh chan *stream +} + +func (t *transport) NewStream(ctx context.Context, method string) (clients.Stream, error) { + s, err := t.cc.NewStream(ctx, &grpc.StreamDesc{ClientStreams: true, ServerStreams: true}, method) + if err != nil { + return nil, err + } + + stream := &stream{ + stream: s, + recvCh: make(chan struct{}, 1), + doneCh: make(chan struct{}), + } + t.adsStreamCh <- stream + + return stream, nil +} + +func (t *transport) Close() { + t.cc.Close() +} + +type stream struct { + stream grpc.ClientStream + + recvCh chan struct{} + doneCh <-chan struct{} +} + +func (s *stream) Send(msg []byte) error { + return s.stream.SendMsg(msg) +} + +func (s *stream) Recv() ([]byte, error) { + select { + case s.recvCh <- struct{}{}: + case <-s.doneCh: + return nil, errors.New("Recv() called after the test has finished") + } + + var typedRes []byte + if err := s.stream.RecvMsg(&typedRes); err != nil { + return nil, err + } + return typedRes, nil +} + +type byteCodec struct{} + +func (c *byteCodec) Marshal(v any) ([]byte, error) { + if b, ok := v.([]byte); ok { + return b, nil + } + return nil, fmt.Errorf("transport: message is %T, but must be a []byte", v) +} + +func (c *byteCodec) Unmarshal(data []byte, v any) error { + if b, ok := v.(*[]byte); ok { + *b = data + return nil + } + return fmt.Errorf("transport: target is %T, but must be *[]byte", v) +} + +func (c *byteCodec) 
Name() string { + return "transport.byteCodec" +} + +// Tests ADS stream level flow control with a single resource. The test does the +// following: +// - Starts a management server and configures a listener resource on it. +// - Creates an xDS client to the above management server, starts a couple of +// listener watchers for the above resource, and verifies that the update +// reaches these watchers. +// - These watchers don't invoke the onDone callback until explicitly +// triggered by the test. This allows the test to verify that the next +// Recv() call on the ADS stream does not happen until both watchers have +// completely processed the update, i.e invoke the onDone callback. +// - Resource is updated on the management server, and the test verifies that +// the update reaches the watchers. +func (s) TestADSFlowControl_ResourceUpdates_SingleResource(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Start an xDS management server. + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{}) + + nodeID := uuid.New().String() + + // Create an xDS client pointing to the above server with a test transport + // that allow monitoring the underlying stream through adsStreamCh. + adsStreamCh := make(chan *stream, 1) + client := createXDSClient(t, mgmtServer.Address, nodeID, &transportBuilder{adsStreamCh: adsStreamCh}) + + // Configure two watchers for the same listener resource. + const listenerResourceName = "test-listener-resource" + const routeConfigurationName = "test-route-configuration-resource" + watcher1 := newBLockingListenerWatcher() + cancel1 := client.WatchResource(xdsresource.V3ListenerURL, listenerResourceName, watcher1) + defer cancel1() + watcher2 := newBLockingListenerWatcher() + cancel2 := client.WatchResource(xdsresource.V3ListenerURL, listenerResourceName, watcher2) + defer cancel2() + + // Wait for the ADS stream to be created. 
+ var adsStream *stream + select { + case adsStream = <-adsStreamCh: + case <-ctx.Done(): + t.Fatalf("Timed out waiting for ADS stream to be created") + } + + // Configure the listener resource on the management server. + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(listenerResourceName, routeConfigurationName)}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) + } + + // Ensure that there is a read on the stream. + select { + case <-adsStream.recvCh: + case <-ctx.Done(): + t.Fatalf("Timed out waiting for ADS stream to be read from") + } + + // Wait for the update to reach the watchers. + select { + case <-watcher1.updateCh: + case <-ctx.Done(): + t.Fatalf("Timed out waiting for update to reach watcher 1") + } + select { + case <-watcher2.updateCh: + case <-ctx.Done(): + t.Fatalf("Timed out waiting for update to reach watcher 2") + } + + // Update the listener resource on the management server to point to a new + // route configuration resource. + resources = e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{e2e.DefaultClientListener(listenerResourceName, "new-route")}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) + } + + // Unblock one watcher. + onDone := <-watcher1.doneNotifierCh + onDone() + + // Wait for a short duration and ensure that there is no read on the stream. + select { + case <-adsStream.recvCh: + t.Fatal("Recv() called on the ADS stream before all watchers have processed the previous update") + case <-time.After(defaultTestShortTimeout): + } + + // Unblock the second watcher. 
+ onDone = <-watcher2.doneNotifierCh + onDone() + + // Ensure that there is a read on the stream, now that the previous update + // has been consumed by all watchers. + select { + case <-adsStream.recvCh: + case <-ctx.Done(): + t.Fatalf("Timed out waiting for Recv() to be called on the ADS stream after all watchers have processed the previous update") + } + + // Wait for the new update to reach the watchers. + select { + case <-watcher1.updateCh: + case <-ctx.Done(): + t.Fatalf("Timed out waiting for update to reach watcher 1") + } + select { + case <-watcher2.updateCh: + case <-ctx.Done(): + t.Fatalf("Timed out waiting for update to reach watcher 2") + } + + // At this point, the xDS client is shut down (and the associated transport + // is closed) without the watchers invoking their respective onDone + // callbacks. This verifies that closing a transport that has pending + // watchers does not block. +} + +// Tests ADS stream level flow control with multiple resources. The test does +// the following: +// - Starts a management server and configures two listener resources on it. +// - Creates an xDS client to the above management server, starts a couple of +// listener watchers for the two resources, and verifies that the update +// reaches these watchers. +// - These watchers don't invoke the onDone callback until explicitly +// triggered by the test. This allows the test to verify that the next +// Recv() call on the ADS stream does not happen until both watchers have +// completely processed the update, i.e. invoke the onDone callback. +func (s) TestADSFlowControl_ResourceUpdates_MultipleResources(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Start an xDS management server.
+ const listenerResourceName1 = "test-listener-resource-1" + const listenerResourceName2 = "test-listener-resource-2" + wantResourceNames := []string{listenerResourceName1, listenerResourceName2} + requestCh := make(chan struct{}, 1) + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{ + OnStreamRequest: func(id int64, req *v3discoverypb.DiscoveryRequest) error { + if req.GetTypeUrl() != version.V3ListenerURL { + return nil + } + gotResourceNames := req.GetResourceNames() + sort.Slice(gotResourceNames, func(i, j int) bool { return req.ResourceNames[i] < req.ResourceNames[j] }) + if slices.Equal(gotResourceNames, wantResourceNames) { + // The two resource names will be part of the initial request + // and also the ACK. Hence, we need to make this write + // non-blocking. + select { + case requestCh <- struct{}{}: + default: + } + } + return nil + }, + }) + + nodeID := uuid.New().String() + + // Create an xDS client pointing to the above server with a test transport + // that allow monitoring the underlying stream through adsStreamCh. + adsStreamCh := make(chan *stream, 1) + client := createXDSClient(t, mgmtServer.Address, nodeID, &transportBuilder{adsStreamCh: adsStreamCh}) + + // Configure two watchers for two different listener resources. + const routeConfigurationName1 = "test-route-configuration-resource-1" + watcher1 := newBLockingListenerWatcher() + cancel1 := client.WatchResource(xdsresource.V3ListenerURL, listenerResourceName1, watcher1) + defer cancel1() + const routeConfigurationName2 = "test-route-configuration-resource-2" + watcher2 := newBLockingListenerWatcher() + cancel2 := client.WatchResource(xdsresource.V3ListenerURL, listenerResourceName2, watcher2) + defer cancel2() + + // Wait for the wrapped ADS stream to be created. + var adsStream *stream + select { + case adsStream = <-adsStreamCh: + case <-ctx.Done(): + t.Fatalf("Timed out waiting for ADS stream to be created") + } + + // Ensure that there is a read on the stream. 
+ select { + case <-adsStream.recvCh: + case <-ctx.Done(): + t.Fatalf("Timed out waiting for ADS stream to be read from") + } + + // Wait for both resource names to be requested. + select { + case <-requestCh: + case <-ctx.Done(): + t.Fatal("Timed out waiting for both resource names to be requested") + } + + // Configure the listener resources on the management server. + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{ + e2e.DefaultClientListener(listenerResourceName1, routeConfigurationName1), + e2e.DefaultClientListener(listenerResourceName2, routeConfigurationName2), + }, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err) + } + + // At this point, we expect the management server to send both resources in + // the same response. So, both watchers would be notified at the same time, + // and no more Recv() calls should happen until both of them have invoked + // their respective onDone() callbacks. + + // The order of callback invocations among the two watchers is not + // guaranteed. So, we select on both of them and unblock the first watcher + // whose callback is invoked. + var otherWatcherUpdateCh chan struct{} + var otherWatcherDoneCh chan func() + select { + case <-watcher1.updateCh: + onDone := <-watcher1.doneNotifierCh + onDone() + otherWatcherUpdateCh = watcher2.updateCh + otherWatcherDoneCh = watcher2.doneNotifierCh + case <-watcher2.updateCh: + onDone := <-watcher2.doneNotifierCh + onDone() + otherWatcherUpdateCh = watcher1.updateCh + otherWatcherDoneCh = watcher1.doneNotifierCh + case <-ctx.Done(): + t.Fatal("Timed out waiting for update to reach first watchers") + } + + // Wait for a short duration and ensure that there is no read on the stream. 
// Test ADS stream flow control with a single resource that is expected to be
// NACKed by the xDS client, in which case the watcher's ResourceError()
// callback is expected to be invoked because the resource is not cached.
// Verifies that no further reads are attempted on the stream until the error
// has been completely processed by the watcher, i.e. until the watcher has
// invoked its onDone callback.
func (s) TestADSFlowControl_ResourceErrors(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()

	// Start an xDS management server.
	mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{})

	nodeID := uuid.New().String()

	// Create an xDS client pointing to the above server with a test transport
	// that allows monitoring the underlying stream through adsStreamCh.
	adsStreamCh := make(chan *stream, 1)
	client := createXDSClient(t, mgmtServer.Address, nodeID, &transportBuilder{adsStreamCh: adsStreamCh})

	// Configure a watcher for a listener resource.
	//
	// Note that `cancel` is reused for the watch cancel function. This is safe
	// because the earlier `defer cancel()` already captured the context's
	// cancel function at the time the defer statement was executed.
	const listenerResourceName = "test-listener-resource"
	watcher := newBLockingListenerWatcher()
	cancel = client.WatchResource(xdsresource.V3ListenerURL, listenerResourceName, watcher)
	defer cancel()

	// Wait for the stream to be created.
	var adsStream *stream
	select {
	case adsStream = <-adsStreamCh:
	case <-ctx.Done():
		t.Fatalf("Timed out waiting for ADS stream to be created")
	}

	// Configure the management server to return a single listener resource
	// which is expected to be NACKed by the client.
	resources := e2e.UpdateOptions{
		NodeID:         nodeID,
		Listeners:      []*v3listenerpb.Listener{badListenerResource(t, listenerResourceName)},
		SkipValidation: true,
	}
	if err := mgmtServer.Update(ctx, resources); err != nil {
		t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
	}

	// Ensure that there is a read on the stream.
	select {
	case <-adsStream.recvCh:
	case <-ctx.Done():
		t.Fatalf("Timed out waiting for ADS stream to be read from")
	}

	// Wait for the resource error to reach the watcher.
	select {
	case <-watcher.resourceErrCh:
	case <-ctx.Done():
		t.Fatalf("Timed out waiting for error to reach watcher")
	}

	// Wait for a short duration and ensure that there is no read on the stream
	// while the watcher has not yet invoked its onDone callback.
	select {
	case <-adsStream.recvCh:
		t.Fatal("Recv() called on the ADS stream before all watchers have processed the previous update")
	case <-time.After(defaultTestShortTimeout):
	}

	// Unblock the (only) watcher by invoking its onDone callback.
	onDone := <-watcher.doneNotifierCh
	onDone()

	// Ensure that there is a read on the stream, now that the previous error
	// has been consumed by the watcher.
	select {
	case <-adsStream.recvCh:
	case <-ctx.Done():
		t.Fatalf("Timed out waiting for Recv() to be called on the ADS stream after all watchers have processed the previous update")
	}
}
func (s) TestADSFlowControl_ResourceDoesNotExist(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()

	// Start an xDS management server.
	mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{})

	nodeID := uuid.New().String()

	// Create an xDS client pointing to the above server with a test transport
	// that allows monitoring the underlying stream through adsStreamCh.
	adsStreamCh := make(chan *stream, 1)
	client := createXDSClient(t, mgmtServer.Address, nodeID, &transportBuilder{adsStreamCh: adsStreamCh})

	// Configure a watcher for a listener resource.
	//
	// Note that `cancel` is reused for the watch cancel function. This is safe
	// because the earlier `defer cancel()` already captured the context's
	// cancel function at the time the defer statement was executed.
	const listenerResourceName = "test-listener-resource"
	const routeConfigurationName = "test-route-configuration-resource"
	watcher := newBLockingListenerWatcher()
	cancel = client.WatchResource(xdsresource.V3ListenerURL, listenerResourceName, watcher)
	defer cancel()

	// Wait for the ADS stream to be created.
	var adsStream *stream
	select {
	case adsStream = <-adsStreamCh:
	case <-ctx.Done():
		t.Fatalf("Timed out waiting for ADS stream to be created")
	}

	// Configure the listener resource on the management server.
	resources := e2e.UpdateOptions{
		NodeID:         nodeID,
		Listeners:      []*v3listenerpb.Listener{e2e.DefaultClientListener(listenerResourceName, routeConfigurationName)},
		SkipValidation: true,
	}
	if err := mgmtServer.Update(ctx, resources); err != nil {
		t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
	}

	// Ensure that there is a read on the stream.
	select {
	case <-adsStream.recvCh:
	case <-ctx.Done():
		t.Fatalf("Timed out waiting for Recv() to be called on the ADS stream")
	}

	// Wait for the update to reach the watcher and unblock it right away by
	// invoking its onDone callback.
	select {
	case <-watcher.updateCh:
		onDone := <-watcher.doneNotifierCh
		onDone()
	case <-ctx.Done():
		t.Fatalf("Timed out waiting for update to reach watcher 1")
	}

	// Ensure that there is another read on the stream, now that the update has
	// been fully processed by the watcher.
	select {
	case <-adsStream.recvCh:
	case <-ctx.Done():
		t.Fatalf("Timed out waiting for Recv() to be called on the ADS stream")
	}

	// Remove the listener resource on the management server by pushing an
	// update with an empty list of listeners.
	resources = e2e.UpdateOptions{
		NodeID:         nodeID,
		Listeners:      []*v3listenerpb.Listener{},
		SkipValidation: true,
	}
	if err := mgmtServer.Update(ctx, resources); err != nil {
		t.Fatalf("Failed to update management server with resources: %v, err: %v", resources, err)
	}

	// Wait for the resource not found callback to be invoked. The watcher
	// reports it on resourceErrCh.
	select {
	case <-watcher.resourceErrCh:
	case <-ctx.Done():
		t.Fatalf("Timed out waiting for resource not found callback to be invoked on the watcher")
	}

	// Wait for a short duration and ensure that there is no read on the stream
	// while the watcher has not yet invoked its onDone callback.
	select {
	case <-adsStream.recvCh:
		t.Fatal("Recv() called on the ADS stream before all watchers have processed the previous update")
	case <-time.After(defaultTestShortTimeout):
	}

	// Unblock the watcher.
	onDone := <-watcher.doneNotifierCh
	onDone()

	// Ensure that there is a read on the stream, now that the callback has
	// been completely handled by the watcher.
	select {
	case <-adsStream.recvCh:
	case <-ctx.Done():
		t.Fatalf("Timed out waiting for Recv() to be called on the ADS stream")
	}
}
// Tests that an ADS stream is restarted after a connection failure. Also
// verifies that if there were any watches registered before the connection
// failed, those resources are re-requested after the stream is restarted, and
// that resources whose watches were cancelled before the failure are not.
func (s) TestADS_ResourcesAreRequestedAfterStreamRestart(t *testing.T) {
	// Create a restartable listener that can simulate a broken ADS stream.
	l, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		t.Fatalf("net.Listen() failed: %v", err)
	}
	lis := testutils.NewRestartableListener(l)

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()

	// Start an xDS management server that uses channels to inform the test
	// about the LDS resource names being requested, and about the ADS stream
	// being opened and closed.
	ldsResourcesCh := make(chan []string, 2)
	streamOpened := make(chan struct{}, 1)
	streamClosed := make(chan struct{}, 1)
	mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{
		Listener: lis,
		OnStreamRequest: func(_ int64, req *v3discoverypb.DiscoveryRequest) error {
			t.Logf("Received request for resources: %v of type %s", req.GetResourceNames(), req.GetTypeUrl())

			// Drop a stale entry (if any) from the resource names channel
			// before writing to it, to ensure that the most recently requested
			// names are made available to the test. Requests for types other
			// than LDS are ignored.
			switch req.GetTypeUrl() {
			case version.V3ListenerURL:
				select {
				case <-ldsResourcesCh:
				default:
				}
				ldsResourcesCh <- req.GetResourceNames()
			}
			return nil
		},
		OnStreamClosed: func(int64, *v3corepb.Node) {
			// Non-blocking send: a notification that is already pending is
			// sufficient for the test.
			select {
			case streamClosed <- struct{}{}:
			default:
			}

		},
		OnStreamOpen: func(context.Context, int64, string) error {
			// Non-blocking send: a notification that is already pending is
			// sufficient for the test.
			select {
			case streamOpened <- struct{}{}:
			default:
			}
			return nil
		},
	})

	// Create a listener resource on the management server.
	const listenerName = "listener"
	const routeConfigName = "route-config"
	nodeID := uuid.New().String()
	resources := e2e.UpdateOptions{
		NodeID:         nodeID,
		Listeners:      []*v3listenerpb.Listener{e2e.DefaultClientListener(listenerName, routeConfigName)},
		SkipValidation: true,
	}
	if err := mgmtServer.Update(ctx, resources); err != nil {
		t.Fatal(err)
	}

	// Create an xDS client pointing to the above server.
	configs := map[string]grpctransport.Config{"insecure": {Credentials: insecure.NewBundle()}}
	client := createXDSClient(t, mgmtServer.Address, nodeID, grpctransport.NewBuilder(configs))

	// Register a watch for a listener resource.
	lw := newListenerWatcher()
	ldsCancel := client.WatchResource(xdsresource.V3ListenerURL, listenerName, lw)
	defer ldsCancel()

	// Verify that an ADS stream is opened and an LDS request with the above
	// resource name is sent.
	select {
	case <-streamOpened:
	case <-ctx.Done():
		t.Fatal("Timeout when waiting for ADS stream to open")
	}
	if err := waitForResourceNames(ctx, t, ldsResourcesCh, []string{listenerName}); err != nil {
		t.Fatal(err)
	}

	// Verify the update received by the watcher.
	wantListenerUpdate := listenerUpdateErrTuple{
		update: listenerUpdate{
			RouteConfigName: routeConfigName,
		},
	}
	if err := verifyListenerUpdate(ctx, lw.updateCh, wantListenerUpdate); err != nil {
		t.Fatal(err)
	}

	// Create another listener resource on the management server, in addition
	// to the existing listener resource.
	const listenerName2 = "listener2"
	const routeConfigName2 = "route-config2"
	resources = e2e.UpdateOptions{
		NodeID:         nodeID,
		Listeners:      []*v3listenerpb.Listener{e2e.DefaultClientListener(listenerName, routeConfigName), e2e.DefaultClientListener(listenerName2, routeConfigName2)},
		SkipValidation: true,
	}
	if err := mgmtServer.Update(ctx, resources); err != nil {
		t.Fatal(err)
	}

	// Register a watch for another listener resource, and verify that an LDS
	// request with both listener resource names is sent.
	lw2 := newListenerWatcher()
	ldsCancel2 := client.WatchResource(xdsresource.V3ListenerURL, listenerName2, lw2)
	if err := waitForResourceNames(ctx, t, ldsResourcesCh, []string{listenerName, listenerName2}); err != nil {
		t.Fatal(err)
	}

	// Verify the update received by the second watcher.
	wantListenerUpdate = listenerUpdateErrTuple{
		update: listenerUpdate{
			RouteConfigName: routeConfigName2,
		},
	}
	if err := verifyListenerUpdate(ctx, lw2.updateCh, wantListenerUpdate); err != nil {
		t.Fatal(err)
	}

	// Cancel the watch for the second listener resource, and verify that an LDS
	// request with only the first listener resource name is sent.
	ldsCancel2()
	if err := waitForResourceNames(ctx, t, ldsResourcesCh, []string{listenerName}); err != nil {
		t.Fatal(err)
	}

	// Stop the restartable listener and wait for the stream to close.
	lis.Stop()
	select {
	case <-streamClosed:
	case <-ctx.Done():
		t.Fatal("Timeout when waiting for ADS stream to close")
	}

	// Restart the restartable listener and wait for the stream to open.
	lis.Restart()
	select {
	case <-streamOpened:
	case <-ctx.Done():
		t.Fatal("Timeout when waiting for ADS stream to open")
	}

	// Verify that the first listener resource is requested again.
	if err := waitForResourceNames(ctx, t, ldsResourcesCh, []string{listenerName}); err != nil {
		t.Fatal(err)
	}

	// Wait for a short duration and verify that no further LDS request is
	// sent. In particular, the second listener resource, whose watch was
	// cancelled before the stream broke, must not be requested.
	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
	defer sCancel()
	select {
	case <-sCtx.Done():
	case names := <-ldsResourcesCh:
		t.Fatalf("LDS request sent for resource names %v, when expecting no request", names)
	}
}
+ * + */ + +package xdsclient_test + +import ( + "context" + "fmt" + "net" + "testing" + "time" + + "github.com/google/uuid" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal/testutils/xds/e2e" + "google.golang.org/grpc/xds/internal/clients/grpctransport" + "google.golang.org/grpc/xds/internal/clients/internal/testutils" + "google.golang.org/grpc/xds/internal/clients/xdsclient" + xdsclientinternal "google.golang.org/grpc/xds/internal/clients/xdsclient/internal" + "google.golang.org/grpc/xds/internal/clients/xdsclient/internal/xdsresource" + + v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" +) + +func waitForResourceWatchState(ctx context.Context, client *xdsclient.XDSClient, resourceName string, wantState xdsresource.WatchState, wantTimer bool) error { + var lastErr error + for ; ctx.Err() == nil; <-time.After(defaultTestShortTimeout) { + err := verifyResourceWatchState(client, resourceName, wantState, wantTimer) + if err == nil { + break + } + lastErr = err + } + if ctx.Err() != nil { + return fmt.Errorf("timeout when waiting for expected watch state for resource %q: %v", resourceName, lastErr) + } + return nil +} + +func verifyResourceWatchState(client *xdsclient.XDSClient, resourceName string, wantState xdsresource.WatchState, wantTimer bool) error { + resourceWatchStateForTesting := xdsclientinternal.ResourceWatchStateForTesting.(func(*xdsclient.XDSClient, xdsclient.ResourceType, string) (xdsresource.ResourceWatchState, error)) + gotState, err := resourceWatchStateForTesting(client, listenerType, resourceName) + if err != nil { + return fmt.Errorf("failed to get watch state for resource %q: %v", resourceName, err) + } + if gotState.State != wantState { + return fmt.Errorf("watch state for resource %q is %v, want %v", resourceName, gotState.State, wantState) + } + if (gotState.ExpiryTimer != nil) != wantTimer { + return fmt.Errorf("expiry timer for resource %q is %t, want %t", resourceName, 
gotState.ExpiryTimer != nil, wantTimer) + } + return nil +} + +// Tests the state transitions of the resource specific watch state within the +// ADS stream, specifically when the stream breaks (for both resources that have +// been previously received and for resources that are yet to be received). +func (s) TestADS_WatchState_StreamBreaks(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Create an xDS management server with a restartable listener. + l, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("net.Listen() failed: %v", err) + } + lis := testutils.NewRestartableListener(l) + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{Listener: lis}) + + // Create an xDS client pointing to the above server. + nodeID := uuid.New().String() + configs := map[string]grpctransport.Config{"insecure": {Credentials: insecure.NewBundle()}} + client := createXDSClient(t, mgmtServer.Address, nodeID, grpctransport.NewBuilder(configs)) + + // Create a watch for the first listener resource and verify that the timer + // is running and the watch state is `requested`. + const listenerName1 = "listener1" + ldsCancel1 := client.WatchResource(xdsresource.V3ListenerURL, listenerName1, noopListenerWatcher{}) + defer ldsCancel1() + if err := waitForResourceWatchState(ctx, client, listenerName1, xdsresource.ResourceWatchStateRequested, true); err != nil { + t.Fatal(err) + } + + // Configure the first resource on the management server. This should result + // in the resource being pushed to the xDS client and should result in the + // timer getting stopped and the watch state moving to `received`. 
+ const routeConfigName = "route-config" + listenerResource1 := e2e.DefaultClientListener(listenerName1, routeConfigName) + resources := e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{listenerResource1}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + if err := waitForResourceWatchState(ctx, client, listenerName1, xdsresource.ResourceWatchStateReceived, false); err != nil { + t.Fatal(err) + } + + // Create a watch for the second listener resource and verify that the timer + // is running and the watch state is `requested`. + const listenerName2 = "listener2" + ldsCancel2 := client.WatchResource(xdsresource.V3ListenerURL, listenerName2, noopListenerWatcher{}) + defer ldsCancel2() + if err := waitForResourceWatchState(ctx, client, listenerName2, xdsresource.ResourceWatchStateRequested, true); err != nil { + t.Fatal(err) + } + + // Stop the server to break the ADS stream. Since the first resource was + // already received, this should not change anything for it. But for the + // second resource, it should result in the timer getting stopped and the + // watch state moving to `started`. + lis.Stop() + if err := waitForResourceWatchState(ctx, client, listenerName2, xdsresource.ResourceWatchStateStarted, false); err != nil { + t.Fatal(err) + } + if err := verifyResourceWatchState(client, listenerName1, xdsresource.ResourceWatchStateReceived, false); err != nil { + t.Fatal(err) + } + + // Restart the server and verify that the timer is running and the watch + // state is `requested`, for the second resource. For the first resource, + // nothing should change. 
+ lis.Restart() + if err := waitForResourceWatchState(ctx, client, listenerName2, xdsresource.ResourceWatchStateRequested, true); err != nil { + t.Fatal(err) + } + if err := verifyResourceWatchState(client, listenerName1, xdsresource.ResourceWatchStateReceived, false); err != nil { + t.Fatal(err) + } + + // Configure the second resource on the management server. This should result + // in the resource being pushed to the xDS client and should result in the + // timer getting stopped and the watch state moving to `received`. + listenerResource2 := e2e.DefaultClientListener(listenerName2, routeConfigName) + resources = e2e.UpdateOptions{ + NodeID: nodeID, + Listeners: []*v3listenerpb.Listener{listenerResource1, listenerResource2}, + SkipValidation: true, + } + if err := mgmtServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + if err := waitForResourceWatchState(ctx, client, listenerName2, xdsresource.ResourceWatchStateReceived, false); err != nil { + t.Fatal(err) + } +} + +// Tests the behavior of the xDS client when a resource watch timer expires and +// verifies the resource watch state transitions as expected. +func (s) TestADS_WatchState_TimerFires(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + // Start an xDS management server. + mgmtServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{}) + + // Create an xDS client with bootstrap pointing to the above server, and a + // short resource expiry timeout. + nodeID := uuid.New().String() + configs := map[string]grpctransport.Config{"insecure": {Credentials: insecure.NewBundle()}} + overrideWatchExpiryTimeout(t, defaultTestWatchExpiryTimeout) + client := createXDSClient(t, mgmtServer.Address, nodeID, grpctransport.NewBuilder(configs)) + + // Create a watch for the first listener resource and verify that the timer + // is running and the watch state is `requested`. 
+ const listenerName = "listener" + ldsCancel1 := client.WatchResource(xdsresource.V3ListenerURL, listenerName, noopListenerWatcher{}) + defer ldsCancel1() + if err := waitForResourceWatchState(ctx, client, listenerName, xdsresource.ResourceWatchStateRequested, true); err != nil { + t.Fatal(err) + } + + // Since the resource is not configured on the management server, the watch + // expiry timer is expected to fire, and the watch state should move to + // `timeout`. + if err := waitForResourceWatchState(ctx, client, listenerName, xdsresource.ResourceWatchStateTimeout, false); err != nil { + t.Fatal(err) + } +} diff --git a/xds/internal/clients/xdsclient/test/authority_test.go b/xds/internal/clients/xdsclient/test/authority_test.go index 35d86921e..0256c5bb0 100644 --- a/xds/internal/clients/xdsclient/test/authority_test.go +++ b/xds/internal/clients/xdsclient/test/authority_test.go @@ -108,11 +108,11 @@ func setupForAuthorityTests(ctx context.Context, t *testing.T) (*testutils.Liste } // Create an xDS client with the above config. 
+ overrideWatchExpiryTimeout(t, defaultTestWatchExpiryTimeout) client, err := xdsclient.New(xdsClientConfig) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } - client.SetWatchExpiryTimeoutForTesting(defaultTestWatchExpiryTimeout) resources := e2e.UpdateOptions{ NodeID: nodeID, diff --git a/xds/internal/clients/xdsclient/test/lds_watchers_test.go b/xds/internal/clients/xdsclient/test/lds_watchers_test.go index 5fe604241..3d16d3f6f 100644 --- a/xds/internal/clients/xdsclient/test/lds_watchers_test.go +++ b/xds/internal/clients/xdsclient/test/lds_watchers_test.go @@ -35,6 +35,7 @@ import ( "google.golang.org/grpc/xds/internal/clients/internal/testutils" "google.golang.org/grpc/xds/internal/clients/internal/testutils/e2e" "google.golang.org/grpc/xds/internal/clients/xdsclient" + xdsclientinternal "google.golang.org/grpc/xds/internal/clients/xdsclient/internal" "google.golang.org/grpc/xds/internal/clients/xdsclient/internal/xdsresource" v3listenerpb "github.com/envoyproxy/go-control-plane/envoy/config/listener/v3" @@ -114,6 +115,12 @@ func badListenerResource(t *testing.T, name string) *v3listenerpb.Listener { } } +func overrideWatchExpiryTimeout(t *testing.T, watchExpiryTimeout time.Duration) { + originalWatchExpiryTimeout := xdsclientinternal.WatchExpiryTimeout + xdsclientinternal.WatchExpiryTimeout = watchExpiryTimeout + t.Cleanup(func() { xdsclientinternal.WatchExpiryTimeout = originalWatchExpiryTimeout }) +} + // verifyNoListenerUpdate verifies that no listener update is received on the // provided update channel, and returns an error if an update is received. 
// verifyListenerError checks that gotErr is non-nil and that its message
// contains both wantErr and wantNodeID. It returns a descriptive error for the
// first expectation that is not met, and nil otherwise. The ctx parameter is
// not used.
func verifyListenerError(ctx context.Context, gotErr error, wantErr, wantNodeID string) error {
	if gotErr == nil {
		return fmt.Errorf("update received with error: %v, want %q", gotErr, wantErr)
	}

	msg := gotErr.Error()
	switch {
	case !strings.Contains(msg, wantErr):
		return fmt.Errorf("update received with error: %v, want %q", gotErr, wantErr)
	case !strings.Contains(msg, wantNodeID):
		return fmt.Errorf("update received with error: %v, want error with node ID: %q", gotErr, wantNodeID)
	}
	return nil
}
+ overrideWatchExpiryTimeout(t, defaultTestWatchExpiryTimeout) client, err := xdsclient.New(xdsClientConfig) if err != nil { t.Fatalf("Failed to create xDS client: %v", err) } defer client.Close() - client.SetWatchExpiryTimeoutForTesting(defaultTestWatchExpiryTimeout) // Register a watch for a listener resource and have the watch // callback push the received update on to a channel. diff --git a/xds/internal/clients/xdsclient/xdsclient.go b/xds/internal/clients/xdsclient/xdsclient.go index d6fe4c41d..c9cd52a1e 100644 --- a/xds/internal/clients/xdsclient/xdsclient.go +++ b/xds/internal/clients/xdsclient/xdsclient.go @@ -43,6 +43,7 @@ import ( clientsinternal "google.golang.org/grpc/xds/internal/clients/internal" "google.golang.org/grpc/xds/internal/clients/internal/backoff" "google.golang.org/grpc/xds/internal/clients/internal/syncutil" + xdsclientinternal "google.golang.org/grpc/xds/internal/clients/xdsclient/internal" "google.golang.org/grpc/xds/internal/clients/xdsclient/internal/xdsresource" "google.golang.org/grpc/xds/internal/clients/xdsclient/metrics" "google.golang.org/protobuf/proto" @@ -59,6 +60,12 @@ var ( defaultExponentialBackoff = backoff.DefaultExponential.Backoff ) +func init() { + xdsclientinternal.WatchExpiryTimeout = defaultWatchExpiryTimeout + xdsclientinternal.StreamBackoff = defaultExponentialBackoff + xdsclientinternal.ResourceWatchStateForTesting = resourceWatchStateForTesting +} + // XDSClient is a client which queries a set of discovery APIs (collectively // termed as xDS) on a remote management server, to discover // various dynamic resources. 
@@ -104,7 +111,7 @@ func New(config Config) (*XDSClient, error) { return nil, errors.New("xdsclient: no servers or authorities specified") } - client, err := newClient(&config, defaultWatchExpiryTimeout, defaultExponentialBackoff, name) + client, err := newClient(&config, name) if err != nil { return nil, err } @@ -118,15 +125,15 @@ func (c *XDSClient) SetWatchExpiryTimeoutForTesting(watchExpiryTimeout time.Dura } // newClient returns a new XDSClient with the given config. -func newClient(config *Config, watchExpiryTimeout time.Duration, streamBackoff func(int) time.Duration, target string) (*XDSClient, error) { +func newClient(config *Config, target string) (*XDSClient, error) { ctx, cancel := context.WithCancel(context.Background()) c := &XDSClient{ target: target, done: syncutil.NewEvent(), authorities: make(map[string]*authority), config: config, - watchExpiryTimeout: watchExpiryTimeout, - backoff: streamBackoff, + watchExpiryTimeout: xdsclientinternal.WatchExpiryTimeout, + backoff: xdsclientinternal.StreamBackoff, serializer: syncutil.NewCallbackSerializer(ctx), serializerClose: cancel, transportBuilder: config.TransportBuilder, @@ -431,3 +438,15 @@ func (cs *channelState) adsResourceDoesNotExist(typ ResourceType, resourceName s authority.adsResourceDoesNotExist(typ, resourceName) } } + +func resourceWatchStateForTesting(c *XDSClient, rType ResourceType, resourceName string) (xdsresource.ResourceWatchState, error) { + c.channelsMu.Lock() + defer c.channelsMu.Unlock() + + for _, state := range c.xdsActiveChannels { + if st, err := state.channel.ads.adsResourceWatchStateForTesting(rType, resourceName); err == nil { + return st, nil + } + } + return xdsresource.ResourceWatchState{}, fmt.Errorf("unable to find watch state for resource type %q and name %q", rType.TypeName, resourceName) +}