mirror of https://github.com/grpc/grpc-go.git
xds/cdsbalancer: Override UpdateAddresses() (#4227)
This commit is contained in:
parent
9dfe677337
commit
c949703b4b
|
|
@ -496,10 +496,12 @@ func (b *cdsBalancer) Close() {
|
||||||
b.xdsClient.Close()
|
b.xdsClient.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
// ccWrapper wraps the balancer.ClientConn that was passed in to the CDS
|
// ccWrapper wraps the balancer.ClientConn passed to the CDS balancer at
|
||||||
// balancer during creation and intercepts the NewSubConn() call from the child
|
// creation and intercepts the NewSubConn() and UpdateAddresses() call from the
|
||||||
// policy. Other methods of the balancer.ClientConn interface are not overridden
|
// child policy to add security configuration required by xDS credentials.
|
||||||
// and hence get the original implementation.
|
//
|
||||||
|
// Other methods of the balancer.ClientConn interface are not overridden and
|
||||||
|
// hence get the original implementation.
|
||||||
type ccWrapper struct {
|
type ccWrapper struct {
|
||||||
balancer.ClientConn
|
balancer.ClientConn
|
||||||
|
|
||||||
|
|
@ -518,3 +520,11 @@ func (ccw *ccWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubC
|
||||||
}
|
}
|
||||||
return ccw.ClientConn.NewSubConn(newAddrs, opts)
|
return ccw.ClientConn.NewSubConn(newAddrs, opts)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (ccw *ccWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
|
||||||
|
newAddrs := make([]resolver.Address, len(addrs))
|
||||||
|
for i, addr := range addrs {
|
||||||
|
newAddrs[i] = xdsinternal.SetHandshakeInfo(addr, ccw.xdsHI)
|
||||||
|
}
|
||||||
|
ccw.ClientConn.UpdateAddresses(sc, newAddrs)
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -171,34 +171,35 @@ func setupWithXDSCreds(t *testing.T) (*fakeclient.Client, *cdsBalancer, *testEDS
|
||||||
// makeNewSubConn invokes the NewSubConn() call on the balancer.ClientConn
|
// makeNewSubConn invokes the NewSubConn() call on the balancer.ClientConn
|
||||||
// passed to the EDS balancer, and verifies that the CDS balancer forwards the
|
// passed to the EDS balancer, and verifies that the CDS balancer forwards the
|
||||||
// call appropriately to its parent balancer.ClientConn with or without
|
// call appropriately to its parent balancer.ClientConn with or without
|
||||||
// attributes bases on the value of wantAttributes.
|
// attributes bases on the value of wantFallback.
|
||||||
func makeNewSubConn(ctx context.Context, edsCC balancer.ClientConn, parentCC *xdstestutils.TestClientConn, wantFallback bool) error {
|
func makeNewSubConn(ctx context.Context, edsCC balancer.ClientConn, parentCC *xdstestutils.TestClientConn, wantFallback bool) (balancer.SubConn, error) {
|
||||||
dummyAddr := "foo-address"
|
dummyAddr := "foo-address"
|
||||||
addrs := []resolver.Address{{Addr: dummyAddr}}
|
addrs := []resolver.Address{{Addr: dummyAddr}}
|
||||||
if _, err := edsCC.NewSubConn(addrs, balancer.NewSubConnOptions{}); err != nil {
|
sc, err := edsCC.NewSubConn(addrs, balancer.NewSubConnOptions{})
|
||||||
return fmt.Errorf("NewSubConn(%+v) on parent ClientConn failed: %v", addrs, err)
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("NewSubConn(%+v) on parent ClientConn failed: %v", addrs, err)
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return errors.New("timeout when waiting for new SubConn")
|
return nil, errors.New("timeout when waiting for new SubConn")
|
||||||
case gotAddrs := <-parentCC.NewSubConnAddrsCh:
|
case gotAddrs := <-parentCC.NewSubConnAddrsCh:
|
||||||
if len(gotAddrs) != 1 {
|
if len(gotAddrs) != 1 {
|
||||||
return fmt.Errorf("NewSubConn expected 1 address, got %d", len(gotAddrs))
|
return nil, fmt.Errorf("NewSubConn expected 1 address, got %d", len(gotAddrs))
|
||||||
}
|
}
|
||||||
if got, want := gotAddrs[0].Addr, addrs[0].Addr; got != want {
|
if got, want := gotAddrs[0].Addr, addrs[0].Addr; got != want {
|
||||||
return fmt.Errorf("resolver.Address passed to parent ClientConn has address %q, want %q", got, want)
|
return nil, fmt.Errorf("resolver.Address passed to parent ClientConn has address %q, want %q", got, want)
|
||||||
}
|
}
|
||||||
getHI := internal.GetXDSHandshakeInfoForTesting.(func(attr *attributes.Attributes) *xdsinternal.HandshakeInfo)
|
getHI := internal.GetXDSHandshakeInfoForTesting.(func(attr *attributes.Attributes) *xdsinternal.HandshakeInfo)
|
||||||
hi := getHI(gotAddrs[0].Attributes)
|
hi := getHI(gotAddrs[0].Attributes)
|
||||||
if hi == nil {
|
if hi == nil {
|
||||||
return errors.New("resolver.Address passed to parent ClientConn doesn't contain attributes")
|
return nil, errors.New("resolver.Address passed to parent ClientConn doesn't contain attributes")
|
||||||
}
|
}
|
||||||
if gotFallback := hi.UseFallbackCreds(); gotFallback != wantFallback {
|
if gotFallback := hi.UseFallbackCreds(); gotFallback != wantFallback {
|
||||||
return fmt.Errorf("resolver.Address HandshakeInfo uses fallback creds? %v, want %v", gotFallback, wantFallback)
|
return nil, fmt.Errorf("resolver.Address HandshakeInfo uses fallback creds? %v, want %v", gotFallback, wantFallback)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return nil
|
return sc, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// TestSecurityConfigWithoutXDSCreds tests the case where xdsCredentials are not
|
// TestSecurityConfigWithoutXDSCreds tests the case where xdsCredentials are not
|
||||||
|
|
@ -243,7 +244,7 @@ func (s) TestSecurityConfigWithoutXDSCreds(t *testing.T) {
|
||||||
// Make a NewSubConn and verify that the HandshakeInfo does not contain any
|
// Make a NewSubConn and verify that the HandshakeInfo does not contain any
|
||||||
// certificate providers, forcing the credentials implementation to use
|
// certificate providers, forcing the credentials implementation to use
|
||||||
// fallback creds.
|
// fallback creds.
|
||||||
if err := makeNewSubConn(ctx, edsB.parentCC, tcc, true); err != nil {
|
if _, err := makeNewSubConn(ctx, edsB.parentCC, tcc, true); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -299,7 +300,7 @@ func (s) TestNoSecurityConfigWithXDSCreds(t *testing.T) {
|
||||||
// Make a NewSubConn and verify that the HandshakeInfo does not contain any
|
// Make a NewSubConn and verify that the HandshakeInfo does not contain any
|
||||||
// certificate providers, forcing the credentials implementation to use
|
// certificate providers, forcing the credentials implementation to use
|
||||||
// fallback creds.
|
// fallback creds.
|
||||||
if err := makeNewSubConn(ctx, edsB.parentCC, tcc, true); err != nil {
|
if _, err := makeNewSubConn(ctx, edsB.parentCC, tcc, true); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -451,7 +452,7 @@ func (s) TestSecurityConfigUpdate_BadToGood(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make a NewSubConn and verify that attributes are added.
|
// Make a NewSubConn and verify that attributes are added.
|
||||||
if err := makeNewSubConn(ctx, edsB.parentCC, tcc, false); err != nil {
|
if _, err := makeNewSubConn(ctx, edsB.parentCC, tcc, false); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -487,9 +488,31 @@ func (s) TestGoodSecurityConfig(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make a NewSubConn and verify that attributes are added.
|
// Make a NewSubConn and verify that attributes are added.
|
||||||
if err := makeNewSubConn(ctx, edsB.parentCC, tcc, false); err != nil {
|
sc, err := makeNewSubConn(ctx, edsB.parentCC, tcc, false)
|
||||||
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Invoke UpdateAddresses and verify that attributes are added.
|
||||||
|
dummyAddr := "bar-address"
|
||||||
|
addrs := []resolver.Address{{Addr: dummyAddr}}
|
||||||
|
edsB.parentCC.UpdateAddresses(sc, addrs)
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
t.Fatal("timeout when waiting for addresses to be updated on the subConn")
|
||||||
|
case gotAddrs := <-tcc.UpdateAddressesAddrsCh:
|
||||||
|
if len(gotAddrs) != 1 {
|
||||||
|
t.Fatalf("UpdateAddresses expected 1 address, got %d", len(gotAddrs))
|
||||||
|
}
|
||||||
|
if got, want := gotAddrs[0].Addr, addrs[0].Addr; got != want {
|
||||||
|
t.Fatalf("resolver.Address passed to parent ClientConn through UpdateAddresses() has address %q, want %q", got, want)
|
||||||
|
}
|
||||||
|
getHI := internal.GetXDSHandshakeInfoForTesting.(func(attr *attributes.Attributes) *xdsinternal.HandshakeInfo)
|
||||||
|
hi := getHI(gotAddrs[0].Attributes)
|
||||||
|
if hi == nil {
|
||||||
|
t.Fatal("resolver.Address passed to parent ClientConn through UpdateAddresses() doesn't contain attributes")
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s) TestSecurityConfigUpdate_GoodToFallback(t *testing.T) {
|
func (s) TestSecurityConfigUpdate_GoodToFallback(t *testing.T) {
|
||||||
|
|
@ -518,7 +541,7 @@ func (s) TestSecurityConfigUpdate_GoodToFallback(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make a NewSubConn and verify that attributes are added.
|
// Make a NewSubConn and verify that attributes are added.
|
||||||
if err := makeNewSubConn(ctx, edsB.parentCC, tcc, false); err != nil {
|
if _, err := makeNewSubConn(ctx, edsB.parentCC, tcc, false); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -532,7 +555,7 @@ func (s) TestSecurityConfigUpdate_GoodToFallback(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make a NewSubConn and verify that fallback creds are used.
|
// Make a NewSubConn and verify that fallback creds are used.
|
||||||
if err := makeNewSubConn(ctx, edsB.parentCC, tcc, true); err != nil {
|
if _, err := makeNewSubConn(ctx, edsB.parentCC, tcc, true); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -568,7 +591,7 @@ func (s) TestSecurityConfigUpdate_GoodToBad(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make a NewSubConn and verify that attributes are added.
|
// Make a NewSubConn and verify that attributes are added.
|
||||||
if err := makeNewSubConn(ctx, edsB.parentCC, tcc, false); err != nil {
|
if _, err := makeNewSubConn(ctx, edsB.parentCC, tcc, false); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -650,7 +673,7 @@ func (s) TestSecurityConfigUpdate_GoodToGood(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make a NewSubConn and verify that attributes are added.
|
// Make a NewSubConn and verify that attributes are added.
|
||||||
if err := makeNewSubConn(ctx, edsB.parentCC, tcc, false); err != nil {
|
if _, err := makeNewSubConn(ctx, edsB.parentCC, tcc, false); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -671,7 +694,7 @@ func (s) TestSecurityConfigUpdate_GoodToGood(t *testing.T) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make a NewSubConn and verify that attributes are added.
|
// Make a NewSubConn and verify that attributes are added.
|
||||||
if err := makeNewSubConn(ctx, edsB.parentCC, tcc, false); err != nil {
|
if _, err := makeNewSubConn(ctx, edsB.parentCC, tcc, false); err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -71,9 +71,10 @@ func (tsc *TestSubConn) String() string {
|
||||||
type TestClientConn struct {
|
type TestClientConn struct {
|
||||||
logger testingLogger
|
logger testingLogger
|
||||||
|
|
||||||
NewSubConnAddrsCh chan []resolver.Address // the last 10 []Address to create subconn.
|
NewSubConnAddrsCh chan []resolver.Address // the last 10 []Address to create subconn.
|
||||||
NewSubConnCh chan balancer.SubConn // the last 10 subconn created.
|
NewSubConnCh chan balancer.SubConn // the last 10 subconn created.
|
||||||
RemoveSubConnCh chan balancer.SubConn // the last 10 subconn removed.
|
RemoveSubConnCh chan balancer.SubConn // the last 10 subconn removed.
|
||||||
|
UpdateAddressesAddrsCh chan []resolver.Address // last updated address via UpdateAddresses().
|
||||||
|
|
||||||
NewPickerCh chan balancer.Picker // the last picker updated.
|
NewPickerCh chan balancer.Picker // the last picker updated.
|
||||||
NewStateCh chan connectivity.State // the last state.
|
NewStateCh chan connectivity.State // the last state.
|
||||||
|
|
@ -86,9 +87,10 @@ func NewTestClientConn(t *testing.T) *TestClientConn {
|
||||||
return &TestClientConn{
|
return &TestClientConn{
|
||||||
logger: t,
|
logger: t,
|
||||||
|
|
||||||
NewSubConnAddrsCh: make(chan []resolver.Address, 10),
|
NewSubConnAddrsCh: make(chan []resolver.Address, 10),
|
||||||
NewSubConnCh: make(chan balancer.SubConn, 10),
|
NewSubConnCh: make(chan balancer.SubConn, 10),
|
||||||
RemoveSubConnCh: make(chan balancer.SubConn, 10),
|
RemoveSubConnCh: make(chan balancer.SubConn, 10),
|
||||||
|
UpdateAddressesAddrsCh: make(chan []resolver.Address, 1),
|
||||||
|
|
||||||
NewPickerCh: make(chan balancer.Picker, 1),
|
NewPickerCh: make(chan balancer.Picker, 1),
|
||||||
NewStateCh: make(chan connectivity.State, 1),
|
NewStateCh: make(chan connectivity.State, 1),
|
||||||
|
|
@ -124,7 +126,13 @@ func (tcc *TestClientConn) RemoveSubConn(sc balancer.SubConn) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// UpdateAddresses updates the addresses on the SubConn.
|
// UpdateAddresses updates the addresses on the SubConn.
|
||||||
func (tcc *TestClientConn) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {}
|
func (tcc *TestClientConn) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
|
||||||
|
tcc.logger.Logf("testClientConn: UpdateAddresses(%v, %+v)", sc, addrs)
|
||||||
|
select {
|
||||||
|
case tcc.UpdateAddressesAddrsCh <- addrs:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// UpdateState updates connectivity state and picker.
|
// UpdateState updates connectivity state and picker.
|
||||||
func (tcc *TestClientConn) UpdateState(bs balancer.State) {
|
func (tcc *TestClientConn) UpdateState(bs balancer.State) {
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue