grpc: Add endpoints in resolverWrapper.NewAddresses (#8149)

This commit is contained in:
Arjan Singh Bal 2025-03-06 22:55:15 +05:30 committed by GitHub
parent f49c747db7
commit 5199327135
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 69 additions and 7 deletions

View File

@ -144,6 +144,58 @@ func (s) TestResolverAddressesToEndpoints(t *testing.T) {
}
}
// Test ensures one Endpoint is created for each entry in
// resolver.State.Addresses automatically. The test calls the deprecated
// NewAddresses API to send a list of addresses to the channel.
func (s) TestResolverAddressesToEndpointsUsingNewAddresses(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
const scheme = "testresolveraddressestoendpoints"
r := manual.NewBuilderWithScheme(scheme)
stateCh := make(chan balancer.ClientConnState, 1)
bf := stub.BalancerFuncs{
UpdateClientConnState: func(_ *stub.BalancerData, ccs balancer.ClientConnState) error {
stateCh <- ccs
return nil
},
}
balancerName := "stub-balancer-" + scheme
stub.Register(balancerName, bf)
a1 := attributes.New("x", "y")
a2 := attributes.New("a", "b")
addrs := []resolver.Address{
{Addr: "addr1", BalancerAttributes: a1},
{Addr: "addr2", BalancerAttributes: a2},
}
cc, err := NewClient(r.Scheme()+":///",
WithTransportCredentials(insecure.NewCredentials()),
WithResolvers(r),
WithDefaultServiceConfig(fmt.Sprintf(`{"loadBalancingConfig": [{"%s":{}}]}`, balancerName)))
if err != nil {
t.Fatalf("grpc.NewClient() failed: %v", err)
}
cc.Connect()
defer cc.Close()
r.CC.NewAddress(addrs)
select {
case got := <-stateCh:
want := []resolver.Endpoint{
{Addresses: []resolver.Address{{Addr: "addr1"}}, Attributes: a1},
{Addresses: []resolver.Address{{Addr: "addr2"}}, Attributes: a2},
}
if diff := cmp.Diff(got.ResolverState.Endpoints, want); diff != "" {
t.Errorf("Did not receive expected endpoints. Diff (-got +want):\n%v", diff)
}
case <-ctx.Done():
t.Fatalf("timed out waiting for endpoints")
}
}
// Test ensures that there is no panic if the attributes within
// resolver.State.Addresses contains a typed-nil value.
func (s) TestResolverAddressesWithTypedNilAttribute(t *testing.T) {

View File

@ -134,12 +134,7 @@ func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
return nil
}
if s.Endpoints == nil {
s.Endpoints = make([]resolver.Endpoint, 0, len(s.Addresses))
for _, a := range s.Addresses {
ep := resolver.Endpoint{Addresses: []resolver.Address{a}, Attributes: a.BalancerAttributes}
ep.Addresses[0].BalancerAttributes = nil
s.Endpoints = append(s.Endpoints, ep)
}
s.Endpoints = addressesToEndpoints(s.Addresses)
}
ccr.addChannelzTraceEvent(s)
ccr.curState = s
@ -172,7 +167,11 @@ func (ccr *ccResolverWrapper) NewAddress(addrs []resolver.Address) {
ccr.cc.mu.Unlock()
return
}
s := resolver.State{Addresses: addrs, ServiceConfig: ccr.curState.ServiceConfig}
s := resolver.State{
Addresses: addrs,
ServiceConfig: ccr.curState.ServiceConfig,
Endpoints: addressesToEndpoints(addrs),
}
ccr.addChannelzTraceEvent(s)
ccr.curState = s
ccr.mu.Unlock()
@ -210,3 +209,13 @@ func (ccr *ccResolverWrapper) addChannelzTraceEvent(s resolver.State) {
}
channelz.Infof(logger, ccr.cc.channelz, "Resolver state updated: %s (%v)", pretty.ToJSON(s), strings.Join(updates, "; "))
}
func addressesToEndpoints(addrs []resolver.Address) []resolver.Endpoint {
endpoints := make([]resolver.Endpoint, 0, len(addrs))
for _, a := range addrs {
ep := resolver.Endpoint{Addresses: []resolver.Address{a}, Attributes: a.BalancerAttributes}
ep.Addresses[0].BalancerAttributes = nil
endpoints = append(endpoints, ep)
}
return endpoints
}

View File

@ -174,6 +174,7 @@ CredsBundle is deprecated:
GetMetadata is deprecated:
internal.Logger is deprecated:
Metadata is deprecated: use Attributes instead.
NewAddress is deprecated:
NewSubConn is deprecated:
OverrideServerName is deprecated:
RemoveSubConn is deprecated: