From 4a0125ac580883d82f77d6fdc836b5966d79b439 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Mon, 30 Nov 2020 14:20:03 -0800 Subject: [PATCH] roundrobin: strip attributes from addresses (#4024) --- attributes/attributes_test.go | 12 ++++ balancer/base/balancer.go | 32 +++++++++-- balancer/base/balancer_test.go | 70 ++++++++++++++++++++++ balancer/roundrobin/roundrobin_test.go | 80 +++++++++++++++++++++++++- xds/internal/testutils/balancer.go | 4 +- 5 files changed, 189 insertions(+), 9 deletions(-) create mode 100644 balancer/base/balancer_test.go diff --git a/attributes/attributes_test.go b/attributes/attributes_test.go index 4cca17b55..1174e2371 100644 --- a/attributes/attributes_test.go +++ b/attributes/attributes_test.go @@ -20,6 +20,8 @@ package attributes_test import ( "fmt" + "reflect" + "testing" "google.golang.org/grpc/attributes" ) @@ -46,3 +48,13 @@ func ExampleAttributes_WithValues() { // Key one: 1 // Key two: two } + +// Test that two attributes with the same content are `reflect.DeepEqual`. +func TestDeepEqual(t *testing.T) { + type keyOne struct{} + a1 := attributes.New(keyOne{}, 1) + a2 := attributes.New(keyOne{}, 1) + if !reflect.DeepEqual(a1, a2) { + t.Fatalf("reflect.DeepEqual(%+v, %+v), want true, got false", a1, a2) + } +} diff --git a/balancer/base/balancer.go b/balancer/base/balancer.go index 32d782f1c..e0d34288c 100644 --- a/balancer/base/balancer.go +++ b/balancer/base/balancer.go @@ -64,7 +64,7 @@ type baseBalancer struct { csEvltr *balancer.ConnectivityStateEvaluator state connectivity.State - subConns map[resolver.Address]balancer.SubConn + subConns map[resolver.Address]balancer.SubConn // `attributes` is stripped from the keys of this map (the addresses) scStates map[balancer.SubConn]connectivity.State picker balancer.Picker config Config @@ -101,17 +101,41 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error { // addrsSet is the set converted from addrs, it's used for quick lookup of an address. addrsSet := make(map[resolver.Address]struct{}) for _, a := range s.ResolverState.Addresses { - addrsSet[a] = struct{}{} - if _, ok := b.subConns[a]; !ok { + // Strip attributes from addresses before using them as map keys. So + // that when two addresses only differ in attributes pointers (but with + // the same attribute content), they are considered the same address. + // + // Note that this doesn't handle the case where the attribute content is + // different. So if users want to set different attributes to create + // duplicate connections to the same backend, it doesn't work. This is + // fine for now, because duplicate is done by setting Metadata today. + // + // TODO: read attributes to handle duplicate connections. + aNoAttrs := a + aNoAttrs.Attributes = nil + addrsSet[aNoAttrs] = struct{}{} + if sc, ok := b.subConns[aNoAttrs]; !ok { // a is a new address (not existing in b.subConns). + // + // When creating SubConn, the original address with attributes is + // passed through. So that connection configurations in attributes + // (like creds) will be used. sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: b.config.HealthCheck}) if err != nil { logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err) continue } - b.subConns[a] = sc + b.subConns[aNoAttrs] = sc b.scStates[sc] = connectivity.Idle sc.Connect() + } else { + // Always update the subconn's address in case the attributes + // changed. + // + // The SubConn does a reflect.DeepEqual of the new and old + // addresses. So this is a noop if the current address is the same + // as the old one (including attributes). + sc.UpdateAddresses([]resolver.Address{a}) } } for a, sc := range b.subConns { diff --git a/balancer/base/balancer_test.go b/balancer/base/balancer_test.go new file mode 100644 index 000000000..03114251a --- /dev/null +++ b/balancer/base/balancer_test.go @@ -0,0 +1,70 @@ +/* + * + * Copyright 2020 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package base + +import ( + "testing" + + "google.golang.org/grpc/attributes" + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/resolver" +) + +type testClientConn struct { + balancer.ClientConn + newSubConn func([]resolver.Address, balancer.NewSubConnOptions) (balancer.SubConn, error) +} + +func (c *testClientConn) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) { + return c.newSubConn(addrs, opts) +} + +type testSubConn struct{} + +func (sc *testSubConn) UpdateAddresses(addresses []resolver.Address) {} + +func (sc *testSubConn) Connect() {} + +func TestBaseBalancerStripAttributes(t *testing.T) { + b := (&baseBuilder{}).Build(&testClientConn{ + newSubConn: func(addrs []resolver.Address, _ balancer.NewSubConnOptions) (balancer.SubConn, error) { + for _, addr := range addrs { + if addr.Attributes == nil { + t.Errorf("in NewSubConn, got address %+v with nil attributes, want not nil", addr) + } + } + return &testSubConn{}, nil + }, + }, balancer.BuildOptions{}).(*baseBalancer) + + b.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: resolver.State{ + Addresses: []resolver.Address{ + {Addr: "1.1.1.1", Attributes: &attributes.Attributes{}}, + {Addr: "2.2.2.2", Attributes: &attributes.Attributes{}}, + }, + }, + }) + + for addr := range b.subConns { + if addr.Attributes != nil { + t.Errorf("in b.subConns, got address %+v with nil attributes, want not nil", addr) + } + } +} diff --git a/balancer/roundrobin/roundrobin_test.go b/balancer/roundrobin/roundrobin_test.go index 5a8ba481c..b89cdb4a3 100644 --- a/balancer/roundrobin/roundrobin_test.go +++ b/balancer/roundrobin/roundrobin_test.go @@ -32,6 +32,8 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/internal/grpctest" + imetadata "google.golang.org/grpc/internal/metadata" + "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" @@ -39,6 +41,10 @@ import ( testpb "google.golang.org/grpc/test/grpc_testing" ) +const ( + testMDKey = "test-md" +) + type s struct { grpctest.Tester } @@ -49,9 +55,23 @@ func Test(t *testing.T) { type testServer struct { testpb.UnimplementedTestServiceServer + + testMDChan chan []string +} + +func newTestServer() *testServer { + return &testServer{testMDChan: make(chan []string, 1)} } func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + md, ok := metadata.FromIncomingContext(ctx) + if ok && len(md[testMDKey]) != 0 { + select { + case s.testMDChan <- md[testMDKey]: + case <-ctx.Done(): + return nil, ctx.Err() + } + } return &testpb.Empty{}, nil } @@ -60,8 +80,9 @@ func (s *testServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServ } type test struct { - servers []*grpc.Server - addresses []string + servers []*grpc.Server + serverImpls []*testServer + addresses []string } func (t *test) cleanup() { @@ -85,8 +106,10 @@ func startTestServers(count int) (_ *test, err error) { } s := grpc.NewServer() - testpb.RegisterTestServiceServer(s, &testServer{}) + sImpl := newTestServer() + testpb.RegisterTestServiceServer(s, sImpl) t.servers = append(t.servers, s) + t.serverImpls = append(t.serverImpls, sImpl) t.addresses = append(t.addresses, lis.Addr().String()) go func(s *grpc.Server, l net.Listener) { @@ -473,3 +496,54 @@ func (s) TestAllServersDown(t *testing.T) { } t.Fatalf("Failfast RPCs didn't fail with Unavailable after all servers are stopped") } + +func (s) TestUpdateAddressAttributes(t *testing.T) { + r := manual.NewBuilderWithScheme("whatever") + + test, err := startTestServers(1) + if err != nil { + t.Fatalf("failed to start servers: %v", err) + } + defer test.cleanup() + + cc, err := grpc.Dial(r.Scheme()+":///test.server", grpc.WithInsecure(), grpc.WithResolvers(r), grpc.WithBalancerName(roundrobin.Name)) + if err != nil { + t.Fatalf("failed to dial: %v", err) + } + defer cc.Close() + testc := testpb.NewTestServiceClient(cc) + // The first RPC should fail because there's no address. + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) + defer cancel() + if _, err := testc.EmptyCall(ctx, &testpb.Empty{}); err == nil || status.Code(err) != codes.DeadlineExceeded { + t.Fatalf("EmptyCall() = _, %v, want _, DeadlineExceeded", err) + } + + r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: test.addresses[0]}}}) + // The second RPC should succeed. + if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { + t.Fatalf("EmptyCall() = _, %v, want _, ", err) + } + // The second RPC should not set metadata, so there's no md in the channel. + select { + case md1 := <-test.serverImpls[0].testMDChan: + t.Fatalf("got md: %v, want empty metadata", md1) + case <-time.After(time.Microsecond * 100): + } + + const testMDValue = "test-md-value" + // Update metadata in address. + r.UpdateState(resolver.State{Addresses: []resolver.Address{ + imetadata.Set(resolver.Address{Addr: test.addresses[0]}, metadata.Pairs(testMDKey, testMDValue)), + }}) + // The third RPC should succeed. + if _, err := testc.EmptyCall(context.Background(), &testpb.Empty{}); err != nil { + t.Fatalf("EmptyCall() = _, %v, want _, ", err) + } + + // The third RPC should send metadata with it. + md2 := <-test.serverImpls[0].testMDChan + if len(md2) == 0 || md2[0] != testMDValue { + t.Fatalf("got md: %v, want %v", md2, []string{testMDValue}) + } +} diff --git a/xds/internal/testutils/balancer.go b/xds/internal/testutils/balancer.go index cd81b8fa4..6811e415a 100644 --- a/xds/internal/testutils/balancer.go +++ b/xds/internal/testutils/balancer.go @@ -56,8 +56,8 @@ type TestSubConn struct { id string } -// UpdateAddresses panics. -func (tsc *TestSubConn) UpdateAddresses([]resolver.Address) { panic("not implemented") } +// UpdateAddresses is a no-op. +func (tsc *TestSubConn) UpdateAddresses([]resolver.Address) {} // Connect is a no-op. func (tsc *TestSubConn) Connect() {}