package consul

import (
	"context"
	"errors"
	"strings"
	"time"

	backoff "github.com/cenkalti/backoff/v4"
	consul "github.com/hashicorp/consul/api"
)

const (
	// initial back off interval.
	initialBackOffInternal = 5 * time.Second

	// maximum back off interval; this is to prevent exponential runaway.
	maxBackOffInternal = 180 * time.Second
)

// A watchPlan contains all the state tracked in the loop
// that keeps the consul service registry cache fresh.
type watchPlan struct {
	expired                  bool
	lastParamVal             blockingParamVal
	lastResult               map[serviceIdentifier]bool
	options                  *consul.QueryOptions
	healthServiceQueryFilter string
	failing                  bool
	backOff                  *backoff.ExponentialBackOff
}
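
// blockingParamVal abstracts the parameter that drives consul blocking
// queries, allowing the watch loop to compare and advance it between
// iterations.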
type blockingParamVal interface {
	equal(other blockingParamVal) bool
	next(previous blockingParamVal) blockingParamVal
}
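
// waitIndexVal is a blockingParamVal backed by the last index that consul
// returns in a blocking query's metadata (the X-Consul-Index header).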
type waitIndexVal uint64

// equal implements blockingParamVal.
func (idx waitIndexVal) equal(other blockingParamVal) bool {
	if otherIdx, ok := other.(waitIndexVal); ok {
		return idx == otherIdx
	}

	return false
}

// next implements blockingParamVal.
func (idx waitIndexVal) next(previous blockingParamVal) blockingParamVal {
	if previous == nil {
		return idx
	}

	prevIdx, ok := previous.(waitIndexVal)
	if ok && prevIdx == idx {
		// this value is the same as the previous index - reset
		return waitIndexVal(0)
	}

	return idx
}
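
// serviceIdentifier uniquely identifies a service instance by its name, ID
// and the node it runs on.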
type serviceIdentifier struct {
	serviceName string
	serviceID   string
	node        string
}
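
// getHealthByService aggregates a flat list of health checks into a single
// healthy/unhealthy state per service instance: an instance is healthy only
// if every one of its checks is passing.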
func getHealthByService(checks consul.HealthChecks) map[serviceIdentifier]bool {
	healthByService := make(map[serviceIdentifier]bool)
	for _, check := range checks {
		// generate a unique identifier for the service
		id := serviceIdentifier{
			serviceID:   check.ServiceID,
			serviceName: check.ServiceName,
			node:        check.Node,
		}

		// if the service is not in the map - add it and init to healthy
		if state, ok := healthByService[id]; !ok {
			healthByService[id] = true
		} else if !state {
			// service exists and is already unhealthy - skip
			continue
		}

		// if the check is not healthy then set the service to unhealthy
		if check.Status != consul.HealthPassing {
			healthByService[id] = false
		}
	}

	return healthByService
}
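
// getChangedServices diffs a new health result against the plan's last result
// and returns the set of service names that were added, removed or changed
// health state.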
func (p *watchPlan) getChangedServices(newResult map[serviceIdentifier]bool) map[string]struct{} {
	changedServices := make(map[string]struct{}) // service name set

	// foreach new result
	for newKey, newValue := range newResult {
		// if the service exists in the old result and has the same value - skip
		if oldValue, ok := p.lastResult[newKey]; ok && newValue == oldValue {
			continue
		}

		// service is new or changed - add it to the set
		changedServices[newKey.serviceName] = struct{}{}
	}

	// foreach old result
	for oldKey := range p.lastResult {
		// if the service does not exist in the new result - add it to the set
		if _, ok := newResult[oldKey]; !ok {
			changedServices[oldKey.serviceName] = struct{}{}
		}
	}

	return changedServices
}
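
// getServiceNameFilter builds a consul API filter expression matching any of
// the given service names, e.g. for ["app1", "app2"] it yields:
//
//	ServiceName=="app1" or ServiceName=="app2"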
func getServiceNameFilter(services []string) string {
	nameFilters := make([]string, len(services))

	for i, v := range services {
		nameFilters[i] = `ServiceName=="` + v + `"`
	}

	return strings.Join(nameFilters, " or ")
}
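
// watch performs a single blocking query against the consul agent's health
// state endpoint, waiting until the watched services change or the wait times
// out, and returns the new blocking-query index alongside the raw checks.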
func (r *resolver) watch(ctx context.Context, p *watchPlan, services []string) (blockingParamVal, consul.HealthChecks, error) {
	p.options = p.options.WithContext(ctx)

	if p.lastParamVal != nil {
		p.options.WaitIndex = uint64(p.lastParamVal.(waitIndexVal))
	}

	// build the service name filter for all target services
	p.options.Filter = getServiceNameFilter(services)

	// request health checks for the target services using a blocking query
	checks, meta, err := r.client.Health().State(consul.HealthAny, p.options)
	if err != nil {
		// if it failed during the long poll, try again with no wait
		if p.options.WaitIndex != uint64(0) {
			p.options.WaitIndex = 0
			checks, meta, err = r.client.Health().State(consul.HealthAny, p.options)
		}

		if err != nil {
			// if the context was canceled
			if errors.Is(err, context.Canceled) {
				return nil, nil, err
			}

			// if it failed with no wait and the plan is not expired
			if p.options.WaitIndex == uint64(0) && !p.expired {
				p.lastResult = nil
				p.expired = true
				r.registry.expireAll()
			}

			return nil, nil, err
		}
	}

	p.expired = false

	return waitIndexVal(meta.LastIndex), checks, err
}

// runWatchPlan executes the following steps:
// - requests health check changes for the target keys from the consul agent using http long polling
// - compares the results to the previous
// - if there is a change for a given serviceName/appId it invokes the health/service api to get a list of healthy targets
// - signals completion of the watch plan
func (r *resolver) runWatchPlan(ctx context.Context, p *watchPlan, services []string, watchPlanComplete chan struct{}) {
	defer func() {
		// signal completion of the watch plan to unblock the watch plan loop
		watchPlanComplete <- struct{}{}
	}()

	// invoke the blocking call
	blockParam, result, err := r.watch(ctx, p, services)

	// if the ctx was canceled then do nothing
	if errors.Is(err, context.Canceled) {
		return
	}

	// handle an error in the watch function
	if err != nil {
		// reset the query index so the next attempt does not block on a stale index
		p.lastParamVal = waitIndexVal(0)

		// perform an exponential backoff
		if !p.failing {
			p.failing = true
			p.backOff.Reset()
		}

		retry := p.backOff.NextBackOff()

		// pause the watcher routine until the ctx is canceled or the retry timer finishes
		r.logger.Errorf("consul service-watcher error: %v, retry in %s", err, retry.Round(time.Second))
		sleepTimer := time.NewTimer(retry)
		select {
		case <-ctx.Done():
			sleepTimer.Stop()
			r.logger.Debug("consul service-watcher retry throttling canceled")
		case <-sleepTimer.C:
		}

		return
	}

	// reset the plan failure flag
	p.failing = false

	// if the result index is unchanged do nothing
	if p.lastParamVal != nil && p.lastParamVal.equal(blockParam) {
		return
	}

	// update the plan index
	oldParamVal := p.lastParamVal
	p.lastParamVal = blockParam.next(oldParamVal)

	// compare the last and new results to get the changed services
	healthByService := getHealthByService(result)
	changedServices := p.getChangedServices(healthByService)

	// update the plan's last result
	p.lastResult = healthByService

	// call the agent to get updated healthy nodes for each changed service
	for k := range changedServices {
		p.options.WaitIndex = 0
		p.options.Filter = p.healthServiceQueryFilter
		p.options = p.options.WithContext(ctx)
		result, meta, err := r.client.Health().Service(k, "", true, p.options)

		if err != nil {
			// on failure, expire the service from the cache; the resolver will fall back to the agent
			r.logger.Errorf("error invoking health service: %v, for service %s", err, k)
			r.registry.expire(k)

			// remove the health checks for the service from the last result
			for key := range p.lastResult {
				if k == key.serviceName {
					delete(p.lastResult, key)
				}
			}

			// reset the plan query index
			p.lastParamVal = waitIndexVal(0)
		} else {
			// update the service entries in the registry
			r.logger.Debugf("updating consul nr registry for service:%s last-index:%d", k, meta.LastIndex)
			r.registry.addOrUpdate(k, result)
		}
	}
}

// runWatchLoop executes the following steps in a forever loop:
// - gets the keys from the registry
// - executes the watch plan with the target keys
// - waits for (the watch plan to signal completion) or (the resolver to register a new key)
func (r *resolver) runWatchLoop(p *watchPlan) {
	defer func() {
		r.registry.removeAll()
		r.watcherStarted.Store(false)
	}()

	watchPlanComplete := make(chan struct{}, 1)

watchLoop:
	for {
		ctx, cancel := context.WithCancel(context.Background())

		// get the target keys/app-ids from the registry
		services := r.registry.getKeys()
		watching := false

		if len(services) > 0 {
			// run the watch plan for the target services with a channel to signal completion
			go r.runWatchPlan(ctx, p, services, watchPlanComplete)
			watching = true
		}

		select {
		case <-watchPlanComplete:
			cancel()

		// wait on the channel for new services to track
		case service := <-r.registry.registrationChannel():
			// cancel the watch plan, i.e. the blocking query to the consul agent
			cancel()

			// generate a set of the current keys
			serviceKeys := make(map[string]any)
			for _, s := range services {
				serviceKeys[s] = nil
			}

			// add the service if it is not in the registry
			if _, ok := serviceKeys[service]; !ok {
				r.registry.addOrUpdate(service, nil)
			}

			// check for any more new services in the channel and do the same
			moreServices := true
			for moreServices {
				select {
				case service := <-r.registry.registrationChannel():
					if _, ok := serviceKeys[service]; !ok {
						r.registry.addOrUpdate(service, nil)
					}
				default:
					moreServices = false
				}
			}

			if watching {
				// ensure the previous watch plan routine completed before the next iteration
				<-watchPlanComplete
			}

			// reset the plan failure flag and query index
			p.failing = false
			p.lastParamVal = waitIndexVal(0)

		// resolver closing
		case <-r.watcherStopChannel:
			cancel()
			break watchLoop
		}
	}
}

// startWatcher will configure the watch plan and start the watch loop in a separate routine
func (r *resolver) startWatcher() {
	if !r.watcherStarted.CompareAndSwap(false, true) {
		return
	}

	options := *r.config.QueryOptions
	options.UseCache = false // always ignore the consul agent cache for the watcher
	options.Filter = ""      // don't use the configured filter for State() calls

	// configure the exponential backoff
	ebo := backoff.NewExponentialBackOff()
	ebo.InitialInterval = initialBackOffInternal
	ebo.MaxInterval = maxBackOffInternal
	ebo.MaxElapsedTime = 0

	plan := &watchPlan{
		options:                  &options,
		healthServiceQueryFilter: r.config.QueryOptions.Filter,
		lastResult:               make(map[serviceIdentifier]bool),
		backOff:                  ebo,
	}

	go r.runWatchLoop(plan)
}