mirror of https://github.com/grpc/grpc-go.git
grpc: do not use balancer attributes during address comparison (#6439)
parent: db32c5bfeb
commit: 8e9c8f8e71
attributes/attributes.go
@@ -112,19 +112,31 @@ func (a *Attributes) String() string {
 	sb.WriteString("{")
-	first := true
 	for k, v := range a.m {
-		var key, val string
-		if str, ok := k.(interface{ String() string }); ok {
-			key = str.String()
-		}
-		if str, ok := v.(interface{ String() string }); ok {
-			val = str.String()
-		}
-		if !first {
-			sb.WriteString(", ")
-		}
-		sb.WriteString(fmt.Sprintf("%q: %q, ", key, val))
-		first = false
+		sb.WriteString(fmt.Sprintf("%q: %q ", str(k), str(v)))
 	}
 	sb.WriteString("}")
 	return sb.String()
 }
+
+func str(x interface{}) string {
+	if v, ok := x.(fmt.Stringer); ok {
+		return v.String()
+	} else if v, ok := x.(string); ok {
+		return v
+	}
+	return fmt.Sprintf("<%p>", x)
+}
+
+// MarshalJSON helps implement the json.Marshaler interface, thereby rendering
+// the Attributes correctly when printing (via pretty.JSON) structs containing
+// Attributes as fields.
+//
+// It is impossible to unmarshal attributes from a JSON representation and this
+// method is meant only for debugging purposes.
+func (a *Attributes) MarshalJSON() ([]byte, error) {
+	return []byte(a.String()), nil
+}
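For context, a minimal sketch (not part of the commit) of what the reworked String method produces; the keys and values below are illustrative, and map iteration order is not guaranteed:

package main

import (
	"fmt"

	"google.golang.org/grpc/attributes"
)

func main() {
	// Plain strings and fmt.Stringers are rendered via str(); any other type
	// falls back to a pointer representation such as <0xc00001e030>.
	a := attributes.New("site", "east").WithValue("rack", "r42")
	fmt.Println(a.String()) // e.g. {"site": "east" "rack": "r42" }
}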
clientconn.go
@@ -37,6 +37,7 @@ import (
 	"google.golang.org/grpc/internal/backoff"
 	"google.golang.org/grpc/internal/channelz"
 	"google.golang.org/grpc/internal/grpcsync"
+	"google.golang.org/grpc/internal/pretty"
 	iresolver "google.golang.org/grpc/internal/resolver"
 	"google.golang.org/grpc/internal/transport"
 	"google.golang.org/grpc/keepalive"
@@ -867,6 +868,20 @@ func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
 	cc.balancerWrapper.updateSubConnState(sc, s, err)
 }
 
+// Makes a copy of the input addresses slice and clears out the balancer
+// attributes field. Addresses are passed during subconn creation and address
+// update operations. In both cases, we will clear the balancer attributes by
+// calling this function, and therefore we will be able to use the Equal method
+// provided by the resolver.Address type for comparison.
+func copyAddressesWithoutBalancerAttributes(in []resolver.Address) []resolver.Address {
+	out := make([]resolver.Address, len(in))
+	for i := range in {
+		out[i] = in[i]
+		out[i].BalancerAttributes = nil
+	}
+	return out
+}
+
 // newAddrConn creates an addrConn for addrs and adds it to cc.conns.
 //
 // Caller needs to make sure len(addrs) > 0.
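A minimal sketch of the comparison this helper enables; the helper body is copied from the diff above, while the addresses in main are illustrative:

package main

import (
	"fmt"

	"google.golang.org/grpc/resolver"
)

func copyAddressesWithoutBalancerAttributes(in []resolver.Address) []resolver.Address {
	out := make([]resolver.Address, len(in))
	for i := range in {
		out[i] = in[i]
		out[i].BalancerAttributes = nil
	}
	return out
}

func main() {
	a := resolver.Address{Addr: "localhost:50051"}
	b := a
	b.BalancerAttributes = b.BalancerAttributes.WithValue("weight", "10")

	// The addresses differ only in balancer attributes, so Equal reports
	// them as different...
	fmt.Println(a.Equal(b)) // false

	// ...but after stripping balancer attributes they compare equal.
	sa := copyAddressesWithoutBalancerAttributes([]resolver.Address{a})
	sb := copyAddressesWithoutBalancerAttributes([]resolver.Address{b})
	fmt.Println(sa[0].Equal(sb[0])) // true
}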
@@ -874,7 +889,7 @@ func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
 	ac := &addrConn{
 		state:  connectivity.Idle,
 		cc:     cc,
-		addrs:  addrs,
+		addrs:  copyAddressesWithoutBalancerAttributes(addrs),
 		scopts: opts,
 		dopts:  cc.dopts,
 		czData: new(channelzData),
@@ -995,8 +1010,9 @@ func equalAddresses(a, b []resolver.Address) bool {
 // connections or connection attempts.
 func (ac *addrConn) updateAddrs(addrs []resolver.Address) {
 	ac.mu.Lock()
-	channelz.Infof(logger, ac.channelzID, "addrConn: updateAddrs curAddr: %v, addrs: %v", ac.curAddr, addrs)
+	channelz.Infof(logger, ac.channelzID, "addrConn: updateAddrs curAddr: %v, addrs: %v", pretty.ToJSON(ac.curAddr), pretty.ToJSON(addrs))
 
+	addrs = copyAddressesWithoutBalancerAttributes(addrs)
 	if equalAddresses(ac.addrs, addrs) {
 		ac.mu.Unlock()
 		return
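For reference, pretty.ToJSON (from google.golang.org/grpc/internal/pretty) marshals a value to indented JSON for log output; together with the MarshalJSON method added to Attributes above, addresses now log readably. A minimal sketch, only compilable inside the grpc-go module since the package is internal; the address literal is illustrative:

package main

import (
	"fmt"

	"google.golang.org/grpc/internal/pretty"
	"google.golang.org/grpc/resolver"
)

func main() {
	addr := resolver.Address{Addr: "localhost:50051", ServerName: "srv"}
	// ToJSON renders the value as indented JSON, falling back to fmt-style
	// formatting if marshaling fails.
	fmt.Println(pretty.ToJSON(addr))
}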
internal/stubserver/stubserver.go
@@ -60,6 +60,10 @@ type StubServer struct {
 	Address string
 	Target  string
 
+	// Custom listener to use for serving. If unspecified, a new listener is
+	// created on a local port.
+	Listener net.Listener
+
 	cleanups []func() // Lambdas executed in Stop(); populated by Start().
 
 	// Set automatically if Target == ""
@@ -118,9 +122,13 @@ func (ss *StubServer) StartServer(sopts ...grpc.ServerOption) error {
 		ss.R = manual.NewBuilderWithScheme("whatever")
 	}
 
-	lis, err := net.Listen(ss.Network, ss.Address)
-	if err != nil {
-		return fmt.Errorf("net.Listen(%q, %q) = %v", ss.Network, ss.Address, err)
+	lis := ss.Listener
+	if lis == nil {
+		var err error
+		lis, err = net.Listen(ss.Network, ss.Address)
+		if err != nil {
+			return fmt.Errorf("net.Listen(%q, %q) = %v", ss.Network, ss.Address, err)
+		}
 	}
 	ss.Address = lis.Addr().String()
 	ss.cleanups = append(ss.cleanups, func() { lis.Close() })
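A short usage sketch for the new field (a hypothetical test, not part of the commit; it assumes the stubserver and grpc_testing import paths used elsewhere in this repository):

package test

import (
	"context"
	"net"
	"testing"

	"google.golang.org/grpc/internal/stubserver"

	testpb "google.golang.org/grpc/interop/grpc_testing"
)

// TestStubServerWithCustomListener demonstrates supplying a caller-owned
// listener: StartServer serves on it instead of calling net.Listen itself.
func TestStubServerWithCustomListener(t *testing.T) {
	lis, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		t.Fatalf("net.Listen() failed: %v", err)
	}
	backend := &stubserver.StubServer{
		Listener: lis,
		EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
			return &testpb.Empty{}, nil
		},
	}
	if err := backend.StartServer(); err != nil {
		t.Fatalf("Failed to start backend: %v", err)
	}
	defer backend.Stop()
	t.Logf("Backend serving at %q", backend.Address)
}

This mirrors how setupPickFirstWithListenerWrapper later in this commit wires a testutils.ListenerWrapper into the stub server.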
resolver/resolver.go
@@ -142,6 +142,10 @@ type Address struct {
 
 // Equal returns whether a and o are identical. Metadata is compared directly,
 // not with any recursive introspection.
+//
+// This method compares all fields of the address. When used to tell apart
+// addresses during subchannel creation or connection establishment, it might be
+// more appropriate for the caller to implement custom equality logic.
 func (a Address) Equal(o Address) bool {
 	return a.Addr == o.Addr && a.ServerName == o.ServerName &&
 		a.Attributes.Equal(o.Attributes) &&
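As the new doc comment suggests, callers that only care about where a connection terminates may want narrower logic than Equal. A hypothetical sketch (equalForConnection is not part of this commit):

// equalForConnection treats two resolver.Address values as interchangeable
// for connection purposes when endpoint and server name match, ignoring
// attributes, balancer attributes, and metadata.
func equalForConnection(a, b resolver.Address) bool {
	return a.Addr == b.Addr && a.ServerName == b.ServerName
}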
test/pickfirst_test.go
@@ -20,7 +20,7 @@ package test
 
 import (
 	"context"
-	"sync"
+	"fmt"
 	"testing"
 	"time"
 
@@ -299,6 +299,11 @@ func (s) TestPickFirst_NewAddressWhileBlocking(t *testing.T) {
 	}
 }
 
+// TestPickFirst_StickyTransientFailure tests the case where pick_first is
+// configured on a channel, and the backend is configured to close incoming
+// connections as soon as they are accepted. The test verifies that the channel
+// enters TransientFailure and stays there. The test also verifies that the
+// pick_first LB policy is constantly trying to reconnect to the backend.
 func (s) TestPickFirst_StickyTransientFailure(t *testing.T) {
 	// Spin up a local server which closes the connection as soon as it receives
 	// one. It also sends a signal on a channel whenever it receives a connection.
@@ -346,40 +351,27 @@ func (s) TestPickFirst_StickyTransientFailure(t *testing.T) {
 	}
 	t.Cleanup(func() { cc.Close() })
 
-	var wg sync.WaitGroup
-	wg.Add(2)
-	// Spin up a goroutine that waits for the channel to move to
-	// TransientFailure. After that it checks that the channel stays in
-	// TransientFailure, until Shutdown.
-	go func() {
-		defer wg.Done()
-		for state := cc.GetState(); state != connectivity.TransientFailure; state = cc.GetState() {
-			if !cc.WaitForStateChange(ctx, state) {
-				t.Errorf("Timeout when waiting for state to change to TransientFailure. Current state is %s", state)
-				return
-			}
-		}
+	awaitState(ctx, t, cc, connectivity.TransientFailure)
 
-		// TODO(easwars): this waits for 10s. Need shorter deadline here. Basically once the second goroutine exits, we should exit from here too.
+	// Spawn a goroutine to ensure that the channel stays in TransientFailure.
+	// The call to cc.WaitForStateChange will return false when the main
+	// goroutine exits and the context is cancelled.
+	go func() {
 		if cc.WaitForStateChange(ctx, connectivity.TransientFailure) {
 			if state := cc.GetState(); state != connectivity.Shutdown {
 				t.Errorf("Unexpected state change from TransientFailure to %s", cc.GetState())
 			}
 		}
 	}()
-	// Spin up a goroutine which ensures that the pick_first LB policy is
-	// constantly trying to reconnect.
-	go func() {
-		defer wg.Done()
-		for i := 0; i < 10; i++ {
-			select {
-			case <-connCh:
-			case <-time.After(2 * defaultTestShortTimeout):
-				t.Error("Timeout when waiting for pick_first to reconnect")
-			}
-		}
-	}()
-	wg.Wait()
+
+	// Ensures that the pick_first LB policy is constantly trying to reconnect.
+	for i := 0; i < 10; i++ {
+		select {
+		case <-connCh:
+		case <-time.After(2 * defaultTestShortTimeout):
+			t.Error("Timeout when waiting for pick_first to reconnect")
+		}
+	}
 }
 
 // Tests the PF LB policy with shuffling enabled.
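awaitState is a helper defined elsewhere in this test package and not shown in the diff. A minimal sketch of what such a helper can look like, mirroring the inline loop it replaces above (the exact upstream implementation may differ):

// awaitState blocks until cc reaches wantState, failing the test if ctx
// expires first.
func awaitState(ctx context.Context, t *testing.T, cc *grpc.ClientConn, wantState connectivity.State) {
	t.Helper()
	for state := cc.GetState(); state != wantState; state = cc.GetState() {
		if !cc.WaitForStateChange(ctx, state) {
			t.Fatalf("Timeout when waiting for state to change to %s. Current state is %s", wantState, state)
		}
	}
}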
@@ -475,3 +467,221 @@ func (s) TestPickFirst_ShuffleAddressListDisabled(t *testing.T) {
 		t.Fatal(err)
 	}
 }
+
+// setupPickFirstWithListenerWrapper is very similar to setupPickFirst, but uses
+// a wrapped listener that the test can use to track accepted connections.
+func setupPickFirstWithListenerWrapper(t *testing.T, backendCount int, opts ...grpc.DialOption) (*grpc.ClientConn, *manual.Resolver, []*stubserver.StubServer, []*testutils.ListenerWrapper) {
+	t.Helper()
+
+	// Initialize channelz. Used to determine pending RPC count.
+	czCleanup := channelz.NewChannelzStorageForTesting()
+	t.Cleanup(func() { czCleanupWrapper(czCleanup, t) })
+
+	backends := make([]*stubserver.StubServer, backendCount)
+	addrs := make([]resolver.Address, backendCount)
+	listeners := make([]*testutils.ListenerWrapper, backendCount)
+	for i := 0; i < backendCount; i++ {
+		lis := testutils.NewListenerWrapper(t, nil)
+		backend := &stubserver.StubServer{
+			Listener: lis,
+			EmptyCallF: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
+				return &testpb.Empty{}, nil
+			},
+		}
+		if err := backend.StartServer(); err != nil {
+			t.Fatalf("Failed to start backend: %v", err)
+		}
+		t.Logf("Started TestService backend at: %q", backend.Address)
+		t.Cleanup(func() { backend.Stop() })
+
+		backends[i] = backend
+		addrs[i] = resolver.Address{Addr: backend.Address}
+		listeners[i] = lis
+	}
+
+	r := manual.NewBuilderWithScheme("whatever")
+	dopts := []grpc.DialOption{
+		grpc.WithTransportCredentials(insecure.NewCredentials()),
+		grpc.WithResolvers(r),
+		grpc.WithDefaultServiceConfig(pickFirstServiceConfig),
+	}
+	dopts = append(dopts, opts...)
+	cc, err := grpc.Dial(r.Scheme()+":///test.server", dopts...)
+	if err != nil {
+		t.Fatalf("grpc.Dial() failed: %v", err)
+	}
+	t.Cleanup(func() { cc.Close() })
+
+	// At this point, the resolver has not returned any addresses to the channel.
+	// This RPC must block until the context expires.
+	sCtx, sCancel := context.WithTimeout(context.Background(), defaultTestShortTimeout)
+	defer sCancel()
+	client := testgrpc.NewTestServiceClient(cc)
+	if _, err := client.EmptyCall(sCtx, &testpb.Empty{}); status.Code(err) != codes.DeadlineExceeded {
+		t.Fatalf("EmptyCall() = %s, want %s", status.Code(err), codes.DeadlineExceeded)
+	}
+	return cc, r, backends, listeners
+}
+
+// TestPickFirst_AddressUpdateWithAttributes tests the case where an address
+// update received by the pick_first LB policy differs in attributes. Addresses
+// which differ in attributes are considered different from the perspective of
+// subconn creation and connection establishment, and the test verifies that
+// new connections are created when attributes change.
+func (s) TestPickFirst_AddressUpdateWithAttributes(t *testing.T) {
+	cc, r, backends, listeners := setupPickFirstWithListenerWrapper(t, 2)
+
+	// Add a set of attributes to the addresses before pushing them to the
+	// pick_first LB policy through the manual resolver.
+	addrs := stubBackendsToResolverAddrs(backends)
+	for i := range addrs {
+		addrs[i].Attributes = addrs[i].Attributes.WithValue("test-attribute-1", fmt.Sprintf("%d", i))
+	}
+	r.UpdateState(resolver.State{Addresses: addrs})
+
+	// Ensure that RPCs succeed to the first backend in the list.
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
+		t.Fatal(err)
+	}
+
+	// Grab the wrapped connection from the listener wrapper. This will be used
+	// to verify the connection is closed.
+	val, err := listeners[0].NewConnCh.Receive(ctx)
+	if err != nil {
+		t.Fatalf("Failed to receive new connection from wrapped listener: %v", err)
+	}
+	conn := val.(*testutils.ConnWrapper)
+
+	// Add another set of attributes to the addresses, and push them to the
+	// pick_first LB policy through the manual resolver. Leave the order of the
+	// addresses unchanged.
+	for i := range addrs {
+		addrs[i].Attributes = addrs[i].Attributes.WithValue("test-attribute-2", fmt.Sprintf("%d", i))
+	}
+	r.UpdateState(resolver.State{Addresses: addrs})
+	if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
+		t.Fatal(err)
+	}
+
+	// A change in the address attributes results in the new address being
+	// considered different to the current address. This will result in the old
+	// connection being closed and a new connection being established to the
+	// same backend (since the address order is not modified).
+	if _, err := conn.CloseCh.Receive(ctx); err != nil {
+		t.Fatalf("Timeout when expecting existing connection to be closed: %v", err)
+	}
+	val, err = listeners[0].NewConnCh.Receive(ctx)
+	if err != nil {
+		t.Fatalf("Failed to receive new connection from wrapped listener: %v", err)
+	}
+	conn = val.(*testutils.ConnWrapper)
+
+	// Add another set of attributes to the addresses, and push them to the
+	// pick_first LB policy through the manual resolver. Reverse the order of
+	// the addresses.
+	for i := range addrs {
+		addrs[i].Attributes = addrs[i].Attributes.WithValue("test-attribute-3", fmt.Sprintf("%d", i))
+	}
+	addrs[0], addrs[1] = addrs[1], addrs[0]
+	r.UpdateState(resolver.State{Addresses: addrs})
+	if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
+		t.Fatal(err)
+	}
+
+	// Ensure that the old connection is closed and a new connection is
+	// established to the first address in the new list.
+	if _, err := conn.CloseCh.Receive(ctx); err != nil {
+		t.Fatalf("Timeout when expecting existing connection to be closed: %v", err)
+	}
+	_, err = listeners[1].NewConnCh.Receive(ctx)
+	if err != nil {
+		t.Fatalf("Failed to receive new connection from wrapped listener: %v", err)
+	}
+}
+
+// TestPickFirst_AddressUpdateWithBalancerAttributes tests the case where an
+// address update received by the pick_first LB policy differs in balancer
+// attributes, which are meant only for consumption by LB policies. In this
+// case, the test verifies that new connections are not created when the
+// address update only changes the balancer attributes.
+func (s) TestPickFirst_AddressUpdateWithBalancerAttributes(t *testing.T) {
+	cc, r, backends, listeners := setupPickFirstWithListenerWrapper(t, 2)
+
+	// Add a set of balancer attributes to the addresses before pushing them to
+	// the pick_first LB policy through the manual resolver.
+	addrs := stubBackendsToResolverAddrs(backends)
+	for i := range addrs {
+		addrs[i].BalancerAttributes = addrs[i].BalancerAttributes.WithValue("test-attribute-1", fmt.Sprintf("%d", i))
+	}
+	r.UpdateState(resolver.State{Addresses: addrs})
+
+	// Ensure that RPCs succeed to the expected backend.
+	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
+	defer cancel()
+	if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
+		t.Fatal(err)
+	}
+
+	// Grab the wrapped connection from the listener wrapper. This will be used
+	// to verify the connection is not closed.
+	val, err := listeners[0].NewConnCh.Receive(ctx)
+	if err != nil {
+		t.Fatalf("Failed to receive new connection from wrapped listener: %v", err)
+	}
+	conn := val.(*testutils.ConnWrapper)
+
+	// Add another set of balancer attributes to the addresses before pushing
+	// them to the pick_first LB policy through the manual resolver. Leave the
+	// order of the addresses unchanged.
+	for i := range addrs {
+		addrs[i].BalancerAttributes = addrs[i].BalancerAttributes.WithValue("test-attribute-2", fmt.Sprintf("%d", i))
+	}
+	r.UpdateState(resolver.State{Addresses: addrs})
+
+	// Ensure that no new connection is established, and ensure that the old
+	// connection is not closed.
+	for i := range listeners {
+		sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+		defer sCancel()
+		if _, err := listeners[i].NewConnCh.Receive(sCtx); err != context.DeadlineExceeded {
+			t.Fatalf("Unexpected error when expecting no new connection: %v", err)
+		}
+	}
+	sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+	defer sCancel()
+	if _, err := conn.CloseCh.Receive(sCtx); err != context.DeadlineExceeded {
+		t.Fatalf("Unexpected error when expecting existing connection to stay active: %v", err)
+	}
+	if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[0]); err != nil {
+		t.Fatal(err)
+	}
+
+	// Add another set of balancer attributes to the addresses before pushing
+	// them to the pick_first LB policy through the manual resolver. Reverse
+	// the order of the addresses.
+	for i := range addrs {
+		addrs[i].BalancerAttributes = addrs[i].BalancerAttributes.WithValue("test-attribute-3", fmt.Sprintf("%d", i))
+	}
+	addrs[0], addrs[1] = addrs[1], addrs[0]
+	r.UpdateState(resolver.State{Addresses: addrs})
+
+	// Ensure that no new connection is established, and ensure that the old
+	// connection is not closed.
+	for i := range listeners {
+		sCtx, sCancel := context.WithTimeout(ctx, defaultTestShortTimeout)
+		defer sCancel()
+		if _, err := listeners[i].NewConnCh.Receive(sCtx); err != context.DeadlineExceeded {
+			t.Fatalf("Unexpected error when expecting no new connection: %v", err)
+		}
+	}
+	sCtx, sCancel = context.WithTimeout(ctx, defaultTestShortTimeout)
+	defer sCancel()
+	if _, err := conn.CloseCh.Receive(sCtx); err != context.DeadlineExceeded {
+		t.Fatalf("Unexpected error when expecting existing connection to stay active: %v", err)
+	}
+	if err := pickfirst.CheckRPCsToBackend(ctx, cc, addrs[1]); err != nil {
+		t.Fatal(err)
+	}
+}