components-contrib/nameresolution/consul/watcher.go

package consul
import (
"context"
"errors"
"strings"
"time"
backoff "github.com/cenkalti/backoff/v4"
consul "github.com/hashicorp/consul/api"
)
const (
	// initial back off interval.
	initialBackOffInternal = 5 * time.Second
	// maximum back off time; this is to prevent exponential runaway.
	maxBackOffInternal = 180 * time.Second
)
// A watchPlan contains all the state tracked in the loop
// that keeps the consul service registry cache fresh
type watchPlan struct {
expired bool
lastParamVal blockingParamVal
lastResult map[serviceIdentifier]bool
options *consul.QueryOptions
healthServiceQueryFilter string
failing bool
backOff *backoff.ExponentialBackOff
}
type blockingParamVal interface {
equal(other blockingParamVal) bool
next(previous blockingParamVal) blockingParamVal
}
type waitIndexVal uint64
// equal implements blockingParamVal.
func (idx waitIndexVal) equal(other blockingParamVal) bool {
if otherIdx, ok := other.(waitIndexVal); ok {
return idx == otherIdx
}
return false
}
// next implements blockingParamVal.
func (idx waitIndexVal) next(previous blockingParamVal) blockingParamVal {
if previous == nil {
return idx
}
prevIdx, ok := previous.(waitIndexVal)
if ok && prevIdx == idx {
		// this value is the same as the previous index; reset to force a non-blocking query
return waitIndexVal(0)
}
return idx
}
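// A short sketch (not from the original source) of how these values behave:
// an index equal to the previous one resets to zero, which forces the next
// query to return immediately instead of long polling on a stale index.
//
//	prev := waitIndexVal(7)
//	_ = waitIndexVal(7).next(prev) // waitIndexVal(0): unchanged, force non-blocking query
//	_ = waitIndexVal(9).next(prev) // waitIndexVal(9): advanced, keep the new index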
type serviceIdentifier struct {
serviceName string
serviceID string
node string
}
// getHealthByService aggregates the given health checks into a map keyed by
// service instance; an instance is healthy only if all of its checks pass.
func getHealthByService(checks consul.HealthChecks) map[serviceIdentifier]bool {
healthByService := make(map[serviceIdentifier]bool)
for _, check := range checks {
// generate unique identifier for service
id := serviceIdentifier{
serviceID: check.ServiceID,
serviceName: check.ServiceName,
node: check.Node,
}
// if the service is not in the map - add and init to healthy
if state, ok := healthByService[id]; !ok {
healthByService[id] = true
} else if !state {
// service exists and is already unhealthy - skip
continue
}
// if the check is not healthy then set service to unhealthy
if check.Status != consul.HealthPassing {
healthByService[id] = false
}
}
return healthByService
}
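// Example (hypothetical check data, not from the original file): checks for
// the same node/service fold into a single entry that is healthy only when
// every check passes.
//
//	checks := consul.HealthChecks{
//		{Node: "n1", ServiceID: "app-1", ServiceName: "app", Status: consul.HealthPassing},
//		{Node: "n1", ServiceID: "app-1", ServiceName: "app", Status: consul.HealthCritical},
//	}
//	health := getHealthByService(checks)
//	// health[serviceIdentifier{serviceName: "app", serviceID: "app-1", node: "n1"}] == false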
// getChangedServices diffs the new result against the plan's last result and
// returns the set of service names that were added, removed, or changed.
func (p *watchPlan) getChangedServices(newResult map[serviceIdentifier]bool) map[string]struct{} {
changedServices := make(map[string]struct{}) // service name set
	// for each entry in the new result
for newKey, newValue := range newResult {
// if the service exists in the old result and has the same value - skip
if oldValue, ok := p.lastResult[newKey]; ok && newValue == oldValue {
continue
}
// service is new or changed - add to set
changedServices[newKey.serviceName] = struct{}{}
}
	// for each entry in the old result
for oldKey := range p.lastResult {
// if the service does not exist in the new result - add to set
if _, ok := newResult[oldKey]; !ok {
changedServices[oldKey.serviceName] = struct{}{}
}
}
return changedServices
}
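// Sketch of the diff semantics (hypothetical data): a service name counts as
// changed when any of its entries is new, has flipped health, or has
// disappeared since the last result.
//
//	prev := map[serviceIdentifier]bool{
//		{serviceName: "a", serviceID: "a-1", node: "n1"}: true,
//		{serviceName: "b", serviceID: "b-1", node: "n1"}: true,
//	}
//	curr := map[serviceIdentifier]bool{
//		{serviceName: "a", serviceID: "a-1", node: "n1"}: false, // flipped -> "a" changed
//	}
//	// (&watchPlan{lastResult: prev}).getChangedServices(curr) returns
//	// map[string]struct{}{"a": {}, "b": {}} - "b" because its entry vanished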
// getServiceNameFilter builds a consul filter expression matching any of the
// given service names.
func getServiceNameFilter(services []string) string {
nameFilters := make([]string, len(services))
for i, v := range services {
nameFilters[i] = `ServiceName=="` + v + `"`
}
return strings.Join(nameFilters, " or ")
}
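// For example, getServiceNameFilter([]string{"cart", "orders"}) produces the
// consul filter expression:
//
//	ServiceName=="cart" or ServiceName=="orders"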
// watch issues a (potentially blocking) health state query to the consul
// agent for the target services and returns the new index and check results.
func (r *resolver) watch(ctx context.Context, p *watchPlan, services []string) (blockingParamVal, consul.HealthChecks, error) {
p.options = p.options.WithContext(ctx)
if p.lastParamVal != nil {
p.options.WaitIndex = uint64(p.lastParamVal.(waitIndexVal))
}
// build service name filter for all keys
p.options.Filter = getServiceNameFilter(services)
// request health checks for target services using blocking query
checks, meta, err := r.client.Health().State(consul.HealthAny, p.options)
if err != nil {
		// if it failed during the long poll, try again with no wait
if p.options.WaitIndex != uint64(0) {
p.options.WaitIndex = 0
checks, meta, err = r.client.Health().State(consul.HealthAny, p.options)
}
if err != nil {
// if the context was canceled
if errors.Is(err, context.Canceled) {
return nil, nil, err
}
			// if it failed with no wait and the plan has not expired
if p.options.WaitIndex == uint64(0) && !p.expired {
p.lastResult = nil
p.expired = true
r.registry.expireAll()
}
return nil, nil, err
}
}
p.expired = false
return waitIndexVal(meta.LastIndex), checks, err
}
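// A condensed sketch of the blocking-query pattern used above: the agent
// holds the request open until its index advances past WaitIndex (0 means
// return immediately) or the wait time elapses, and the returned index is
// fed back on the next call.
//
//	p.options.WaitIndex = uint64(lastIdx)
//	checks, meta, err := r.client.Health().State(consul.HealthAny, p.options)
//	lastIdx = waitIndexVal(meta.LastIndex)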
// runWatchPlan executes the following steps:
// - requests health check changes for the target keys from the consul agent using HTTP long polling
// - compares the results to the previous ones
// - if there is a change for a given serviceName/appId, invokes the health/service API to get the list of healthy targets
// - signals completion of the watch plan
func (r *resolver) runWatchPlan(ctx context.Context, p *watchPlan, services []string, watchPlanComplete chan struct{}) {
defer func() {
// signal completion of the watch plan to unblock the watch plan loop
watchPlanComplete <- struct{}{}
}()
// invoke blocking call
blockParam, result, err := r.watch(ctx, p, services)
// if the ctx was canceled then do nothing
if errors.Is(err, context.Canceled) {
return
}
// handle an error in the watch function
if err != nil {
		// reset the query index so the next attempt does not block on a stale index
p.lastParamVal = waitIndexVal(0)
// perform an exponential backoff
if !p.failing {
p.failing = true
p.backOff.Reset()
}
retry := p.backOff.NextBackOff()
		r.logger.Errorf("consul service-watcher error: %v, retry in %s", err, retry.Round(time.Second))
		// pause the watcher routine until ctx is canceled or the retry timer finishes
		sleepTimer := time.NewTimer(retry)
select {
case <-ctx.Done():
sleepTimer.Stop()
r.logger.Debug("consul service-watcher retry throttling canceled")
case <-sleepTimer.C:
}
		return
	}
	// reset the plan failure flag
	p.failing = false
	// if the result index is unchanged do nothing
	if p.lastParamVal != nil && p.lastParamVal.equal(blockParam) {
		return
	}
	// update the plan index
	oldParamVal := p.lastParamVal
	p.lastParamVal = blockParam.next(oldParamVal)
// compare last and new result to get changed services
healthByService := getHealthByService(result)
changedServices := p.getChangedServices(healthByService)
// update the plan last result
p.lastResult = healthByService
// call agent to get updated healthy nodes for each changed service
for k := range changedServices {
p.options.WaitIndex = 0
p.options.Filter = p.healthServiceQueryFilter
p.options = p.options.WithContext(ctx)
result, meta, err := r.client.Health().Service(k, "", true, p.options)
if err != nil {
			// on failure, expire the service from the cache; the resolver will fall back to the agent
			r.logger.Errorf("error invoking health service: %v, for service %s", err, k)
			r.registry.expire(k)
			// remove health checks for the service from the last result
for key := range p.lastResult {
if k == key.serviceName {
delete(p.lastResult, key)
}
}
// reset plan query index
p.lastParamVal = waitIndexVal(0)
} else {
			// update service entries in the registry
r.logger.Debugf("updating consul nr registry for service:%s last-index:%d", k, meta.LastIndex)
r.registry.addOrUpdate(k, result)
}
}
}
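// A rough sketch of the retry cadence configured in startWatcher below
// (multiplier and jitter are cenkalti/backoff defaults, assumed here): on
// repeated failure NextBackOff yields ~5s, ~7.5s, ~11.25s, ... with +/-50%
// jitter, capped at 180s; MaxElapsedTime of 0 means it never gives up.
//
//	ebo := backoff.NewExponentialBackOff()
//	ebo.InitialInterval = 5 * time.Second
//	ebo.MaxInterval = 180 * time.Second
//	ebo.MaxElapsedTime = 0
//	_ = ebo.NextBackOff() // ~5s, then ~7.5s, ~11.25s, ...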
// runWatchLoop executes the following steps in a forever loop:
// - gets the keys from the registry
// - executes the watch plan with the target keys
// - waits for (the watch plan to signal completion) or (the resolver to register a new key)
func (r *resolver) runWatchLoop(p *watchPlan) {
defer func() {
r.registry.removeAll()
r.watcherStarted.Store(false)
}()
watchPlanComplete := make(chan struct{}, 1)
watchLoop:
for {
ctx, cancel := context.WithCancel(context.Background())
// get target keys/app-ids from registry
services := r.registry.getKeys()
watching := false
if len(services) > 0 {
			// run the watch plan for the target services with a channel to signal completion
go r.runWatchPlan(ctx, p, services, watchPlanComplete)
watching = true
}
select {
case <-watchPlanComplete:
cancel()
// wait on channel for new services to track
case service := <-r.registry.registrationChannel():
// cancel watch plan i.e. blocking query to consul agent
cancel()
// generate set of keys
serviceKeys := make(map[string]any)
for i := 0; i < len(services); i++ {
serviceKeys[services[i]] = nil
}
// add service if it's not in the registry
if _, ok := serviceKeys[service]; !ok {
r.registry.addOrUpdate(service, nil)
}
// check for any more new services in channel and do the same
moreServices := true
for moreServices {
select {
case service := <-r.registry.registrationChannel():
if _, ok := serviceKeys[service]; !ok {
r.registry.addOrUpdate(service, nil)
}
default:
moreServices = false
}
}
if watching {
// ensure previous watch plan routine completed before next iteration
<-watchPlanComplete
}
			// reset the plan failure flag and query index
p.failing = false
p.lastParamVal = waitIndexVal(0)
// resolver closing
case <-r.watcherStopChannel:
cancel()
break watchLoop
}
}
}
// startWatcher will configure the watch plan and start the watch loop in a separate routine
func (r *resolver) startWatcher() {
if !r.watcherStarted.CompareAndSwap(false, true) {
return
}
options := *r.config.QueryOptions
options.UseCache = false // always ignore consul agent cache for watcher
options.Filter = "" // don't use configured filter for State() calls
// Configure exponential backoff
ebo := backoff.NewExponentialBackOff()
ebo.InitialInterval = initialBackOffInternal
ebo.MaxInterval = maxBackOffInternal
ebo.MaxElapsedTime = 0
plan := &watchPlan{
options: &options,
healthServiceQueryFilter: r.config.QueryOptions.Filter,
lastResult: make(map[serviceIdentifier]bool),
backOff: ebo,
}
go r.runWatchLoop(plan)
}
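// Hypothetical wiring (a sketch; how services reach registrationChannel is
// not shown in this file): the resolver starts the watcher once, then newly
// registered app IDs wake the loop's select.
//
//	r.startWatcher() // CompareAndSwap makes repeat calls a no-op
//	// elsewhere, registering "my-app" pushes it onto the channel returned by
//	// r.registry.registrationChannel(), which runWatchLoop drains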