Revert "dns: stop polling for updates; use UpdateState API" (#3213)

This reverts commit e5e980f276.
This commit is contained in:
Menghan Li 2019-11-26 16:07:43 -08:00 committed by GitHub
parent 157dc9f3e4
commit 9dc72d1df0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 259 additions and 155 deletions

View File

@ -32,10 +32,11 @@ import (
"sync"
"time"
"google.golang.org/grpc/backoff"
"google.golang.org/grpc/grpclog"
internalbackoff "google.golang.org/grpc/internal/backoff"
"google.golang.org/grpc/internal/grpcrand"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)
// EnableSRVLookups controls whether the DNS resolver attempts to fetch gRPCLB
@ -48,6 +49,7 @@ func init() {
const (
defaultPort = "443"
defaultFreq = time.Minute * 30
defaultDNSSvrPort = "53"
golang = "GO"
// txtPrefix is the prefix string to be prepended to the host name for txt record lookup.
@ -97,10 +99,13 @@ var customAuthorityResolver = func(authority string) (netResolver, error) {
// NewBuilder creates a dnsBuilder which is used to factory DNS resolvers.
func NewBuilder() resolver.Builder {
return &dnsBuilder{}
return &dnsBuilder{minFreq: defaultFreq}
}
type dnsBuilder struct{}
type dnsBuilder struct {
// minimum frequency of polling the DNS server.
minFreq time.Duration
}
// Build creates and starts a DNS resolver that watches the name resolution of the target.
func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
@ -110,20 +115,33 @@ func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts
}
// IP address.
if ipAddr, ok := formatIP(host); ok {
addr := []resolver.Address{{Addr: ipAddr + ":" + port}}
cc.UpdateState(resolver.State{Addresses: addr})
return deadResolver{}, nil
if net.ParseIP(host) != nil {
host, _ = formatIP(host)
addr := []resolver.Address{{Addr: host + ":" + port}}
i := &ipResolver{
cc: cc,
ip: addr,
rn: make(chan struct{}, 1),
q: make(chan struct{}),
}
cc.NewAddress(addr)
go i.watcher()
return i, nil
}
// DNS address (non-IP).
ctx, cancel := context.WithCancel(context.Background())
bc := backoff.DefaultConfig
bc.MaxDelay = b.minFreq
d := &dnsResolver{
freq: b.minFreq,
backoff: internalbackoff.Exponential{Config: bc},
host: host,
port: port,
ctx: ctx,
cancel: cancel,
cc: cc,
t: time.NewTimer(0),
rn: make(chan struct{}, 1),
disableServiceConfig: opts.DisableServiceConfig,
}
@ -139,7 +157,6 @@ func (b *dnsBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts
d.wg.Add(1)
go d.watcher()
d.ResolveNow(resolver.ResolveNowOptions{})
return d, nil
}
@ -154,23 +171,53 @@ type netResolver interface {
LookupTXT(ctx context.Context, name string) (txts []string, err error)
}
// deadResolver is a resolver that does nothing.
type deadResolver struct{}
// ipResolver watches for the name resolution update for an IP address.
type ipResolver struct {
cc resolver.ClientConn
ip []resolver.Address
// rn channel is used by ResolveNow() to force an immediate resolution of the target.
rn chan struct{}
q chan struct{}
}
func (deadResolver) ResolveNow(resolver.ResolveNowOptions) {}
// ResolveNow resend the address it stores, no resolution is needed.
func (i *ipResolver) ResolveNow(opt resolver.ResolveNowOptions) {
select {
case i.rn <- struct{}{}:
default:
}
}
func (deadResolver) Close() {}
// Close closes the ipResolver.
func (i *ipResolver) Close() {
close(i.q)
}
func (i *ipResolver) watcher() {
for {
select {
case <-i.rn:
i.cc.NewAddress(i.ip)
case <-i.q:
return
}
}
}
// dnsResolver watches for the name resolution update for a non-IP target.
type dnsResolver struct {
host string
port string
resolver netResolver
ctx context.Context
cancel context.CancelFunc
cc resolver.ClientConn
freq time.Duration
backoff internalbackoff.Exponential
retryCount int
host string
port string
resolver netResolver
ctx context.Context
cancel context.CancelFunc
cc resolver.ClientConn
// rn channel is used by ResolveNow() to force an immediate resolution of the target.
rn chan struct{}
t *time.Timer
// wg is used to enforce Close() to return after the watcher() goroutine has finished.
// Otherwise, data race will be possible. [Race Example] in dns_resolver_test we
// replace the real lookup functions with mocked ones to facilitate testing.
@ -182,7 +229,7 @@ type dnsResolver struct {
}
// ResolveNow invoke an immediate resolution of the target that this dnsResolver watches.
func (d *dnsResolver) ResolveNow(resolver.ResolveNowOptions) {
func (d *dnsResolver) ResolveNow(opt resolver.ResolveNowOptions) {
select {
case d.rn <- struct{}{}:
default:
@ -193,6 +240,7 @@ func (d *dnsResolver) ResolveNow(resolver.ResolveNowOptions) {
func (d *dnsResolver) Close() {
d.cancel()
d.wg.Wait()
d.t.Stop()
}
func (d *dnsResolver) watcher() {
@ -201,11 +249,29 @@ func (d *dnsResolver) watcher() {
select {
case <-d.ctx.Done():
return
case <-d.t.C:
case <-d.rn:
if !d.t.Stop() {
// Before resetting a timer, it should be stopped to prevent racing with
// reads on it's channel.
<-d.t.C
}
}
state := d.lookup()
d.cc.UpdateState(*state)
result, sc := d.lookup()
// Next lookup should happen within an interval defined by d.freq. It may be
// more often due to exponential retry on empty address list.
if len(result) == 0 {
d.retryCount++
d.t.Reset(d.backoff.Backoff(d.retryCount))
} else {
d.retryCount = 0
d.t.Reset(d.freq)
}
if sc != "" { // We get empty string when disabled or the TXT lookup failed.
d.cc.NewServiceConfig(sc)
}
d.cc.NewAddress(result)
// Sleep to prevent excessive re-resolutions. Incoming resolution requests
// will be queued in d.rn.
@ -248,12 +314,11 @@ func (d *dnsResolver) lookupSRV() []resolver.Address {
return newAddrs
}
func (d *dnsResolver) lookupTXT() *serviceconfig.ParseResult {
func (d *dnsResolver) lookupTXT() string {
ss, err := d.resolver.LookupTXT(d.ctx, txtPrefix+d.host)
if err != nil {
err = fmt.Errorf("error from DNS TXT record lookup: %v", err)
grpclog.Infoln("grpc:", err)
return &serviceconfig.ParseResult{Err: err}
grpclog.Infof("grpc: failed dns TXT record lookup due to %v.\n", err)
return ""
}
var res string
for _, s := range ss {
@ -262,12 +327,10 @@ func (d *dnsResolver) lookupTXT() *serviceconfig.ParseResult {
// TXT record must have "grpc_config=" attribute in order to be used as service config.
if !strings.HasPrefix(res, txtAttribute) {
grpclog.Warningf("grpc: DNS TXT record %v missing %v attribute", res, txtAttribute)
// This is not an error; it is the equivalent of not having a service config.
return nil
grpclog.Warningf("grpc: TXT record %v missing %v attribute", res, txtAttribute)
return ""
}
sc := canaryingSC(strings.TrimPrefix(res, txtAttribute))
return d.cc.ParseServiceConfig(sc)
return strings.TrimPrefix(res, txtAttribute)
}
func (d *dnsResolver) lookupHost() []resolver.Address {
@ -289,15 +352,15 @@ func (d *dnsResolver) lookupHost() []resolver.Address {
return newAddrs
}
func (d *dnsResolver) lookup() *resolver.State {
srv := d.lookupSRV()
state := &resolver.State{
Addresses: append(d.lookupHost(), srv...),
func (d *dnsResolver) lookup() ([]resolver.Address, string) {
newAddrs := d.lookupSRV()
// Support fallback to non-balancer address.
newAddrs = append(newAddrs, d.lookupHost()...)
if d.disableServiceConfig {
return newAddrs, ""
}
if !d.disableServiceConfig {
state.ServiceConfig = d.lookupTXT()
}
return state
sc := d.lookupTXT()
return newAddrs, canaryingSC(sc)
}
// formatIP returns ok = false if addr is not a valid textual representation of an IP address.

View File

@ -35,11 +35,14 @@ import (
)
func TestMain(m *testing.M) {
// Set a non-zero duration only for tests which are actually testing that
// feature.
replaceDNSResRate(time.Duration(0)) // No nead to clean up since we os.Exit
replaceNetFunc(nil) // No nead to clean up since we os.Exit
// Set a valid duration for the re-resolution rate only for tests which are
// actually testing that feature.
dc := replaceDNSResRate(time.Duration(0))
defer dc()
cleanup := replaceNetFunc(nil)
code := m.Run()
cleanup()
os.Exit(code)
}
@ -48,43 +51,47 @@ const (
)
type testClientConn struct {
resolver.ClientConn // For unimplemented functions
target string
m1 sync.Mutex
state resolver.State
updateStateCalls int
target string
m1 sync.Mutex
addrs []resolver.Address
a int // how many times NewAddress() has been called
m2 sync.Mutex
sc string
s int
}
func (t *testClientConn) UpdateState(s resolver.State) {
panic("unused")
}
func (t *testClientConn) NewAddress(addresses []resolver.Address) {
t.m1.Lock()
defer t.m1.Unlock()
t.state = s
t.updateStateCalls++
t.addrs = addresses
t.a++
}
func (t *testClientConn) getState() (resolver.State, int) {
func (t *testClientConn) getAddress() ([]resolver.Address, int) {
t.m1.Lock()
defer t.m1.Unlock()
return t.state, t.updateStateCalls
return t.addrs, t.a
}
func scFromState(s resolver.State) string {
if s.ServiceConfig != nil {
if s.ServiceConfig.Err != nil {
return ""
}
return s.ServiceConfig.Config.(unparsedServiceConfig).config
}
return ""
func (t *testClientConn) NewServiceConfig(serviceConfig string) {
t.m2.Lock()
defer t.m2.Unlock()
t.sc = serviceConfig
t.s++
}
type unparsedServiceConfig struct {
serviceconfig.Config
config string
func (t *testClientConn) getSc() (string, int) {
t.m2.Lock()
defer t.m2.Unlock()
return t.sc, t.s
}
func (t *testClientConn) ParseServiceConfig(s string) *serviceconfig.ParseResult {
return &serviceconfig.ParseResult{Config: unparsedServiceConfig{config: s}}
func (t *testClientConn) ParseServiceConfig(string) *serviceconfig.ParseResult {
panic("not implemented")
}
func (t *testClientConn) ReportError(error) {
@ -691,23 +698,33 @@ func testDNSResolver(t *testing.T) {
if err != nil {
t.Fatalf("%v\n", err)
}
var state resolver.State
var addrs []resolver.Address
var cnt int
for i := 0; i < 2000; i++ {
state, cnt = cc.getState()
for {
addrs, cnt = cc.getAddress()
if cnt > 0 {
break
}
time.Sleep(time.Millisecond)
}
if cnt == 0 {
t.Fatalf("UpdateState not called after 2s; aborting")
var sc string
if a.scWant != "" {
for {
sc, cnt = cc.getSc()
if cnt > 0 {
break
}
time.Sleep(time.Millisecond)
}
} else {
// A new service config should never be produced; call getSc once
// just in case.
sc, _ = cc.getSc()
}
if !reflect.DeepEqual(a.addrWant, state.Addresses) {
t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", a.target, state.Addresses, a.addrWant)
if !reflect.DeepEqual(a.addrWant, addrs) {
t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", a.target, addrs, a.addrWant)
}
sc := scFromState(state)
if a.scWant != sc {
if !reflect.DeepEqual(a.scWant, sc) {
t.Errorf("Resolved service config of target: %q = %+v, want %+v\n", a.target, sc, a.scWant)
}
r.Close()
@ -737,7 +754,7 @@ func testDNSResolverWithSRV(t *testing.T) {
},
{
"srv.ipv4.single.fake",
[]resolver.Address{{Addr: "2.4.6.8" + colonDefaultPort}, {Addr: "1.2.3.4:1234", Type: resolver.GRPCLB, ServerName: "ipv4.single.fake"}},
[]resolver.Address{{Addr: "1.2.3.4:1234", Type: resolver.GRPCLB, ServerName: "ipv4.single.fake"}, {Addr: "2.4.6.8" + colonDefaultPort}},
generateSC("srv.ipv4.single.fake"),
},
{
@ -772,26 +789,36 @@ func testDNSResolverWithSRV(t *testing.T) {
if err != nil {
t.Fatalf("%v\n", err)
}
defer r.Close()
var state resolver.State
var addrs []resolver.Address
var cnt int
for i := 0; i < 2000; i++ {
state, cnt = cc.getState()
for {
addrs, cnt = cc.getAddress()
if cnt > 0 {
break
}
time.Sleep(time.Millisecond)
}
if cnt == 0 {
t.Fatalf("UpdateState not called after 2s; aborting")
var sc string
if a.scWant != "" {
for {
sc, cnt = cc.getSc()
if cnt > 0 {
break
}
time.Sleep(time.Millisecond)
}
} else {
// A new service config should never be produced; call getSc once
// just in case.
sc, _ = cc.getSc()
}
if !reflect.DeepEqual(a.addrWant, state.Addresses) {
t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", a.target, state.Addresses, a.addrWant)
if !reflect.DeepEqual(a.addrWant, addrs) {
t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", a.target, addrs, a.addrWant)
}
sc := scFromState(state)
if a.scWant != sc {
if !reflect.DeepEqual(a.scWant, sc) {
t.Errorf("Resolved service config of target: %q = %+v, want %+v\n", a.target, sc, a.scWant)
}
r.Close()
}
}
@ -840,47 +867,55 @@ func testDNSResolveNow(t *testing.T) {
if err != nil {
t.Fatalf("%v\n", err)
}
defer r.Close()
var state resolver.State
var addrs []resolver.Address
var cnt int
for i := 0; i < 2000; i++ {
state, cnt = cc.getState()
for {
addrs, cnt = cc.getAddress()
if cnt > 0 {
break
}
time.Sleep(time.Millisecond)
}
if cnt == 0 {
t.Fatalf("UpdateState not called after 2s; aborting. state=%v", state)
}
if !reflect.DeepEqual(a.addrWant, state.Addresses) {
t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", a.target, state.Addresses, a.addrWant)
}
sc := scFromState(state)
if a.scWant != sc {
t.Errorf("Resolved service config of target: %q = %+v, want %+v\n", a.target, sc, a.scWant)
}
revertTbl := mutateTbl(a.target)
r.ResolveNow(resolver.ResolveNowOptions{})
for i := 0; i < 2000; i++ {
state, cnt = cc.getState()
if cnt == 2 {
var sc string
for {
sc, cnt = cc.getSc()
if cnt > 0 {
break
}
time.Sleep(time.Millisecond)
}
if cnt != 2 {
t.Fatalf("UpdateState not called after 2s; aborting. state=%v", state)
if !reflect.DeepEqual(a.addrWant, addrs) {
t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", a.target, addrs, a.addrWant)
}
sc = scFromState(state)
if !reflect.DeepEqual(a.addrNext, state.Addresses) {
t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", a.target, state.Addresses, a.addrNext)
if !reflect.DeepEqual(a.scWant, sc) {
t.Errorf("Resolved service config of target: %q = %+v, want %+v\n", a.target, sc, a.scWant)
}
if a.scNext != sc {
revertTbl := mutateTbl(a.target)
r.ResolveNow(resolver.ResolveNowOptions{})
for i := 0; i < 1000; i++ {
addrs, cnt = cc.getAddress()
// Break if the address list changes or enough redundant updates happen.
if !reflect.DeepEqual(addrs, a.addrWant) || cnt > 10 {
break
}
time.Sleep(time.Millisecond)
}
for i := 0; i < 1000; i++ {
sc, cnt = cc.getSc()
// Break if the service config changes or enough redundant updates happen.
if !reflect.DeepEqual(sc, a.scWant) || cnt > 10 {
break
}
time.Sleep(time.Millisecond)
}
if !reflect.DeepEqual(a.addrNext, addrs) {
t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", a.target, addrs, a.addrNext)
}
if !reflect.DeepEqual(a.scNext, sc) {
t.Errorf("Resolved service config of target: %q = %+v, want %+v\n", a.target, sc, a.scNext)
}
revertTbl()
r.Close()
}
}
@ -911,26 +946,29 @@ func testIPResolver(t *testing.T) {
if err != nil {
t.Fatalf("%v\n", err)
}
var state resolver.State
var addrs []resolver.Address
var cnt int
for {
state, cnt = cc.getState()
addrs, cnt = cc.getAddress()
if cnt > 0 {
break
}
time.Sleep(time.Millisecond)
}
if !reflect.DeepEqual(v.want, state.Addresses) {
t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", v.target, state.Addresses, v.want)
if !reflect.DeepEqual(v.want, addrs) {
t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", v.target, addrs, v.want)
}
r.ResolveNow(resolver.ResolveNowOptions{})
for i := 0; i < 50; i++ {
state, cnt = cc.getState()
if cnt > 1 {
t.Fatalf("Unexpected second call by resolver to UpdateState. state: %v", state)
for {
addrs, cnt = cc.getAddress()
if cnt == 2 {
break
}
time.Sleep(time.Millisecond)
}
if !reflect.DeepEqual(v.want, addrs) {
t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", v.target, addrs, v.want)
}
r.Close()
}
}
@ -968,7 +1006,7 @@ func TestResolveFunc(t *testing.T) {
r.Close()
}
if !reflect.DeepEqual(err, v.want) {
t.Errorf("Build(%q, cc, _) = %v, want %v", v.addr, err, v.want)
t.Errorf("Build(%q, cc, resolver.BuildOptions{}) = %v, want %v", v.addr, err, v.want)
}
}
}
@ -999,23 +1037,26 @@ func TestDisableServiceConfig(t *testing.T) {
if err != nil {
t.Fatalf("%v\n", err)
}
defer r.Close()
var cnt int
var state resolver.State
for i := 0; i < 2000; i++ {
state, cnt = cc.getState()
var sc string
// First wait for addresses. We know service configs are reported
// first, so once addresses have been reported, we can then check to
// see whether any configs have been reported..
for i := 0; i < 1000; i++ {
_, cnt = cc.getAddress()
if cnt > 0 {
break
}
time.Sleep(time.Millisecond)
}
if cnt == 0 {
t.Fatalf("UpdateState not called after 2s; aborting")
sc, cnt = cc.getSc()
if a.disableServiceConfig && cnt > 0 {
t.Errorf("Resolver reported a service config even though lookups are disabled: sc=%v, cnt=%v", sc, cnt)
}
sc := scFromState(state)
if a.scWant != sc {
if !reflect.DeepEqual(a.scWant, sc) {
t.Errorf("Resolved service config of target: %q = %+v, want %+v\n", a.target, sc, a.scWant)
}
r.Close()
}
}
@ -1027,49 +1068,49 @@ func TestDNSResolverRetry(t *testing.T) {
if err != nil {
t.Fatalf("%v\n", err)
}
defer r.Close()
var state resolver.State
for i := 0; i < 2000; i++ {
state, _ = cc.getState()
if len(state.Addresses) == 1 {
var addrs []resolver.Address
for {
addrs, _ = cc.getAddress()
if len(addrs) == 1 {
break
}
time.Sleep(time.Millisecond)
}
if len(state.Addresses) != 1 {
t.Fatalf("UpdateState not called with 1 address after 2s; aborting. state=%v", state)
}
want := []resolver.Address{{Addr: "1.2.3.4" + colonDefaultPort}}
if !reflect.DeepEqual(want, state.Addresses) {
t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", target, state.Addresses, want)
if !reflect.DeepEqual(want, addrs) {
t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", target, addrs, want)
}
// mutate the host lookup table so the target has 0 address returned.
revertTbl := mutateTbl(target)
// trigger a resolve that will get empty address list
r.ResolveNow(resolver.ResolveNowOptions{})
for i := 0; i < 2000; i++ {
state, _ = cc.getState()
if len(state.Addresses) == 0 {
for {
addrs, _ = cc.getAddress()
if len(addrs) == 0 {
break
}
time.Sleep(time.Millisecond)
}
if len(state.Addresses) != 0 {
t.Fatalf("UpdateState not called with 0 address after 2s; aborting. state=%v", state)
}
revertTbl()
// wait for the retry to happen in two seconds.
r.ResolveNow(resolver.ResolveNowOptions{})
for i := 0; i < 2000; i++ {
state, _ = cc.getState()
if len(state.Addresses) == 1 {
break
timer := time.NewTimer(2 * time.Second)
loop:
for {
select {
case <-timer.C:
break loop
default:
addrs, _ = cc.getAddress()
if len(addrs) != 0 {
break loop
}
time.Sleep(time.Millisecond)
}
time.Sleep(time.Millisecond)
}
if !reflect.DeepEqual(want, state.Addresses) {
t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", target, state.Addresses, want)
if !reflect.DeepEqual(want, addrs) {
t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", target, addrs, want)
}
r.Close()
}
func TestCustomAuthority(t *testing.T) {
@ -1256,16 +1297,16 @@ func TestRateLimitedResolve(t *testing.T) {
}
wantAddrs := []resolver.Address{{Addr: "1.2.3.4" + colonDefaultPort}, {Addr: "5.6.7.8" + colonDefaultPort}}
var state resolver.State
var gotAddrs []resolver.Address
for {
var cnt int
state, cnt = cc.getState()
gotAddrs, cnt = cc.getAddress()
if cnt > 0 {
break
}
time.Sleep(time.Millisecond)
}
if !reflect.DeepEqual(state.Addresses, wantAddrs) {
t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", target, state.Addresses, wantAddrs)
if !reflect.DeepEqual(gotAddrs, wantAddrs) {
t.Errorf("Resolved addresses of target: %q = %+v, want %+v\n", target, gotAddrs, wantAddrs)
}
}