mirror of https://github.com/grpc/grpc-go.git
319 lines
12 KiB
Go
319 lines
12 KiB
Go
/*
|
|
*
|
|
* Copyright 2022 gRPC authors.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*
|
|
*/
|
|
|
|
package test
|
|
|
|
import (
|
|
"context"
|
|
"strings"
|
|
"testing"
|
|
"time"
|
|
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/connectivity"
|
|
"google.golang.org/grpc/credentials/insecure"
|
|
"google.golang.org/grpc/internal/channelz"
|
|
imetadata "google.golang.org/grpc/internal/metadata"
|
|
"google.golang.org/grpc/internal/stubserver"
|
|
"google.golang.org/grpc/internal/testutils"
|
|
rrutil "google.golang.org/grpc/internal/testutils/roundrobin"
|
|
"google.golang.org/grpc/metadata"
|
|
"google.golang.org/grpc/resolver"
|
|
"google.golang.org/grpc/resolver/manual"
|
|
"google.golang.org/grpc/status"
|
|
|
|
testgrpc "google.golang.org/grpc/interop/grpc_testing"
|
|
testpb "google.golang.org/grpc/interop/grpc_testing"
|
|
)
|
|
|
|
const rrServiceConfig = `{"loadBalancingConfig": [{"round_robin":{}}]}`
|
|
|
|
func testRoundRobinBasic(ctx context.Context, t *testing.T, opts ...grpc.DialOption) (*grpc.ClientConn, *manual.Resolver, []*stubserver.StubServer) {
|
|
t.Helper()
|
|
|
|
r := manual.NewBuilderWithScheme("whatever")
|
|
|
|
const backendCount = 5
|
|
backends := make([]*stubserver.StubServer, backendCount)
|
|
endpoints := make([]resolver.Endpoint, backendCount)
|
|
addrs := make([]resolver.Address, backendCount)
|
|
for i := 0; i < backendCount; i++ {
|
|
backend := &stubserver.StubServer{
|
|
EmptyCallF: func(context.Context, *testpb.Empty) (*testpb.Empty, error) { return &testpb.Empty{}, nil },
|
|
}
|
|
if err := backend.StartServer(); err != nil {
|
|
t.Fatalf("Failed to start backend: %v", err)
|
|
}
|
|
t.Logf("Started TestService backend at: %q", backend.Address)
|
|
t.Cleanup(func() { backend.Stop() })
|
|
|
|
backends[i] = backend
|
|
addrs[i] = resolver.Address{Addr: backend.Address}
|
|
endpoints[i] = resolver.Endpoint{Addresses: []resolver.Address{addrs[i]}}
|
|
}
|
|
|
|
dopts := []grpc.DialOption{
|
|
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
|
grpc.WithResolvers(r),
|
|
grpc.WithDefaultServiceConfig(rrServiceConfig),
|
|
}
|
|
dopts = append(dopts, opts...)
|
|
cc, err := grpc.NewClient(r.Scheme()+":///test.server", dopts...)
|
|
if err != nil {
|
|
t.Fatalf("grpc.NewClient() failed: %v", err)
|
|
}
|
|
t.Cleanup(func() { cc.Close() })
|
|
client := testgrpc.NewTestServiceClient(cc)
|
|
|
|
// At this point, the resolver has not returned any addresses to the channel.
|
|
// This RPC must block until the context expires.
|
|
sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
|
|
defer sCancel()
|
|
if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
|
|
t.Fatalf("EmptyCall() = %s, want %s", status.Code(err), codes.DeadlineExceeded)
|
|
}
|
|
|
|
r.UpdateState(resolver.State{Addresses: addrs})
|
|
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
return cc, r, backends
|
|
}
|
|
|
|
// TestRoundRobin_Basic tests the most basic scenario for round_robin. It brings
|
|
// up a bunch of backends and verifies that RPCs are getting round robin-ed
|
|
// across these backends.
|
|
func (s) TestRoundRobin_Basic(t *testing.T) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
|
defer cancel()
|
|
testRoundRobinBasic(ctx, t)
|
|
}
|
|
|
|
// TestRoundRobin_AddressesRemoved tests the scenario where a bunch of backends
|
|
// are brought up, and round_robin is configured as the LB policy and RPCs are
|
|
// being correctly round robin-ed across these backends. We then send a resolver
|
|
// update with no addresses and verify that the channel enters TransientFailure
|
|
// and RPCs fail with an expected error message.
|
|
func (s) TestRoundRobin_AddressesRemoved(t *testing.T) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
|
defer cancel()
|
|
cc, r, _ := testRoundRobinBasic(ctx, t)
|
|
|
|
// Send a resolver update with no addresses. This should push the channel into
|
|
// TransientFailure.
|
|
r.UpdateState(resolver.State{Endpoints: []resolver.Endpoint{}})
|
|
testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
|
|
|
|
const msgWant = "no children to pick from"
|
|
client := testgrpc.NewTestServiceClient(cc)
|
|
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); !strings.Contains(status.Convert(err).Message(), msgWant) {
|
|
t.Fatalf("EmptyCall() = %v, want Contains(Message(), %q)", err, msgWant)
|
|
}
|
|
}
|
|
|
|
// TestRoundRobin_NewAddressWhileBlocking tests the case where round_robin is
|
|
// configured on a channel, things are working as expected and then a resolver
|
|
// updates removes all addresses. An RPC attempted at this point in time will be
|
|
// blocked because there are no valid ¡ds. This test verifies that when new
|
|
// backends are added, the RPC is able to complete.
|
|
func (s) TestRoundRobin_NewAddressWhileBlocking(t *testing.T) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
|
defer cancel()
|
|
cc, r, backends := testRoundRobinBasic(ctx, t)
|
|
|
|
// Send a resolver update with no addresses. This should push the channel into
|
|
// TransientFailure.
|
|
r.UpdateState(resolver.State{Addresses: []resolver.Address{}})
|
|
testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
|
|
|
|
client := testgrpc.NewTestServiceClient(cc)
|
|
doneCh := make(chan struct{})
|
|
go func() {
|
|
// The channel is currently in TransientFailure and this RPC will block
|
|
// until the channel becomes Ready, which will only happen when we push a
|
|
// resolver update with a valid backend address.
|
|
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
|
|
t.Errorf("EmptyCall() = %v, want <nil>", err)
|
|
}
|
|
close(doneCh)
|
|
}()
|
|
|
|
// Make sure that there is one pending RPC on the ClientConn before attempting
|
|
// to push new addresses through the name resolver. If we don't do this, the
|
|
// resolver update can happen before the above goroutine gets to make the RPC.
|
|
for {
|
|
if err := ctx.Err(); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
tcs, _ := channelz.GetTopChannels(0, 0)
|
|
if len(tcs) != 1 {
|
|
t.Fatalf("there should only be one top channel, not %d", len(tcs))
|
|
}
|
|
started := tcs[0].ChannelMetrics.CallsStarted.Load()
|
|
completed := tcs[0].ChannelMetrics.CallsSucceeded.Load() + tcs[0].ChannelMetrics.CallsFailed.Load()
|
|
if (started - completed) == 1 {
|
|
break
|
|
}
|
|
time.Sleep(defaultTestShortTimeout)
|
|
}
|
|
|
|
// Send a resolver update with a valid backend to push the channel to Ready
|
|
// and unblock the above RPC.
|
|
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backends[0].Address}}})
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
t.Fatal("Timeout when waiting for blocked RPC to complete")
|
|
case <-doneCh:
|
|
}
|
|
}
|
|
|
|
// TestRoundRobin_OneServerDown tests the scenario where a channel is configured
|
|
// to round robin across a set of backends, and things are working correctly.
|
|
// One backend goes down. The test verifies that going forward, RPCs are round
|
|
// robin-ed across the remaining set of backends.
|
|
func (s) TestRoundRobin_OneServerDown(t *testing.T) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
|
defer cancel()
|
|
cc, _, backends := testRoundRobinBasic(ctx, t)
|
|
|
|
// Stop one backend. RPCs should round robin across the remaining backends.
|
|
backends[len(backends)-1].Stop()
|
|
|
|
addrs := make([]resolver.Address, len(backends)-1)
|
|
for i := 0; i < len(backends)-1; i++ {
|
|
addrs[i] = resolver.Address{Addr: backends[i].Address}
|
|
}
|
|
client := testgrpc.NewTestServiceClient(cc)
|
|
if err := rrutil.CheckRoundRobinRPCs(ctx, client, addrs); err != nil {
|
|
t.Fatalf("RPCs are not being round robined across remaining servers: %v", err)
|
|
}
|
|
}
|
|
|
|
// TestRoundRobin_AllServersDown tests the scenario where a channel is
|
|
// configured to round robin across a set of backends, and things are working
|
|
// correctly. Then, all backends go down. The test verifies that the channel
|
|
// moves to TransientFailure and failfast RPCs fail with Unavailable.
|
|
func (s) TestRoundRobin_AllServersDown(t *testing.T) {
|
|
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
|
defer cancel()
|
|
cc, _, backends := testRoundRobinBasic(ctx, t)
|
|
|
|
// Stop all backends.
|
|
for _, b := range backends {
|
|
b.Stop()
|
|
}
|
|
|
|
testutils.AwaitState(ctx, t, cc, connectivity.TransientFailure)
|
|
|
|
// Failfast RPCs should fail with Unavailable.
|
|
client := testgrpc.NewTestServiceClient(cc)
|
|
|
|
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Unavailable {
|
|
t.Fatalf("EmptyCall got err: %v; want Unavailable", err)
|
|
}
|
|
}
|
|
|
|
// TestRoundRobin_UpdateAddressAttributes tests the scenario where the addresses
|
|
// returned by the resolver contain attributes. The test verifies that the
|
|
// attributes contained in the addresses show up as RPC metadata in the backend.
|
|
func (s) TestRoundRobin_UpdateAddressAttributes(t *testing.T) {
|
|
const (
|
|
testMDKey = "test-md"
|
|
testMDValue = "test-md-value"
|
|
)
|
|
r := manual.NewBuilderWithScheme("whatever")
|
|
|
|
// Spin up a StubServer to serve as a backend. The implementation verifies
|
|
// that the expected metadata is received.
|
|
testMDChan := make(chan []string, 1)
|
|
backend := &stubserver.StubServer{
|
|
EmptyCallF: func(ctx context.Context, _ *testpb.Empty) (*testpb.Empty, error) {
|
|
md, ok := metadata.FromIncomingContext(ctx)
|
|
if ok {
|
|
select {
|
|
case testMDChan <- md[testMDKey]:
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
}
|
|
}
|
|
return &testpb.Empty{}, nil
|
|
},
|
|
}
|
|
if err := backend.StartServer(); err != nil {
|
|
t.Fatalf("Failed to start backend: %v", err)
|
|
}
|
|
t.Logf("Started TestService backend at: %q", backend.Address)
|
|
t.Cleanup(func() { backend.Stop() })
|
|
|
|
// Dial the backend with round_robin as the LB policy.
|
|
dopts := []grpc.DialOption{
|
|
grpc.WithTransportCredentials(insecure.NewCredentials()),
|
|
grpc.WithResolvers(r),
|
|
grpc.WithDefaultServiceConfig(rrServiceConfig),
|
|
}
|
|
// Set an initial resolver update with no address attributes.
|
|
addr := resolver.Address{Addr: backend.Address}
|
|
r.InitialState(resolver.State{Addresses: []resolver.Address{addr}})
|
|
cc, err := grpc.NewClient(r.Scheme()+":///test.server", dopts...)
|
|
if err != nil {
|
|
t.Fatalf("grpc.NewClient() failed: %v", err)
|
|
}
|
|
t.Cleanup(func() { cc.Close() })
|
|
|
|
// Make an RPC and ensure it does not contain the metadata we are looking for.
|
|
client := testgrpc.NewTestServiceClient(cc)
|
|
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
|
defer cancel()
|
|
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
|
|
t.Fatalf("EmptyCall() = %v, want <nil>", err)
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
t.Fatalf("Timeout when waiting for metadata received in RPC")
|
|
case md := <-testMDChan:
|
|
if len(md) != 0 {
|
|
t.Fatalf("received metadata %v, want nil", md)
|
|
}
|
|
}
|
|
|
|
// Send a resolver update with address attributes.
|
|
addrWithAttributes := imetadata.Set(addr, metadata.Pairs(testMDKey, testMDValue))
|
|
r.UpdateState(resolver.State{Addresses: []resolver.Address{addrWithAttributes}})
|
|
|
|
// Make an RPC and ensure it contains the metadata we are looking for. The
|
|
// resolver update isn't processed synchronously, so we wait some time before
|
|
// failing if some RPCs do not contain it.
|
|
Done:
|
|
for {
|
|
if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
|
|
t.Fatalf("EmptyCall() = %v, want <nil>", err)
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
t.Fatalf("Timeout when waiting for metadata received in RPC")
|
|
case md := <-testMDChan:
|
|
if len(md) == 1 && md[0] == testMDValue {
|
|
break Done
|
|
}
|
|
}
|
|
time.Sleep(defaultTestShortTimeout)
|
|
}
|
|
}
|