diff --git a/internal/testutils/blocking_context_dialer.go b/internal/testutils/blocking_context_dialer.go index ea7a85193..32dfabf36 100644 --- a/internal/testutils/blocking_context_dialer.go +++ b/internal/testutils/blocking_context_dialer.go @@ -21,36 +21,109 @@ package testutils import ( "context" "net" + "sync" + + "google.golang.org/grpc/grpclog" ) +var logger = grpclog.Component("testutils") + // BlockingDialer is a dialer that waits for Resume() to be called before // dialing. type BlockingDialer struct { - dialer *net.Dialer - blockCh chan struct{} + // mu protects holds. + mu sync.Mutex + // holds maps network addresses to a list of holds for that address. + holds map[string][]*Hold } // NewBlockingDialer returns a dialer that waits for Resume() to be called // before dialing. func NewBlockingDialer() *BlockingDialer { return &BlockingDialer{ - dialer: &net.Dialer{}, - blockCh: make(chan struct{}), + holds: make(map[string][]*Hold), } } // DialContext implements a context dialer for use with grpc.WithContextDialer // dial option for a BlockingDialer. func (d *BlockingDialer) DialContext(ctx context.Context, addr string) (net.Conn, error) { + d.mu.Lock() + holds := d.holds[addr] + if len(holds) == 0 { + // No hold for this addr. + d.mu.Unlock() + return (&net.Dialer{}).DialContext(ctx, "tcp", addr) + } + hold := holds[0] + d.holds[addr] = holds[1:] + d.mu.Unlock() + + logger.Infof("Hold %p: Intercepted connection attempt to addr %q", hold, addr) + close(hold.waitCh) select { - case <-d.blockCh: + case err := <-hold.blockCh: + if err != nil { + return nil, err + } + return (&net.Dialer{}).DialContext(ctx, "tcp", addr) case <-ctx.Done(): + logger.Infof("Hold %p: Connection attempt to addr %q timed out", hold, addr) return nil, ctx.Err() } - return d.dialer.DialContext(ctx, "tcp", addr) } -// Resume unblocks the dialer. It panics if called more than once. -func (d *BlockingDialer) Resume() { - close(d.blockCh) +// Hold is a handle to a single connection attempt. It can be used to block, +// fail and succeed connection attempts. +type Hold struct { + // dialer is the dialer that created this hold. + dialer *BlockingDialer + // waitCh is closed when a connection attempt is received. + waitCh chan struct{} + // blockCh receives the value to return from DialContext for this connection + // attempt (nil on resume, an error on fail). It receives at most 1 value. + blockCh chan error + // addr is the address that this hold is for. + addr string +} + +// Hold blocks the dialer when a connection attempt is made to the given addr. +// A hold is valid for exactly one connection attempt. Multiple holds for an +// addr can be added, and they will apply in the order that the connections are +// attempted. +func (d *BlockingDialer) Hold(addr string) *Hold { + d.mu.Lock() + defer d.mu.Unlock() + + h := Hold{dialer: d, blockCh: make(chan error, 1), waitCh: make(chan struct{}), addr: addr} + d.holds[addr] = append(d.holds[addr], &h) + return &h +} + +// Wait blocks until there is a connection attempt on this Hold, or the context +// expires. Return false if the context has expired, true otherwise. +func (h *Hold) Wait(ctx context.Context) bool { + logger.Infof("Hold %p: Waiting for a connection attempt to addr %q", h, h.addr) + select { + case <-ctx.Done(): + return false + case <-h.waitCh: + return true + } +} + +// Resume unblocks the dialer for the given addr. Either Resume or Fail must be +// called at most once on a hold. Otherwise, Resume panics. 
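// For reference, the sketch below shows the typical end-to-end use of a Hold,
// assuming a test that has a real local listener at backendAddr and the
// context, time and testing imports used by the companion test file. It is an
// illustrative sketch of the call pattern exercised by the tests in
// blocking_context_dialer_test.go, not part of the dialer itself.
func exampleHoldUsage(t *testing.T, backendAddr string) {
	d := NewBlockingDialer()
	// The hold must be registered before the connection attempt is made.
	h := d.Hold(backendAddr)

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	go func() {
		// In real tests the dialer is usually wired into a channel via
		// grpc.WithContextDialer(d.DialContext); dialing directly keeps the
		// sketch small.
		conn, err := d.DialContext(ctx, backendAddr)
		if err != nil {
			t.Errorf("DialContext(%q) failed: %v", backendAddr, err)
			return
		}
		conn.Close()
	}()

	// Block until DialContext intercepts the attempt, then let it proceed.
	if !h.Wait(ctx) {
		t.Fatalf("Timed out waiting for a connection attempt to %q", backendAddr)
	}
	h.Resume() // or h.Fail(err) to make the attempt return err.
}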
+func (h *Hold) Resume() { + logger.Infof("Hold %p: Resuming connection attempt to addr %q", h, h.addr) + h.blockCh <- nil + close(h.blockCh) +} + +// Fail fails the connection attempt. Either Resume or Fail must be +// called at most once on a hold. Otherwise, Resume panics. +func (h *Hold) Fail(err error) { + logger.Infof("Hold %p: Failing connection attempt to addr %q", h, h.addr) + h.blockCh <- err + close(h.blockCh) } diff --git a/internal/testutils/blocking_context_dialer_test.go b/internal/testutils/blocking_context_dialer_test.go new file mode 100644 index 000000000..d2b595414 --- /dev/null +++ b/internal/testutils/blocking_context_dialer_test.go @@ -0,0 +1,201 @@ +/* + * + * Copyright 2024 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package testutils + +import ( + "context" + "errors" + "testing" + "time" +) + +const ( + testTimeout = 5 * time.Second + testShortTimeout = 10 * time.Millisecond +) + +func (s) TestBlockingDialer_NoHold(t *testing.T) { + lis, err := LocalTCPListener() + if err != nil { + t.Fatalf("Failed to listen: %v", err) + } + defer lis.Close() + + d := NewBlockingDialer() + + // This should not block. + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + conn, err := d.DialContext(ctx, lis.Addr().String()) + if err != nil { + t.Fatalf("Failed to dial: %v", err) + } + conn.Close() +} + +func (s) TestBlockingDialer_HoldWaitResume(t *testing.T) { + lis, err := LocalTCPListener() + if err != nil { + t.Fatalf("Failed to listen: %v", err) + } + defer lis.Close() + + d := NewBlockingDialer() + h := d.Hold(lis.Addr().String()) + + done := make(chan struct{}) + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + go func() { + defer close(done) + conn, err := d.DialContext(ctx, lis.Addr().String()) + if err != nil { + t.Errorf("BlockingDialer.DialContext() got error: %v, want success", err) + return + } + conn.Close() + }() + + // This should block until the goroutine above is scheduled. + if !h.Wait(ctx) { + t.Fatalf("Timeout while waiting for a connection attempt to %q", h.addr) + } + select { + case <-done: + t.Fatalf("Expected dialer to be blocked.") + case <-time.After(testShortTimeout): + } + + h.Resume() // Unblock the above goroutine. 
+ + select { + case <-done: + case <-ctx.Done(): + t.Errorf("Timeout waiting for connection attempt to resume.") + } +} + +func (s) TestBlockingDialer_HoldWaitFail(t *testing.T) { + lis, err := LocalTCPListener() + if err != nil { + t.Fatalf("Failed to listen: %v", err) + } + defer lis.Close() + + d := NewBlockingDialer() + h := d.Hold(lis.Addr().String()) + + wantErr := errors.New("test error") + + dialError := make(chan error) + ctx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + go func() { + _, err := d.DialContext(ctx, lis.Addr().String()) + dialError <- err + }() + + if !h.Wait(ctx) { + t.Fatalf("Timeout while waiting for a connection attempt to " + h.addr) + } + select { + case err = <-dialError: + t.Errorf("DialContext got unblocked with err %v. Want DialContext to still be blocked after Wait()", err) + case <-time.After(testShortTimeout): + } + + h.Fail(wantErr) + + select { + case err = <-dialError: + if !errors.Is(err, wantErr) { + t.Errorf("BlockingDialer.DialContext() after Fail(): got error %v, want %v", err, wantErr) + } + case <-ctx.Done(): + t.Errorf("Timeout waiting for connection attempt to fail.") + } +} + +func (s) TestBlockingDialer_ContextCanceled(t *testing.T) { + lis, err := LocalTCPListener() + if err != nil { + t.Fatalf("Failed to listen: %v", err) + } + defer lis.Close() + + d := NewBlockingDialer() + h := d.Hold(lis.Addr().String()) + + dialErr := make(chan error) + testCtx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + ctx, cancel := context.WithCancel(testCtx) + defer cancel() + go func() { + _, err := d.DialContext(ctx, lis.Addr().String()) + dialErr <- err + }() + if !h.Wait(testCtx) { + t.Errorf("Timeout while waiting for a connection attempt to %q", h.addr) + } + + cancel() + + select { + case err = <-dialErr: + if !errors.Is(err, context.Canceled) { + t.Errorf("BlockingDialer.DialContext() after context cancel: got error %v, want %v", err, context.Canceled) + } + case <-testCtx.Done(): + t.Errorf("Timeout while waiting for Wait to return.") + } + + h.Resume() // noop, just make sure nothing bad happen. 
+} + +func (s) TestBlockingDialer_CancelWait(t *testing.T) { + lis, err := LocalTCPListener() + if err != nil { + t.Fatalf("Failed to listen: %v", err) + } + defer lis.Close() + + d := NewBlockingDialer() + h := d.Hold(lis.Addr().String()) + + testCtx, cancel := context.WithTimeout(context.Background(), testTimeout) + defer cancel() + + ctx, cancel := context.WithCancel(testCtx) + cancel() + done := make(chan struct{}) + go func() { + if h.Wait(ctx) { + t.Errorf("Expected cancel to return false when context expires") + } + done <- struct{}{} + }() + + select { + case <-done: + case <-testCtx.Done(): + t.Errorf("Timeout while waiting for Wait to return.") + } +} diff --git a/xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go b/xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go index 078313faf..df6b84fbf 100644 --- a/xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go +++ b/xds/internal/balancer/ringhash/e2e/ringhash_balancer_test.go @@ -22,7 +22,9 @@ import ( "context" "fmt" "math" + "math/rand" "net" + "slices" "testing" "time" @@ -31,6 +33,7 @@ import ( "github.com/google/uuid" "google.golang.org/grpc" "google.golang.org/grpc/backoff" + "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" "google.golang.org/grpc/internal" @@ -43,6 +46,7 @@ import ( "google.golang.org/grpc/peer" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" + "google.golang.org/grpc/status" v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" @@ -74,6 +78,15 @@ const ( virtualHostName = "test.server" ) +// fastConnectParams disables connection attempts backoffs and lowers delays. +// This speeds up tests that rely on subchannel to move to transient failure. +var fastConnectParams = grpc.ConnectParams{ + Backoff: backoff.Config{ + BaseDelay: 10 * time.Millisecond, + }, + MinConnectTimeout: 100 * time.Millisecond, +} + // Tests the case where the ring contains a single subConn, and verifies that // when the server goes down, the LB policy on the client automatically // reconnects until the subChannel moves out of TRANSIENT_FAILURE. @@ -99,6 +112,7 @@ func (s) TestRingHash_ReconnectToMoveOutOfTransientFailure(t *testing.T) { grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(r), grpc.WithDefaultServiceConfig(ringHashServiceConfig), + grpc.WithConnectParams(fastConnectParams), } cc, err := grpc.NewClient(r.Scheme()+":///test.server", dopts...) if err != nil { @@ -146,33 +160,28 @@ func (s) TestRingHash_ReconnectToMoveOutOfTransientFailure(t *testing.T) { } } -// startTestServiceBackends starts num stub servers. -func startTestServiceBackends(t *testing.T, num int) ([]*stubserver.StubServer, func()) { +// startTestServiceBackends starts num stub servers. It returns their addresses. +// Servers are closed when the test is stopped. +func startTestServiceBackends(t *testing.T, num int) []string { t.Helper() - var servers []*stubserver.StubServer + addrs := make([]string, 0, num) for i := 0; i < num; i++ { - servers = append(servers, stubserver.StartTestService(t, nil)) - } - - return servers, func() { - for _, server := range servers { - server.Stop() - } + server := stubserver.StartTestService(t, nil) + t.Cleanup(server.Stop) + addrs = append(addrs, server.Address) } + return addrs } -// backendOptions returns a slice of e2e.BackendOptions for the given stub -// servers. 
-func backendOptions(t *testing.T, servers []*stubserver.StubServer) []e2e.BackendOptions { +// backendOptions returns a slice of e2e.BackendOptions for the given server +// addresses. +func backendOptions(t *testing.T, serverAddrs []string) []e2e.BackendOptions { t.Helper() var backendOpts []e2e.BackendOptions - for _, server := range servers { - backendOpts = append(backendOpts, e2e.BackendOptions{ - Port: testutils.ParsePort(t, server.Address), - Weight: 1, - }) + for _, addr := range serverAddrs { + backendOpts = append(backendOpts, e2e.BackendOptions{Port: testutils.ParsePort(t, addr)}) } return backendOpts } @@ -210,10 +219,12 @@ func checkRPCSendOK(ctx context.Context, t *testing.T, client testgrpc.TestServi return backendCount } -// makeNonExistentBackends returns a slice of e2e.BackendOptions with num -// listeners, each of which is closed immediately. Useful to simulate servers -// that are unreachable. -func makeNonExistentBackends(t *testing.T, num int) []e2e.BackendOptions { +// makeNonExistentBackends returns a slice of strings with num listeners, each +// of which is closed immediately. Useful to simulate servers that are +// unreachable. +func makeNonExistentBackends(t *testing.T, num int) []string { + t.Helper() + closedListeners := make([]net.Listener, 0, num) for i := 0; i < num; i++ { lis, err := testutils.LocalTCPListener() @@ -224,16 +235,14 @@ func makeNonExistentBackends(t *testing.T, num int) []e2e.BackendOptions { } // Stop the servers that we want to be unreachable and collect their - // addresses. - backendOptions := make([]e2e.BackendOptions, 0, num) + // addresses. We don't close them in the loop above to make sure ports are + // not reused across them. + addrs := make([]string, 0, num) for _, lis := range closedListeners { - backendOptions = append(backendOptions, e2e.BackendOptions{ - Port: testutils.ParsePort(t, lis.Addr().String()), - Weight: 1, - }) + addrs = append(addrs, lis.Addr().String()) lis.Close() } - return backendOptions + return addrs } // setupManagementServerAndResolver sets up an xDS management server, creates @@ -244,6 +253,8 @@ func makeNonExistentBackends(t *testing.T, num int) []e2e.BackendOptions { // // Returns the management server, node ID and the xDS resolver builder. func setupManagementServerAndResolver(t *testing.T) (*e2e.ManagementServer, string, resolver.Builder) { + t.Helper() + // Start an xDS management server. xdsServer := e2e.StartManagementServer(t, e2e.ManagementServerOptions{AllowResourceSubset: true}) @@ -264,14 +275,22 @@ func setupManagementServerAndResolver(t *testing.T) (*e2e.ManagementServer, stri return xdsServer, nodeID, r } +// xdsUpdateOpts returns an e2e.UpdateOptions for the given node ID with the given xDS resources. +func xdsUpdateOpts(nodeID string, endpoints *v3endpointpb.ClusterLoadAssignment, cluster *v3clusterpb.Cluster, route *v3routepb.RouteConfiguration, listener *v3listenerpb.Listener) e2e.UpdateOptions { + return e2e.UpdateOptions{ + NodeID: nodeID, + Endpoints: []*v3endpointpb.ClusterLoadAssignment{endpoints}, + Clusters: []*v3clusterpb.Cluster{cluster}, + Routes: []*v3routepb.RouteConfiguration{route}, + Listeners: []*v3listenerpb.Listener{listener}, + } +} + // Tests that when an aggregate cluster is configured with ring hash policy, and // the first cluster is in transient failure, all RPCs are sent to the second // cluster using the ring hash policy. 
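// The aggregate cluster resource referenced by the test that follows is
// unchanged context and therefore not visible in this diff. A sketch of how
// such a resource is typically assembled with the e2e helpers is shown below;
// the Type and ChildNames fields are assumptions about e2e.ClusterOptions and
// may not match the construction actually used in this file.
func makeAggregateClusterSketch(clusterName, primaryClusterName, secondaryClusterName string) *v3clusterpb.Cluster {
	return e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
		ClusterName: clusterName,
		// Child clusters are listed in priority order: the secondary is only
		// used when the primary cannot be connected to.
		Type:       e2e.ClusterTypeAggregate,
		ChildNames: []string{primaryClusterName, secondaryClusterName},
	})
}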
func (s) TestRingHash_AggregateClusterFallBackFromRingHashAtStartup(t *testing.T) { - xdsServer, nodeID, xdsResolver := setupManagementServerAndResolver(t) - - servers, stop := startTestServiceBackends(t, 2) - defer stop() + addrs := startTestServiceBackends(t, 2) const primaryClusterName = "new_cluster_1" const primaryServiceName = "new_eds_service_1" @@ -284,7 +303,7 @@ func (s) TestRingHash_AggregateClusterFallBackFromRingHashAtStartup(t *testing.T Localities: []e2e.LocalityOptions{{ Name: "locality0", Weight: 1, - Backends: makeNonExistentBackends(t, 2), + Backends: backendOptions(t, makeNonExistentBackends(t, 2)), }}, }) ep2 := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ @@ -292,14 +311,14 @@ func (s) TestRingHash_AggregateClusterFallBackFromRingHashAtStartup(t *testing.T Localities: []e2e.LocalityOptions{{ Name: "locality0", Weight: 1, - Backends: backendOptions(t, servers), + Backends: backendOptions(t, addrs), }}, }) primaryCluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ ClusterName: primaryClusterName, ServiceName: primaryServiceName, }) - secundaryCluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ + secondaryCluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ ClusterName: secondaryClusterName, ServiceName: secondaryServiceName, }) @@ -317,13 +336,15 @@ func (s) TestRingHash_AggregateClusterFallBackFromRingHashAtStartup(t *testing.T ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - if err := xdsServer.Update(ctx, e2e.UpdateOptions{ + xdsServer, nodeID, xdsResolver := setupManagementServerAndResolver(t) + updateOpts := e2e.UpdateOptions{ NodeID: nodeID, Endpoints: []*v3endpointpb.ClusterLoadAssignment{ep1, ep2}, - Clusters: []*v3clusterpb.Cluster{cluster, primaryCluster, secundaryCluster}, + Clusters: []*v3clusterpb.Cluster{cluster, primaryCluster, secondaryCluster}, Routes: []*v3routepb.RouteConfiguration{route}, Listeners: []*v3listenerpb.Listener{listener}, - }); err != nil { + } + if err := xdsServer.Update(ctx, updateOpts); err != nil { t.Fatalf("Failed to update xDS resources: %v", err) } @@ -334,7 +355,8 @@ func (s) TestRingHash_AggregateClusterFallBackFromRingHashAtStartup(t *testing.T defer conn.Close() client := testgrpc.NewTestServiceClient(conn) - gotPerBackend := checkRPCSendOK(ctx, t, client, 100) + const numRPCs = 100 + gotPerBackend := checkRPCSendOK(ctx, t, client, numRPCs) // Since this is using ring hash with the channel ID as the key, all RPCs // are routed to the same backend of the secondary locality. @@ -346,17 +368,10 @@ func (s) TestRingHash_AggregateClusterFallBackFromRingHashAtStartup(t *testing.T var got int for backend, got = range gotPerBackend { } - found := false - for _, server := range servers { - if backend == server.Address { - found = true - break - } + if !slices.Contains(addrs, backend) { + t.Errorf("Got RPCs routed to an unexpected backend: %v, want one of %v", backend, addrs) } - if !found { - t.Errorf("Got RPCs routed to an unexpected backend: %v, want one of %v", backend, servers) - } - if got != 100 { + if got != numRPCs { t.Errorf("Got %v RPCs routed to a backend, want %v", got, 100) } } @@ -376,21 +391,18 @@ func replaceDNSResolver(t *testing.T) *manual.Resolver { // logical DNS cluster, all RPCs are sent to the second cluster using the ring // hash policy. 
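// checkRPCSendOK is used by the tests above and below, but only its signature
// appears as hunk context in this diff. A plausible shape of such a helper is
// sketched here, assuming it simply issues num EmptyCall RPCs and counts, via
// grpc.Peer, how many each backend served; the actual helper in this file may
// differ in details.
func checkRPCSendOKSketch(ctx context.Context, t *testing.T, client testgrpc.TestServiceClient, num int) map[string]int {
	t.Helper()
	backendCount := make(map[string]int)
	for i := 0; i < num; i++ {
		var remote peer.Peer
		if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&remote)); err != nil {
			t.Fatalf("rpc EmptyCall() failed: %v", err)
		}
		backendCount[remote.Addr.String()]++
	}
	return backendCount
}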
func (s) TestRingHash_AggregateClusterFallBackFromRingHashToLogicalDnsAtStartup(t *testing.T) { - xdsServer, nodeID, xdsResolver := setupManagementServerAndResolver(t) - const edsClusterName = "eds_cluster" const logicalDNSClusterName = "logical_dns_cluster" const clusterName = "aggregate_cluster" - backends, stop := startTestServiceBackends(t, 1) - defer stop() + backends := startTestServiceBackends(t, 1) endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ ClusterName: edsClusterName, Localities: []e2e.LocalityOptions{{ Name: "locality0", Weight: 1, - Backends: makeNonExistentBackends(t, 1), + Backends: backendOptions(t, makeNonExistentBackends(t, 1)), Priority: 0, }}, }) @@ -421,18 +433,20 @@ func (s) TestRingHash_AggregateClusterFallBackFromRingHashToLogicalDnsAtStartup( ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - if err := xdsServer.Update(ctx, e2e.UpdateOptions{ + xdsServer, nodeID, xdsResolver := setupManagementServerAndResolver(t) + updateOpts := e2e.UpdateOptions{ NodeID: nodeID, Endpoints: []*v3endpointpb.ClusterLoadAssignment{endpoints}, Clusters: []*v3clusterpb.Cluster{cluster, edsCluster, logicalDNSCluster}, Routes: []*v3routepb.RouteConfiguration{route}, Listeners: []*v3listenerpb.Listener{listener}, - }); err != nil { + } + if err := xdsServer.Update(ctx, updateOpts); err != nil { t.Fatalf("Failed to update xDS resources: %v", err) } dnsR := replaceDNSResolver(t) - dnsR.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backends[0].Address}}}) + dnsR.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backends[0]}}}) conn, err := grpc.NewClient("xds:///test.server", grpc.WithResolvers(xdsResolver), grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { @@ -445,7 +459,7 @@ func (s) TestRingHash_AggregateClusterFallBackFromRingHashToLogicalDnsAtStartup( var got string for got = range gotPerBackend { } - if want := backends[0].Address; got != want { + if want := backends[0]; got != want { t.Errorf("Got RPCs routed to an unexpected got: %v, want %v", got, want) } } @@ -454,22 +468,18 @@ func (s) TestRingHash_AggregateClusterFallBackFromRingHashToLogicalDnsAtStartup( // it's first child is in transient failure, and the fallback is a logical DNS, // the later recovers from transient failure when its backend becomes available. 
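// replaceDNSResolver, used by the logical DNS tests above and below, is
// defined outside the hunks in this diff. A plausible shape is sketched here
// as an assumption, not the file's actual body: it stands in for the global
// "dns" resolver with a manual resolver the test controls, restoring the
// original builder on cleanup.
func replaceDNSResolverSketch(t *testing.T) *manual.Resolver {
	t.Helper()
	mr := manual.NewBuilderWithScheme("dns")
	original := resolver.Get("dns")
	resolver.Register(mr)
	t.Cleanup(func() {
		if original != nil {
			resolver.Register(original)
		}
	})
	return mr
}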
func (s) TestRingHash_AggregateClusterFallBackFromRingHashToLogicalDnsAtStartupNoFailedRPCs(t *testing.T) { - // https://github.com/grpc/grpc/blob/083bbee4805c14ce62e6c9535fe936f68b854c4f/test/cpp/end2end/xds/xds_ring_hash_end2end_test.cc#L225 - xdsServer, nodeID, xdsResolver := setupManagementServerAndResolver(t) - const edsClusterName = "eds_cluster" const logicalDNSClusterName = "logical_dns_cluster" const clusterName = "aggregate_cluster" - backends, stop := startTestServiceBackends(t, 1) - defer stop() + backends := startTestServiceBackends(t, 1) endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ ClusterName: edsClusterName, Localities: []e2e.LocalityOptions{{ Name: "locality0", Weight: 1, - Backends: makeNonExistentBackends(t, 1), + Backends: backendOptions(t, makeNonExistentBackends(t, 1)), Priority: 0, }}, }) @@ -500,18 +510,20 @@ func (s) TestRingHash_AggregateClusterFallBackFromRingHashToLogicalDnsAtStartupN ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - if err := xdsServer.Update(ctx, e2e.UpdateOptions{ + xdsServer, nodeID, xdsResolver := setupManagementServerAndResolver(t) + updateOpts := e2e.UpdateOptions{ NodeID: nodeID, Endpoints: []*v3endpointpb.ClusterLoadAssignment{endpoints}, Clusters: []*v3clusterpb.Cluster{cluster, edsCluster, logicalDNSCluster}, Routes: []*v3routepb.RouteConfiguration{route}, Listeners: []*v3listenerpb.Listener{listener}, - }); err != nil { + } + if err := xdsServer.Update(ctx, updateOpts); err != nil { t.Fatalf("Failed to update xDS resources: %v", err) } dnsR := replaceDNSResolver(t) - dnsR.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backends[0].Address}}}) + dnsR.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: backends[0]}}}) dialer := testutils.NewBlockingDialer() cp := grpc.ConnectParams{ @@ -522,18 +534,20 @@ func (s) TestRingHash_AggregateClusterFallBackFromRingHashToLogicalDnsAtStartupN }, MinConnectTimeout: 0, } - opts := []grpc.DialOption{ + dopts := []grpc.DialOption{ grpc.WithResolvers(xdsResolver), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithContextDialer(dialer.DialContext), grpc.WithConnectParams(cp)} - conn, err := grpc.NewClient("xds:///test.server", opts...) + conn, err := grpc.NewClient("xds:///test.server", dopts...) if err != nil { t.Fatalf("Failed to create client: %s", err) } defer conn.Close() client := testgrpc.NewTestServiceClient(conn) + hold := dialer.Hold(backends[0]) + errCh := make(chan error, 2) go func() { if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil { @@ -562,15 +576,19 @@ func (s) TestRingHash_AggregateClusterFallBackFromRingHashToLogicalDnsAtStartupN errCh <- nil }() + // Wait for a connection attempt to backends[0]. + if !hold.Wait(ctx) { + t.Fatalf("Timeout while waiting for a connection attempt to %s", backends[0]) + } // Allow the connection attempts to complete. - dialer.Resume() + hold.Resume() // RPCs should complete successfully. for range []int{0, 1} { select { case err := <-errCh: if err != nil { - t.Errorf("Expected 2 rpc to succeed, but failed: %v", err) + t.Errorf("Expected 2 rpc to succeed, but at least one failed: %v", err) } case <-ctx.Done(): t.Fatalf("Timed out waiting for RPCs to complete") @@ -578,22 +596,37 @@ func (s) TestRingHash_AggregateClusterFallBackFromRingHashToLogicalDnsAtStartupN } } +// endpointResource creates a ClusterLoadAssignment containing a single locality +// with the given addresses. 
+func endpointResource(t *testing.T, clusterName string, addrs []string) *v3endpointpb.ClusterLoadAssignment { + t.Helper() + + // We must set the host name socket address in EDS, as the ring hash policy + // uses it to construct the ring. + host, _, err := net.SplitHostPort(addrs[0]) + if err != nil { + t.Fatalf("Failed to split host and port from stubserver: %v", err) + } + + return e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ + ClusterName: clusterName, + Host: host, + Localities: []e2e.LocalityOptions{{ + Backends: backendOptions(t, addrs), + Weight: 1, + }}, + }) +} + // Tests that ring hash policy that hashes using channel id ensures all RPCs to // go 1 particular backend. func (s) TestRingHash_ChannelIdHashing(t *testing.T) { - backends, stop := startTestServiceBackends(t, 4) - defer stop() + backends := startTestServiceBackends(t, 4) xdsServer, nodeID, xdsResolver := setupManagementServerAndResolver(t) const clusterName = "cluster" - endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ - ClusterName: clusterName, - Localities: []e2e.LocalityOptions{{ - Backends: backendOptions(t, backends), - Weight: 1, - }}, - }) + endpoints := endpointResource(t, clusterName, backends) cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ ClusterName: clusterName, ServiceName: clusterName, @@ -605,13 +638,7 @@ func (s) TestRingHash_ChannelIdHashing(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - if err := xdsServer.Update(ctx, e2e.UpdateOptions{ - NodeID: nodeID, - Endpoints: []*v3endpointpb.ClusterLoadAssignment{endpoints}, - Clusters: []*v3clusterpb.Cluster{cluster}, - Routes: []*v3routepb.RouteConfiguration{route}, - Listeners: []*v3listenerpb.Listener{listener}, - }); err != nil { + if err := xdsServer.Update(ctx, xdsUpdateOpts(nodeID, endpoints, cluster, route, listener)); err != nil { t.Fatalf("Failed to update xDS resources: %v", err) } @@ -622,15 +649,16 @@ func (s) TestRingHash_ChannelIdHashing(t *testing.T) { defer conn.Close() client := testgrpc.NewTestServiceClient(conn) - received := checkRPCSendOK(ctx, t, client, 100) + const numRPCs = 100 + received := checkRPCSendOK(ctx, t, client, numRPCs) if len(received) != 1 { t.Errorf("Got RPCs routed to %v backends, want %v", len(received), 1) } - var count int - for _, count = range received { + var got int + for _, got = range received { } - if count != 100 { - t.Errorf("Got %v RPCs routed to a backend, want %v", count, 100) + if got != numRPCs { + t.Errorf("Got %v RPCs routed to a backend, want %v", got, numRPCs) } } @@ -650,30 +678,15 @@ func headerHashRoute(routeName, virtualHostName, clusterName, header string) *v3 return route } -// Tests that ring hash policy that hashes using a header value can spread RPCs -// across all the backends. +// Tests that ring hash policy that hashes using a header value can send RPCs +// to specific backends based on their hash. func (s) TestRingHash_HeaderHashing(t *testing.T) { - backends, stop := startTestServiceBackends(t, 4) - defer stop() - - // We must set the host name socket address in EDS, as the ring hash policy - // uses it to construct the ring. 
- host, _, err := net.SplitHostPort(backends[0].Address) - if err != nil { - t.Fatalf("Failed to split host and port from stubserver: %v", err) - } + backends := startTestServiceBackends(t, 4) xdsServer, nodeID, xdsResolver := setupManagementServerAndResolver(t) const clusterName = "cluster" - endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ - ClusterName: clusterName, - Host: host, - Localities: []e2e.LocalityOptions{{ - Backends: backendOptions(t, backends), - Weight: 1, - }}, - }) + endpoints := endpointResource(t, clusterName, backends) cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ ClusterName: clusterName, ServiceName: clusterName, @@ -685,13 +698,7 @@ func (s) TestRingHash_HeaderHashing(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - if err := xdsServer.Update(ctx, e2e.UpdateOptions{ - NodeID: nodeID, - Endpoints: []*v3endpointpb.ClusterLoadAssignment{endpoints}, - Clusters: []*v3clusterpb.Cluster{cluster}, - Routes: []*v3routepb.RouteConfiguration{route}, - Listeners: []*v3listenerpb.Listener{listener}, - }); err != nil { + if err := xdsServer.Update(ctx, xdsUpdateOpts(nodeID, endpoints, cluster, route, listener)); err != nil { t.Fatalf("Failed to update xDS resources: %v", err) } @@ -706,10 +713,11 @@ func (s) TestRingHash_HeaderHashing(t *testing.T) { // to a specific backend as the header value matches the value used to // create the entry in the ring. for _, backend := range backends { - ctx := metadata.NewOutgoingContext(ctx, metadata.Pairs("address_hash", backend.Address+"_0")) - reqPerBackend := checkRPCSendOK(ctx, t, client, 1) - if reqPerBackend[backend.Address] != 1 { - t.Errorf("Got RPC routed to backend %v, want %v", reqPerBackend, backend.Address) + ctx := metadata.NewOutgoingContext(ctx, metadata.Pairs("address_hash", backend+"_0")) + numRPCs := 10 + reqPerBackend := checkRPCSendOK(ctx, t, client, numRPCs) + if reqPerBackend[backend] != numRPCs { + t.Errorf("Got RPC routed to addresses %v, want all RPCs routed to %v", reqPerBackend, backend) } } } @@ -717,27 +725,10 @@ func (s) TestRingHash_HeaderHashing(t *testing.T) { // Tests that ring hash policy that hashes using a header value and regex // rewrite to aggregate RPCs to 1 backend. func (s) TestRingHash_HeaderHashingWithRegexRewrite(t *testing.T) { - backends, stop := startTestServiceBackends(t, 4) - defer stop() - - // We must set the host name socket address in EDS, as the ring hash policy - // uses it to construct the ring. 
- host, _, err := net.SplitHostPort(backends[0].Address) - if err != nil { - t.Fatalf("Failed to split host and port from stubserver: %v", err) - } - - xdsServer, nodeID, xdsResolver := setupManagementServerAndResolver(t) + backends := startTestServiceBackends(t, 4) clusterName := "cluster" - endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ - ClusterName: clusterName, - Host: host, - Localities: []e2e.LocalityOptions{{ - Backends: backendOptions(t, backends), - Weight: 1, - }}, - }) + endpoints := endpointResource(t, clusterName, backends) cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ ClusterName: clusterName, ServiceName: clusterName, @@ -757,13 +748,8 @@ func (s) TestRingHash_HeaderHashingWithRegexRewrite(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - if err := xdsServer.Update(ctx, e2e.UpdateOptions{ - NodeID: nodeID, - Endpoints: []*v3endpointpb.ClusterLoadAssignment{endpoints}, - Clusters: []*v3clusterpb.Cluster{cluster}, - Routes: []*v3routepb.RouteConfiguration{route}, - Listeners: []*v3listenerpb.Listener{listener}, - }); err != nil { + xdsServer, nodeID, xdsResolver := setupManagementServerAndResolver(t) + if err := xdsServer.Update(ctx, xdsUpdateOpts(nodeID, endpoints, cluster, route, listener)); err != nil { t.Fatalf("Failed to update xDS resources: %v", err) } @@ -781,7 +767,7 @@ func (s) TestRingHash_HeaderHashingWithRegexRewrite(t *testing.T) { // hashing to the same value. gotPerBackend := make(map[string]int) for _, backend := range backends { - ctx := metadata.NewOutgoingContext(ctx, metadata.Pairs("address_hash", backend.Address+"_0")) + ctx := metadata.NewOutgoingContext(ctx, metadata.Pairs("address_hash", backend+"_0")) res := checkRPCSendOK(ctx, t, client, 100) for addr, count := range res { gotPerBackend[addr] += count @@ -816,7 +802,7 @@ func computeIdealNumberOfRPCs(t *testing.T, p, errorTolerance float64) int { // minimum ring size to ensure that the ring is large enough to distribute // requests more uniformly across endpoints when a random hash is used. func setRingHashLBPolicyWithHighMinRingSize(t *testing.T, cluster *v3clusterpb.Cluster) { - minRingSize := uint64(100000) + const minRingSize = 100000 oldVal := envconfig.RingHashCap envconfig.RingHashCap = minRingSize t.Cleanup(func() { @@ -839,20 +825,11 @@ func setRingHashLBPolicyWithHighMinRingSize(t *testing.T, cluster *v3clusterpb.C // Tests that ring hash policy that hashes using a random value. 
func (s) TestRingHash_NoHashPolicy(t *testing.T) { - backends, stop := startTestServiceBackends(t, 2) - defer stop() + backends := startTestServiceBackends(t, 2) numRPCs := computeIdealNumberOfRPCs(t, .5, errorTolerance) - xdsServer, nodeID, xdsResolver := setupManagementServerAndResolver(t) - const clusterName = "cluster" - endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ - ClusterName: clusterName, - Localities: []e2e.LocalityOptions{{ - Backends: backendOptions(t, backends), - Weight: 1, - }}, - }) + endpoints := endpointResource(t, clusterName, backends) cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ ClusterName: clusterName, ServiceName: clusterName, @@ -864,13 +841,8 @@ func (s) TestRingHash_NoHashPolicy(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - if err := xdsServer.Update(ctx, e2e.UpdateOptions{ - NodeID: nodeID, - Endpoints: []*v3endpointpb.ClusterLoadAssignment{endpoints}, - Clusters: []*v3clusterpb.Cluster{cluster}, - Routes: []*v3routepb.RouteConfiguration{route}, - Listeners: []*v3listenerpb.Listener{listener}, - }); err != nil { + xdsServer, nodeID, xdsResolver := setupManagementServerAndResolver(t) + if err := xdsServer.Update(ctx, xdsUpdateOpts(nodeID, endpoints, cluster, route, listener)); err != nil { t.Fatalf("Failed to update xDS resources: %v", err) } @@ -884,26 +856,23 @@ func (s) TestRingHash_NoHashPolicy(t *testing.T) { // Send a large number of RPCs and check that they are distributed randomly. gotPerBackend := checkRPCSendOK(ctx, t, client, numRPCs) for _, backend := range backends { - got := float64(gotPerBackend[backend.Address]) / float64(numRPCs) + got := float64(gotPerBackend[backend]) / float64(numRPCs) want := .5 if !cmp.Equal(got, want, cmpopts.EquateApprox(0, errorTolerance)) { - t.Errorf("Fraction of RPCs to backend %s: got %v, want %v (margin: +-%v)", backends[2].Address, got, want, errorTolerance) + t.Errorf("Fraction of RPCs to backend %s: got %v, want %v (margin: +-%v)", backend, got, want, errorTolerance) } } } // Tests that we observe endpoint weights. 
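// computeIdealNumberOfRPCs, used by the random-hash tests above and below,
// appears in this diff only as hunk context. Plausibly it derives the sample
// size from the usual normal approximation n ≈ p(1-p)·(z/tol)², i.e. enough
// RPCs that an observed fraction within errorTolerance of p is statistically
// meaningful. The sketch below is an assumption about that helper; the
// confidence factor z is not the file's actual constant.
func computeIdealNumberOfRPCsSketch(t *testing.T, p, errorTolerance float64) int {
	t.Helper()
	if p < 0 || p > 1 {
		t.Fatal("p must be in [0, 1]")
	}
	const z = 5 // assumed confidence factor (roughly 99.9999% two-sided)
	return int(math.Ceil(p * (1 - p) * z * z / (errorTolerance * errorTolerance)))
}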
func (s) TestRingHash_EndpointWeights(t *testing.T) { - backends, stop := startTestServiceBackends(t, 3) - defer stop() - - xdsServer, nodeID, xdsResolver := setupManagementServerAndResolver(t) + backends := startTestServiceBackends(t, 3) const clusterName = "cluster" backendOpts := []e2e.BackendOptions{ - {Port: testutils.ParsePort(t, backends[0].Address)}, - {Port: testutils.ParsePort(t, backends[1].Address)}, - {Port: testutils.ParsePort(t, backends[2].Address), Weight: 2}, + {Port: testutils.ParsePort(t, backends[0])}, + {Port: testutils.ParsePort(t, backends[1])}, + {Port: testutils.ParsePort(t, backends[2]), Weight: 2}, } endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ @@ -928,13 +897,8 @@ func (s) TestRingHash_EndpointWeights(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - if err := xdsServer.Update(ctx, e2e.UpdateOptions{ - NodeID: nodeID, - Endpoints: []*v3endpointpb.ClusterLoadAssignment{endpoints}, - Clusters: []*v3clusterpb.Cluster{cluster}, - Routes: []*v3routepb.RouteConfiguration{route}, - Listeners: []*v3listenerpb.Listener{listener}, - }); err != nil { + xdsServer, nodeID, xdsResolver := setupManagementServerAndResolver(t) + if err := xdsServer.Update(ctx, xdsUpdateOpts(nodeID, endpoints, cluster, route, listener)); err != nil { t.Fatalf("Failed to update xDS resources: %v", err) } @@ -949,18 +913,656 @@ func (s) TestRingHash_EndpointWeights(t *testing.T) { numRPCs := computeIdealNumberOfRPCs(t, .25, errorTolerance) gotPerBackend := checkRPCSendOK(ctx, t, client, numRPCs) - got := float64(gotPerBackend[backends[0].Address]) / float64(numRPCs) + got := float64(gotPerBackend[backends[0]]) / float64(numRPCs) want := .25 if !cmp.Equal(got, want, cmpopts.EquateApprox(0, errorTolerance)) { - t.Errorf("Fraction of RPCs to backend %s: got %v, want %v (margin: +-%v)", backends[0].Address, got, want, errorTolerance) + t.Errorf("Fraction of RPCs to backend %s: got %v, want %v (margin: +-%v)", backends[0], got, want, errorTolerance) } - got = float64(gotPerBackend[backends[1].Address]) / float64(numRPCs) + got = float64(gotPerBackend[backends[1]]) / float64(numRPCs) if !cmp.Equal(got, want, cmpopts.EquateApprox(0, errorTolerance)) { - t.Errorf("Fraction of RPCs to backend %s: got %v, want %v (margin: +-%v)", backends[1].Address, got, want, errorTolerance) + t.Errorf("Fraction of RPCs to backend %s: got %v, want %v (margin: +-%v)", backends[1], got, want, errorTolerance) } - got = float64(gotPerBackend[backends[2].Address]) / float64(numRPCs) + got = float64(gotPerBackend[backends[2]]) / float64(numRPCs) want = .50 if !cmp.Equal(got, want, cmpopts.EquateApprox(0, errorTolerance)) { - t.Errorf("Fraction of RPCs to backend %s: got %v, want %v (margin: +-%v)", backends[2].Address, got, want, errorTolerance) + t.Errorf("Fraction of RPCs to backend %s: got %v, want %v (margin: +-%v)", backends[2], got, want, errorTolerance) + } +} + +// Tests that ring hash policy evaluation will continue past the terminal hash +// policy if no results are produced yet. 
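// Several tests above and below target a specific backend by sending the
// header value "<address>_0". The sketch below explains why that works; it
// assumes the ring places each endpoint's replicas at the xxHash of
// "<address>_<replica index>" and that github.com/cespare/xxhash/v2 is
// imported as xxhash. Both are assumptions about the ring hash
// implementation, stated here only to motivate the "_0" suffix.
func ringTargetHeaderSketch(backendAddr string) (string, uint64) {
	// Assumed ring construction: "<address>_0" is the key of the endpoint's
	// first ring entry.
	headerValue := backendAddr + "_0"
	// The request hash for a header policy is the hash of the header value,
	// so it coincides with that ring entry and the pick lands on this backend.
	requestHash := xxhash.Sum64String(headerValue)
	return headerValue, requestHash
}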
+func (s) TestRingHash_ContinuesPastTerminalPolicyThatDoesNotProduceResult(t *testing.T) { + backends := startTestServiceBackends(t, 2) + + const clusterName = "cluster" + endpoints := endpointResource(t, clusterName, backends) + cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ + ClusterName: clusterName, + ServiceName: clusterName, + Policy: e2e.LoadBalancingPolicyRingHash, + }) + + route := e2e.DefaultRouteConfig("new_route", "test.server", clusterName) + + // Even though this hash policy is terminal, since it produces no result, we + // continue past it to find a policy that produces results. + hashPolicy := v3routepb.RouteAction_HashPolicy{ + PolicySpecifier: &v3routepb.RouteAction_HashPolicy_Header_{ + Header: &v3routepb.RouteAction_HashPolicy_Header{ + HeaderName: "header_not_present", + }, + }, + Terminal: true, + } + hashPolicy2 := v3routepb.RouteAction_HashPolicy{ + PolicySpecifier: &v3routepb.RouteAction_HashPolicy_Header_{ + Header: &v3routepb.RouteAction_HashPolicy_Header{ + HeaderName: "address_hash", + }, + }, + } + action := route.VirtualHosts[0].Routes[0].Action.(*v3routepb.Route_Route) + action.Route.HashPolicy = []*v3routepb.RouteAction_HashPolicy{&hashPolicy, &hashPolicy2} + + listener := e2e.DefaultClientListener(virtualHostName, route.Name) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + xdsServer, nodeID, xdsResolver := setupManagementServerAndResolver(t) + if err := xdsServer.Update(ctx, xdsUpdateOpts(nodeID, endpoints, cluster, route, listener)); err != nil { + t.Fatalf("Failed to update xDS resources: %v", err) + } + + conn, err := grpc.NewClient("xds:///test.server", grpc.WithResolvers(xdsResolver), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("Failed to create client: %s", err) + } + defer conn.Close() + client := testgrpc.NewTestServiceClient(conn) + + // - The first hash policy does not match because the header is not present. + // If this hash policy was applied, it would spread the load across + // backend 0 and 1, since a random hash would be used. + // - In the second hash policy, each type of RPC contains a header + // value that always hashes to backend 0, as the header value + // matches the value used to create the entry in the ring. + // We verify that the second hash policy is used by checking that all RPCs + // are being routed to backend 0. + wantBackend := backends[0] + ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("address_hash", wantBackend+"_0")) + const numRPCs = 100 + gotPerBackend := checkRPCSendOK(ctx, t, client, numRPCs) + if got := gotPerBackend[wantBackend]; got != numRPCs { + t.Errorf("Got %v RPCs routed to backend %v, want %v", got, wantBackend, numRPCs) + } +} + +// Tests that a random hash is used when header hashing policy specified a +// header field that the RPC did not have. 
+func (s) TestRingHash_HashOnHeaderThatIsNotPresent(t *testing.T) { + backends := startTestServiceBackends(t, 2) + wantFractionPerBackend := .5 + numRPCs := computeIdealNumberOfRPCs(t, wantFractionPerBackend, errorTolerance) + + const clusterName = "cluster" + endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ + ClusterName: clusterName, + Localities: []e2e.LocalityOptions{{ + Backends: backendOptions(t, backends), + Weight: 1, + }}, + }) + cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ + ClusterName: clusterName, + ServiceName: clusterName, + }) + setRingHashLBPolicyWithHighMinRingSize(t, cluster) + route := headerHashRoute("new_route", virtualHostName, clusterName, "header_not_present") + listener := e2e.DefaultClientListener(virtualHostName, route.Name) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + xdsServer, nodeID, xdsResolver := setupManagementServerAndResolver(t) + if err := xdsServer.Update(ctx, xdsUpdateOpts(nodeID, endpoints, cluster, route, listener)); err != nil { + t.Fatalf("Failed to update xDS resources: %v", err) + } + + conn, err := grpc.NewClient("xds:///test.server", grpc.WithResolvers(xdsResolver), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("Failed to create client: %s", err) + } + defer conn.Close() + client := testgrpc.NewTestServiceClient(conn) + + // The first hash policy does not apply because the header is not present in + // the RPCs that we are about to send. As a result, a random hash should be + // used instead, resulting in a random request distribution. + // We verify this by checking that the RPCs are distributed randomly. + gotPerBackend := checkRPCSendOK(ctx, t, client, numRPCs) + for _, backend := range backends { + got := float64(gotPerBackend[backend]) / float64(numRPCs) + if !cmp.Equal(got, wantFractionPerBackend, cmpopts.EquateApprox(0, errorTolerance)) { + t.Errorf("fraction of RPCs to backend %s: got %v, want %v (margin: +-%v)", backend, got, wantFractionPerBackend, errorTolerance) + } + } +} + +// Tests that a random hash is used when only unsupported hash policies are +// configured. 
+func (s) TestRingHash_UnsupportedHashPolicyDefaultToRandomHashing(t *testing.T) { + backends := startTestServiceBackends(t, 2) + wantFractionPerBackend := .5 + numRPCs := computeIdealNumberOfRPCs(t, wantFractionPerBackend, errorTolerance) + + const clusterName = "cluster" + endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ + ClusterName: clusterName, + Localities: []e2e.LocalityOptions{{ + Backends: backendOptions(t, backends), + Weight: 1, + }}, + }) + cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ + ClusterName: clusterName, + ServiceName: clusterName, + }) + setRingHashLBPolicyWithHighMinRingSize(t, cluster) + route := e2e.DefaultRouteConfig("new_route", "test.server", clusterName) + unsupportedHashPolicy1 := v3routepb.RouteAction_HashPolicy{ + PolicySpecifier: &v3routepb.RouteAction_HashPolicy_Cookie_{ + Cookie: &v3routepb.RouteAction_HashPolicy_Cookie{Name: "cookie"}, + }, + } + unsupportedHashPolicy2 := v3routepb.RouteAction_HashPolicy{ + PolicySpecifier: &v3routepb.RouteAction_HashPolicy_ConnectionProperties_{ + ConnectionProperties: &v3routepb.RouteAction_HashPolicy_ConnectionProperties{SourceIp: true}, + }, + } + unsupportedHashPolicy3 := v3routepb.RouteAction_HashPolicy{ + PolicySpecifier: &v3routepb.RouteAction_HashPolicy_QueryParameter_{ + QueryParameter: &v3routepb.RouteAction_HashPolicy_QueryParameter{Name: "query_parameter"}, + }, + } + action := route.VirtualHosts[0].Routes[0].Action.(*v3routepb.Route_Route) + action.Route.HashPolicy = []*v3routepb.RouteAction_HashPolicy{&unsupportedHashPolicy1, &unsupportedHashPolicy2, &unsupportedHashPolicy3} + listener := e2e.DefaultClientListener(virtualHostName, route.Name) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + xdsServer, nodeID, xdsResolver := setupManagementServerAndResolver(t) + if err := xdsServer.Update(ctx, xdsUpdateOpts(nodeID, endpoints, cluster, route, listener)); err != nil { + t.Fatalf("Failed to update xDS resources: %v", err) + } + + conn, err := grpc.NewClient("xds:///test.server", grpc.WithResolvers(xdsResolver), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("Failed to create client: %s", err) + } + defer conn.Close() + client := testgrpc.NewTestServiceClient(conn) + + // Since none of the hash policy are supported, a random hash should be + // generated for every request. + // We verify this by checking that the RPCs are distributed randomly. + gotPerBackend := checkRPCSendOK(ctx, t, client, numRPCs) + for _, backend := range backends { + got := float64(gotPerBackend[backend]) / float64(numRPCs) + if !cmp.Equal(got, wantFractionPerBackend, cmpopts.EquateApprox(0, errorTolerance)) { + t.Errorf("Fraction of RPCs to backend %s: got %v, want %v (margin: +-%v)", backend, got, wantFractionPerBackend, errorTolerance) + } + } +} + +// Tests that unsupported hash policy types are all ignored before a supported +// hash policy. 
+func (s) TestRingHash_UnsupportedHashPolicyUntilChannelIdHashing(t *testing.T) { + backends := startTestServiceBackends(t, 2) + + const clusterName = "cluster" + endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ + ClusterName: clusterName, + Localities: []e2e.LocalityOptions{{ + Backends: backendOptions(t, backends), + Weight: 1, + }}, + }) + cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ + ClusterName: clusterName, + ServiceName: clusterName, + }) + setRingHashLBPolicyWithHighMinRingSize(t, cluster) + route := e2e.DefaultRouteConfig("new_route", "test.server", clusterName) + unsupportedHashPolicy1 := v3routepb.RouteAction_HashPolicy{ + PolicySpecifier: &v3routepb.RouteAction_HashPolicy_Cookie_{ + Cookie: &v3routepb.RouteAction_HashPolicy_Cookie{Name: "cookie"}, + }, + } + unsupportedHashPolicy2 := v3routepb.RouteAction_HashPolicy{ + PolicySpecifier: &v3routepb.RouteAction_HashPolicy_ConnectionProperties_{ + ConnectionProperties: &v3routepb.RouteAction_HashPolicy_ConnectionProperties{SourceIp: true}, + }, + } + unsupportedHashPolicy3 := v3routepb.RouteAction_HashPolicy{ + PolicySpecifier: &v3routepb.RouteAction_HashPolicy_QueryParameter_{ + QueryParameter: &v3routepb.RouteAction_HashPolicy_QueryParameter{Name: "query_parameter"}, + }, + } + channelIDhashPolicy := v3routepb.RouteAction_HashPolicy{ + PolicySpecifier: &v3routepb.RouteAction_HashPolicy_FilterState_{ + FilterState: &v3routepb.RouteAction_HashPolicy_FilterState{ + Key: "io.grpc.channel_id", + }, + }, + } + action := route.VirtualHosts[0].Routes[0].Action.(*v3routepb.Route_Route) + action.Route.HashPolicy = []*v3routepb.RouteAction_HashPolicy{&unsupportedHashPolicy1, &unsupportedHashPolicy2, &unsupportedHashPolicy3, &channelIDhashPolicy} + listener := e2e.DefaultClientListener(virtualHostName, route.Name) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + xdsServer, nodeID, xdsResolver := setupManagementServerAndResolver(t) + if err := xdsServer.Update(ctx, xdsUpdateOpts(nodeID, endpoints, cluster, route, listener)); err != nil { + t.Fatalf("Failed to update xDS resources: %v", err) + } + + conn, err := grpc.NewClient("xds:///test.server", grpc.WithResolvers(xdsResolver), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("Failed to create client: %s", err) + } + defer conn.Close() + client := testgrpc.NewTestServiceClient(conn) + + // Since only unsupported policies are present except for the last one + // which is using the channel ID hashing policy, all requests should be + // routed to the same backend. + const numRPCs = 100 + gotPerBackend := checkRPCSendOK(ctx, t, client, numRPCs) + if len(gotPerBackend) != 1 { + t.Errorf("Got RPCs routed to %v backends, want 1", len(gotPerBackend)) + } + var got int + for _, got = range gotPerBackend { + } + if got != numRPCs { + t.Errorf("Got %v RPCs routed to a backend, want %v", got, numRPCs) + } +} + +// Tests that ring hash policy that hashes using a random value can spread RPCs +// across all the backends according to locality weight. 
+func (s) TestRingHash_RandomHashingDistributionAccordingToLocalityAndEndpointWeight(t *testing.T) { + backends := startTestServiceBackends(t, 2) + + const clusterName = "cluster" + const locality1Weight = uint32(1) + const endpoint1Weight = uint32(1) + const locality2Weight = uint32(2) + const endpoint2Weight = uint32(2) + endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{ + ClusterName: clusterName, + Localities: []e2e.LocalityOptions{ + { + Backends: []e2e.BackendOptions{{ + Port: testutils.ParsePort(t, backends[0]), + Weight: endpoint1Weight, + }}, + Weight: locality1Weight, + }, + { + Backends: []e2e.BackendOptions{{ + Port: testutils.ParsePort(t, backends[1]), + Weight: endpoint2Weight, + }}, + Weight: locality2Weight, + }, + }, + }) + cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ + ClusterName: clusterName, + ServiceName: clusterName, + }) + setRingHashLBPolicyWithHighMinRingSize(t, cluster) + route := e2e.DefaultRouteConfig("new_route", "test.server", clusterName) + listener := e2e.DefaultClientListener(virtualHostName, route.Name) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + xdsServer, nodeID, xdsResolver := setupManagementServerAndResolver(t) + if err := xdsServer.Update(ctx, xdsUpdateOpts(nodeID, endpoints, cluster, route, listener)); err != nil { + t.Fatalf("Failed to update xDS resources: %v", err) + } + + conn, err := grpc.NewClient("xds:///test.server", grpc.WithResolvers(xdsResolver), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("Failed to create client: %s", err) + } + defer conn.Close() + client := testgrpc.NewTestServiceClient(conn) + + const weight1 = endpoint1Weight * locality1Weight + const weight2 = endpoint2Weight * locality2Weight + const wantRPCs1 = float64(weight1) / float64(weight1+weight2) + const wantRPCs2 = float64(weight2) / float64(weight1+weight2) + numRPCs := computeIdealNumberOfRPCs(t, math.Min(wantRPCs1, wantRPCs2), errorTolerance) + + // Send a large number of RPCs and check that they are distributed randomly. + gotPerBackend := checkRPCSendOK(ctx, t, client, numRPCs) + got := float64(gotPerBackend[backends[0]]) / float64(numRPCs) + if !cmp.Equal(got, wantRPCs1, cmpopts.EquateApprox(0, errorTolerance)) { + t.Errorf("Fraction of RPCs to backend %s: got %v, want %v (margin: +-%v)", backends[2], got, wantRPCs1, errorTolerance) + } + got = float64(gotPerBackend[backends[1]]) / float64(numRPCs) + if !cmp.Equal(got, wantRPCs2, cmpopts.EquateApprox(0, errorTolerance)) { + t.Errorf("Fraction of RPCs to backend %s: got %v, want %v (margin: +-%v)", backends[2], got, wantRPCs2, errorTolerance) + } +} + +// Tests that ring hash policy that hashes using a fixed string ensures all RPCs +// to go 1 particular backend; and that subsequent hashing policies are ignored +// due to the setting of terminal. 
+func (s) TestRingHash_FixedHashingTerminalPolicy(t *testing.T) { + backends := startTestServiceBackends(t, 2) + const clusterName = "cluster" + endpoints := endpointResource(t, clusterName, backends) + cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ + ClusterName: clusterName, + ServiceName: clusterName, + Policy: e2e.LoadBalancingPolicyRingHash, + }) + + route := e2e.DefaultRouteConfig("new_route", "test.server", clusterName) + + hashPolicy := v3routepb.RouteAction_HashPolicy{ + PolicySpecifier: &v3routepb.RouteAction_HashPolicy_Header_{ + Header: &v3routepb.RouteAction_HashPolicy_Header{ + HeaderName: "fixed_string", + }, + }, + Terminal: true, + } + hashPolicy2 := v3routepb.RouteAction_HashPolicy{ + PolicySpecifier: &v3routepb.RouteAction_HashPolicy_Header_{ + Header: &v3routepb.RouteAction_HashPolicy_Header{ + HeaderName: "random_string", + }, + }, + } + action := route.VirtualHosts[0].Routes[0].Action.(*v3routepb.Route_Route) + action.Route.HashPolicy = []*v3routepb.RouteAction_HashPolicy{&hashPolicy, &hashPolicy2} + + listener := e2e.DefaultClientListener(virtualHostName, route.Name) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + xdsServer, nodeID, xdsResolver := setupManagementServerAndResolver(t) + if err := xdsServer.Update(ctx, xdsUpdateOpts(nodeID, endpoints, cluster, route, listener)); err != nil { + t.Fatalf("Failed to update xDS resources: %v", err) + } + + conn, err := grpc.NewClient("xds:///test.server", grpc.WithResolvers(xdsResolver), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("Failed to create client: %s", err) + } + defer conn.Close() + client := testgrpc.NewTestServiceClient(conn) + + // Check that despite the matching random string header, since the fixed + // string hash policy is terminal, only the fixed string hash policy applies + // and requests all get routed to the same host. + gotPerBackend := make(map[string]int) + const numRPCs = 100 + for i := 0; i < numRPCs; i++ { + ctx := metadata.NewOutgoingContext(ctx, metadata.Pairs( + "fixed_string", backends[0]+"_0", + "random_string", fmt.Sprintf("%d", rand.Int())), + ) + var remote peer.Peer + _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.Peer(&remote)) + if err != nil { + t.Fatalf("rpc EmptyCall() failed: %v", err) + } + gotPerBackend[remote.Addr.String()]++ + } + + if len(gotPerBackend) != 1 { + t.Error("Got RPCs routed to multiple backends, want a single backend") + } + if got := gotPerBackend[backends[0]]; got != numRPCs { + t.Errorf("Got %v RPCs routed to %v, want %v", got, backends[0], numRPCs) + } +} + +// TestRingHash_IdleToReady tests that the channel will go from idle to ready +// via connecting; (though it is not possible to catch the connecting state +// before moving to ready via the public API). +// TODO: we should be able to catch all state transitions by using the internal.SubscribeToConnectivityStateChanges API. 
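// channelIDHashRoute, used by the test that follows, is defined outside the
// hunks in this diff. A plausible shape, modeled on headerHashRoute above and
// on the "io.grpc.channel_id" FilterState policy already used in this file,
// is sketched here; it is an assumption, not the file's actual body.
func channelIDHashRouteSketch(routeName, vhName, clusterName string) *v3routepb.RouteConfiguration {
	route := e2e.DefaultRouteConfig(routeName, vhName, clusterName)
	hashPolicy := &v3routepb.RouteAction_HashPolicy{
		PolicySpecifier: &v3routepb.RouteAction_HashPolicy_FilterState_{
			FilterState: &v3routepb.RouteAction_HashPolicy_FilterState{
				Key: "io.grpc.channel_id",
			},
		},
	}
	action := route.VirtualHosts[0].Routes[0].Action.(*v3routepb.Route_Route)
	action.Route.HashPolicy = []*v3routepb.RouteAction_HashPolicy{hashPolicy}
	return route
}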
+func (s) TestRingHash_IdleToReady(t *testing.T) { + backends := startTestServiceBackends(t, 1) + + const clusterName = "cluster" + endpoints := endpointResource(t, clusterName, backends) + cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ + ClusterName: clusterName, + ServiceName: clusterName, + Policy: e2e.LoadBalancingPolicyRingHash, + }) + route := channelIDHashRoute("new_route", virtualHostName, clusterName) + listener := e2e.DefaultClientListener(virtualHostName, route.Name) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + xdsServer, nodeID, xdsResolver := setupManagementServerAndResolver(t) + if err := xdsServer.Update(ctx, xdsUpdateOpts(nodeID, endpoints, cluster, route, listener)); err != nil { + t.Fatalf("Failed to update xDS resources: %v", err) + } + + conn, err := grpc.NewClient("xds:///test.server", grpc.WithResolvers(xdsResolver), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("Failed to create client: %s", err) + } + defer conn.Close() + client := testgrpc.NewTestServiceClient(conn) + + if got, want := conn.GetState(), connectivity.Idle; got != want { + t.Errorf("conn.GetState(): got %v, want %v", got, want) + } + + checkRPCSendOK(ctx, t, client, 1) + + if got, want := conn.GetState(), connectivity.Ready; got != want { + t.Errorf("conn.GetState(): got %v, want %v", got, want) + } +} + +// Test that the channel will transition to READY once it starts +// connecting even if there are no RPCs being sent to the picker. +func (s) TestRingHash_ContinuesConnectingWithoutPicks(t *testing.T) { + backend := stubserver.StartTestService(t, &stubserver.StubServer{ + // We expect the server EmptyCall to not be call here because the + // aggregated channel state is never READY when the call is pending. + EmptyCallF: func(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) { + t.Errorf("EmptyCall() should not have been called") + return &testpb.Empty{}, nil + }, + }) + defer backend.Stop() + + nonExistantServerAddr := makeNonExistentBackends(t, 1)[0] + + const clusterName = "cluster" + endpoints := endpointResource(t, clusterName, []string{backend.Address, nonExistantServerAddr}) + cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{ + ClusterName: clusterName, + ServiceName: clusterName, + Policy: e2e.LoadBalancingPolicyRingHash, + }) + route := headerHashRoute("new_route", virtualHostName, clusterName, "address_hash") + listener := e2e.DefaultClientListener(virtualHostName, route.Name) + + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + + xdsServer, nodeID, xdsResolver := setupManagementServerAndResolver(t) + if err := xdsServer.Update(ctx, xdsUpdateOpts(nodeID, endpoints, cluster, route, listener)); err != nil { + t.Fatalf("Failed to update xDS resources: %v", err) + } + + dialer := testutils.NewBlockingDialer() + dopts := []grpc.DialOption{ + grpc.WithResolvers(xdsResolver), + grpc.WithTransportCredentials(insecure.NewCredentials()), + grpc.WithContextDialer(dialer.DialContext), + } + conn, err := grpc.NewClient("xds:///test.server", dopts...) 
+	if err != nil {
+		t.Fatalf("Failed to create client: %s", err)
+	}
+	defer conn.Close()
+	client := testgrpc.NewTestServiceClient(conn)
+
+	hold := dialer.Hold(backend.Address)
+
+	rpcCtx, rpcCancel := context.WithCancel(ctx)
+	go func() {
+		rpcCtx = metadata.NewOutgoingContext(rpcCtx, metadata.Pairs("address_hash", nonExistentServerAddr+"_0"))
+		_, err := client.EmptyCall(rpcCtx, &testpb.Empty{})
+		if status.Code(err) != codes.Canceled {
+			t.Errorf("Expected RPC to be canceled, got error: %v", err)
+		}
+	}()
+
+	// Wait for the connection attempt to the real backend.
+	if !hold.Wait(ctx) {
+		t.Fatalf("Timeout waiting for connection attempt to backend %v.", backend.Address)
+	}
+	// Now cancel the RPC while we are still connecting.
+	rpcCancel()
+
+	// Resuming the hold allows the connection attempt to continue. The RPC
+	// was canceled before the backend was connected, but the backend is up,
+	// so the channel becomes READY once the connection attempt to the
+	// existing backend succeeds, even though no new RPC is sent.
+	hold.Resume()
+
+	testutils.AwaitState(ctx, t, conn, connectivity.Ready)
+}
+
+// Tests that when the endpoint for the first pick on the ring is down,
+// causing a transient failure, we move on to the next ring hash entry.
+func (s) TestRingHash_TransientFailureCheckNextOne(t *testing.T) {
+	backends := startTestServiceBackends(t, 1)
+	nonExistentBackends := makeNonExistentBackends(t, 1)
+
+	const clusterName = "cluster"
+	endpoints := endpointResource(t, clusterName, append(nonExistentBackends, backends...))
+	cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
+		ClusterName: clusterName,
+		ServiceName: clusterName,
+		Policy:      e2e.LoadBalancingPolicyRingHash,
+	})
+	route := headerHashRoute("new_route", virtualHostName, clusterName, "address_hash")
+	listener := e2e.DefaultClientListener(virtualHostName, route.Name)
+
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+
+	xdsServer, nodeID, xdsResolver := setupManagementServerAndResolver(t)
+	if err := xdsServer.Update(ctx, xdsUpdateOpts(nodeID, endpoints, cluster, route, listener)); err != nil {
+		t.Fatalf("Failed to update xDS resources: %v", err)
+	}
+
+	conn, err := grpc.NewClient("xds:///test.server", grpc.WithResolvers(xdsResolver), grpc.WithTransportCredentials(insecure.NewCredentials()))
+	if err != nil {
+		t.Fatalf("Failed to create client: %s", err)
+	}
+	defer conn.Close()
+	client := testgrpc.NewTestServiceClient(conn)
+
+	// Note that the RPC header contains a value that hashes to the position
+	// used to place the non-existent endpoint on the ring, but the RPC is
+	// still routed to the backend that is up.
+	ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("address_hash", nonExistentBackends[0]+"_0"))
+	reqPerBackend := checkRPCSendOK(ctx, t, client, 1)
+	var got string
+	for got = range reqPerBackend {
+	}
+	if want := backends[0]; got != want {
+		t.Errorf("Got RPC routed to addr %v, want %v", got, want)
+	}
+}
+
+// Tests for a bug seen in the wild in c-core, where ring_hash started with no
+// endpoints and reported TRANSIENT_FAILURE, then got an update with endpoints
+// and reported IDLE, but the picker update was squelched, so it never
+// reconnected.
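+// The test recreates this scenario by starting with an EDS resource that has
+// no endpoints, making an RPC fail and checking that the channel reports
+// TRANSIENT_FAILURE, then adding an endpoint and verifying that a
+// wait-for-ready RPC succeeds and the channel reports READY.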
+func (s) TestRingHash_ReattemptWhenGoingFromTransientFailureToIdle(t *testing.T) {
+	const clusterName = "cluster"
+	endpoints := e2e.EndpointResourceWithOptions(e2e.EndpointOptions{
+		ClusterName: clusterName,
+		Localities:  []e2e.LocalityOptions{{}}, // Note the empty locality (no endpoints).
+	})
+	cluster := e2e.ClusterResourceWithOptions(e2e.ClusterOptions{
+		ClusterName: clusterName,
+		ServiceName: clusterName,
+		Policy:      e2e.LoadBalancingPolicyRingHash,
+	})
+	route := e2e.DefaultRouteConfig("new_route", virtualHostName, clusterName)
+	listener := e2e.DefaultClientListener(virtualHostName, route.Name)
+
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+
+	xdsServer, nodeID, xdsResolver := setupManagementServerAndResolver(t)
+	if err := xdsServer.Update(ctx, xdsUpdateOpts(nodeID, endpoints, cluster, route, listener)); err != nil {
+		t.Fatalf("Failed to update xDS resources: %v", err)
+	}
+
+	conn, err := grpc.NewClient("xds:///test.server", grpc.WithResolvers(xdsResolver), grpc.WithTransportCredentials(insecure.NewCredentials()))
+	if err != nil {
+		t.Fatalf("Failed to create client: %s", err)
+	}
+	defer conn.Close()
+	client := testgrpc.NewTestServiceClient(conn)
+
+	if got, want := conn.GetState(), connectivity.Idle; got != want {
+		t.Errorf("conn.GetState(): got %v, want %v", got, want)
+	}
+
+	// There are no endpoints in EDS. RPCs should fail and the channel should
+	// transition to transient failure.
+	if _, err = client.EmptyCall(ctx, &testpb.Empty{}); err == nil {
+		t.Errorf("rpc EmptyCall() succeeded, want error")
+	}
+	if got, want := conn.GetState(), connectivity.TransientFailure; got != want {
+		t.Errorf("conn.GetState(): got %v, want %v", got, want)
+	}
+
+	backends := startTestServiceBackends(t, 1)
+
+	t.Log("Updating EDS with a new backend endpoint.")
+	endpoints = e2e.EndpointResourceWithOptions(e2e.EndpointOptions{
+		ClusterName: clusterName,
+		Localities: []e2e.LocalityOptions{{
+			Backends: backendOptions(t, backends),
+			Weight:   1,
+		}},
+	})
+	if err = xdsServer.Update(ctx, xdsUpdateOpts(nodeID, endpoints, cluster, route, listener)); err != nil {
+		t.Fatalf("Failed to update xDS resources: %v", err)
+	}
+
+	// A WaitForReady RPC should succeed, and the channel should report READY.
+	if _, err = client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
+		t.Errorf("rpc EmptyCall() failed: %v", err)
+	}
+	if got, want := conn.GetState(), connectivity.Ready; got != want {
+		t.Errorf("conn.GetState(): got %v, want %v", got, want)
+	}
+}