From c949703b4b98fb798b026b4f28b1ff42b3460e8d Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Fri, 26 Feb 2021 08:45:26 -0800 Subject: [PATCH] xds/cdsbalancer: Override UpdateAddresses() (#4227) --- .../balancer/cdsbalancer/cdsbalancer.go | 18 ++++-- .../cdsbalancer/cdsbalancer_security_test.go | 61 +++++++++++++------ xds/internal/testutils/balancer.go | 22 ++++--- 3 files changed, 71 insertions(+), 30 deletions(-) diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer.go b/xds/internal/balancer/cdsbalancer/cdsbalancer.go index 04286d715..e4d349753 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer.go @@ -496,10 +496,12 @@ func (b *cdsBalancer) Close() { b.xdsClient.Close() } -// ccWrapper wraps the balancer.ClientConn that was passed in to the CDS -// balancer during creation and intercepts the NewSubConn() call from the child -// policy. Other methods of the balancer.ClientConn interface are not overridden -// and hence get the original implementation. +// ccWrapper wraps the balancer.ClientConn passed to the CDS balancer at +// creation and intercepts the NewSubConn() and UpdateAddresses() call from the +// child policy to add security configuration required by xDS credentials. +// +// Other methods of the balancer.ClientConn interface are not overridden and +// hence get the original implementation. type ccWrapper struct { balancer.ClientConn @@ -518,3 +520,11 @@ func (ccw *ccWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubC } return ccw.ClientConn.NewSubConn(newAddrs, opts) } + +func (ccw *ccWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) { + newAddrs := make([]resolver.Address, len(addrs)) + for i, addr := range addrs { + newAddrs[i] = xdsinternal.SetHandshakeInfo(addr, ccw.xdsHI) + } + ccw.ClientConn.UpdateAddresses(sc, newAddrs) +} diff --git a/xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go b/xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go index e862f99cd..fee48c262 100644 --- a/xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go +++ b/xds/internal/balancer/cdsbalancer/cdsbalancer_security_test.go @@ -171,34 +171,35 @@ func setupWithXDSCreds(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDS // makeNewSubConn invokes the NewSubConn() call on the balancer.ClientConn // passed to the EDS balancer, and verifies that the CDS balancer forwards the // call appropriately to its parent balancer.ClientConn with or without -// attributes bases on the value of wantAttributes. -func makeNewSubConn(ctx context.Context, edsCC balancer.ClientConn, parentCC *xdstestutils.TestClientConn, wantFallback bool) error { +// attributes bases on the value of wantFallback. +func makeNewSubConn(ctx context.Context, edsCC balancer.ClientConn, parentCC *xdstestutils.TestClientConn, wantFallback bool) (balancer.SubConn, error) { dummyAddr := "foo-address" addrs := []resolver.Address{{Addr: dummyAddr}} - if _, err := edsCC.NewSubConn(addrs, balancer.NewSubConnOptions{}); err != nil { - return fmt.Errorf("NewSubConn(%+v) on parent ClientConn failed: %v", addrs, err) + sc, err := edsCC.NewSubConn(addrs, balancer.NewSubConnOptions{}) + if err != nil { + return nil, fmt.Errorf("NewSubConn(%+v) on parent ClientConn failed: %v", addrs, err) } select { case <-ctx.Done(): - return errors.New("timeout when waiting for new SubConn") + return nil, errors.New("timeout when waiting for new SubConn") case gotAddrs := <-parentCC.NewSubConnAddrsCh: if len(gotAddrs) != 1 { - return fmt.Errorf("NewSubConn expected 1 address, got %d", len(gotAddrs)) + return nil, fmt.Errorf("NewSubConn expected 1 address, got %d", len(gotAddrs)) } if got, want := gotAddrs[0].Addr, addrs[0].Addr; got != want { - return fmt.Errorf("resolver.Address passed to parent ClientConn has address %q, want %q", got, want) + return nil, fmt.Errorf("resolver.Address passed to parent ClientConn has address %q, want %q", got, want) } getHI := internal.GetXDSHandshakeInfoForTesting.(func(attr *attributes.Attributes) *xdsinternal.HandshakeInfo) hi := getHI(gotAddrs[0].Attributes) if hi == nil { - return errors.New("resolver.Address passed to parent ClientConn doesn't contain attributes") + return nil, errors.New("resolver.Address passed to parent ClientConn doesn't contain attributes") } if gotFallback := hi.UseFallbackCreds(); gotFallback != wantFallback { - return fmt.Errorf("resolver.Address HandshakeInfo uses fallback creds? %v, want %v", gotFallback, wantFallback) + return nil, fmt.Errorf("resolver.Address HandshakeInfo uses fallback creds? %v, want %v", gotFallback, wantFallback) } } - return nil + return sc, nil } // TestSecurityConfigWithoutXDSCreds tests the case where xdsCredentials are not @@ -243,7 +244,7 @@ func (s) TestSecurityConfigWithoutXDSCreds(t *testing.T) { // Make a NewSubConn and verify that the HandshakeInfo does not contain any // certificate providers, forcing the credentials implementation to use // fallback creds. - if err := makeNewSubConn(ctx, edsB.parentCC, tcc, true); err != nil { + if _, err := makeNewSubConn(ctx, edsB.parentCC, tcc, true); err != nil { t.Fatal(err) } @@ -299,7 +300,7 @@ func (s) TestNoSecurityConfigWithXDSCreds(t *testing.T) { // Make a NewSubConn and verify that the HandshakeInfo does not contain any // certificate providers, forcing the credentials implementation to use // fallback creds. - if err := makeNewSubConn(ctx, edsB.parentCC, tcc, true); err != nil { + if _, err := makeNewSubConn(ctx, edsB.parentCC, tcc, true); err != nil { t.Fatal(err) } @@ -451,7 +452,7 @@ func (s) TestSecurityConfigUpdate_BadToGood(t *testing.T) { } // Make a NewSubConn and verify that attributes are added. - if err := makeNewSubConn(ctx, edsB.parentCC, tcc, false); err != nil { + if _, err := makeNewSubConn(ctx, edsB.parentCC, tcc, false); err != nil { t.Fatal(err) } } @@ -487,9 +488,31 @@ func (s) TestGoodSecurityConfig(t *testing.T) { } // Make a NewSubConn and verify that attributes are added. - if err := makeNewSubConn(ctx, edsB.parentCC, tcc, false); err != nil { + sc, err := makeNewSubConn(ctx, edsB.parentCC, tcc, false) + if err != nil { t.Fatal(err) } + + // Invoke UpdateAddresses and verify that attributes are added. + dummyAddr := "bar-address" + addrs := []resolver.Address{{Addr: dummyAddr}} + edsB.parentCC.UpdateAddresses(sc, addrs) + select { + case <-ctx.Done(): + t.Fatal("timeout when waiting for addresses to be updated on the subConn") + case gotAddrs := <-tcc.UpdateAddressesAddrsCh: + if len(gotAddrs) != 1 { + t.Fatalf("UpdateAddresses expected 1 address, got %d", len(gotAddrs)) + } + if got, want := gotAddrs[0].Addr, addrs[0].Addr; got != want { + t.Fatalf("resolver.Address passed to parent ClientConn through UpdateAddresses() has address %q, want %q", got, want) + } + getHI := internal.GetXDSHandshakeInfoForTesting.(func(attr *attributes.Attributes) *xdsinternal.HandshakeInfo) + hi := getHI(gotAddrs[0].Attributes) + if hi == nil { + t.Fatal("resolver.Address passed to parent ClientConn through UpdateAddresses() doesn't contain attributes") + } + } } func (s) TestSecurityConfigUpdate_GoodToFallback(t *testing.T) { @@ -518,7 +541,7 @@ func (s) TestSecurityConfigUpdate_GoodToFallback(t *testing.T) { } // Make a NewSubConn and verify that attributes are added. - if err := makeNewSubConn(ctx, edsB.parentCC, tcc, false); err != nil { + if _, err := makeNewSubConn(ctx, edsB.parentCC, tcc, false); err != nil { t.Fatal(err) } @@ -532,7 +555,7 @@ func (s) TestSecurityConfigUpdate_GoodToFallback(t *testing.T) { } // Make a NewSubConn and verify that fallback creds are used. - if err := makeNewSubConn(ctx, edsB.parentCC, tcc, true); err != nil { + if _, err := makeNewSubConn(ctx, edsB.parentCC, tcc, true); err != nil { t.Fatal(err) } } @@ -568,7 +591,7 @@ func (s) TestSecurityConfigUpdate_GoodToBad(t *testing.T) { } // Make a NewSubConn and verify that attributes are added. - if err := makeNewSubConn(ctx, edsB.parentCC, tcc, false); err != nil { + if _, err := makeNewSubConn(ctx, edsB.parentCC, tcc, false); err != nil { t.Fatal(err) } @@ -650,7 +673,7 @@ func (s) TestSecurityConfigUpdate_GoodToGood(t *testing.T) { } // Make a NewSubConn and verify that attributes are added. - if err := makeNewSubConn(ctx, edsB.parentCC, tcc, false); err != nil { + if _, err := makeNewSubConn(ctx, edsB.parentCC, tcc, false); err != nil { t.Fatal(err) } @@ -671,7 +694,7 @@ func (s) TestSecurityConfigUpdate_GoodToGood(t *testing.T) { } // Make a NewSubConn and verify that attributes are added. - if err := makeNewSubConn(ctx, edsB.parentCC, tcc, false); err != nil { + if _, err := makeNewSubConn(ctx, edsB.parentCC, tcc, false); err != nil { t.Fatal(err) } diff --git a/xds/internal/testutils/balancer.go b/xds/internal/testutils/balancer.go index 4973cbc96..dab84a84e 100644 --- a/xds/internal/testutils/balancer.go +++ b/xds/internal/testutils/balancer.go @@ -71,9 +71,10 @@ func (tsc *TestSubConn) String() string { type TestClientConn struct { logger testingLogger - NewSubConnAddrsCh chan []resolver.Address // the last 10 []Address to create subconn. - NewSubConnCh chan balancer.SubConn // the last 10 subconn created. - RemoveSubConnCh chan balancer.SubConn // the last 10 subconn removed. + NewSubConnAddrsCh chan []resolver.Address // the last 10 []Address to create subconn. + NewSubConnCh chan balancer.SubConn // the last 10 subconn created. + RemoveSubConnCh chan balancer.SubConn // the last 10 subconn removed. + UpdateAddressesAddrsCh chan []resolver.Address // last updated address via UpdateAddresses(). NewPickerCh chan balancer.Picker // the last picker updated. NewStateCh chan connectivity.State // the last state. @@ -86,9 +87,10 @@ func NewTestClientConn(t *testing.T) *TestClientConn { return &TestClientConn{ logger: t, - NewSubConnAddrsCh: make(chan []resolver.Address, 10), - NewSubConnCh: make(chan balancer.SubConn, 10), - RemoveSubConnCh: make(chan balancer.SubConn, 10), + NewSubConnAddrsCh: make(chan []resolver.Address, 10), + NewSubConnCh: make(chan balancer.SubConn, 10), + RemoveSubConnCh: make(chan balancer.SubConn, 10), + UpdateAddressesAddrsCh: make(chan []resolver.Address, 1), NewPickerCh: make(chan balancer.Picker, 1), NewStateCh: make(chan connectivity.State, 1), @@ -124,7 +126,13 @@ func (tcc *TestClientConn) RemoveSubConn(sc balancer.SubConn) { } // UpdateAddresses updates the addresses on the SubConn. -func (tcc *TestClientConn) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {} +func (tcc *TestClientConn) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) { + tcc.logger.Logf("testClientConn: UpdateAddresses(%v, %+v)", sc, addrs) + select { + case tcc.UpdateAddressesAddrsCh <- addrs: + default: + } +} // UpdateState updates connectivity state and picker. func (tcc *TestClientConn) UpdateState(bs balancer.State) {