Consul name resolution in-memory cache (#3121)
Signed-off-by: Abdulaziz Elsheikh <abdulaziz.elsheikh@gmail.com>
Signed-off-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com>
Co-authored-by: Alessandro (Ale) Segala <43508+ItalyPaleAle@users.noreply.github.com>

Parent: 81e2e05f38
Commit: b3436837f2
@@ -1,6 +1,6 @@
 # Consul Name Resolution
 
-The consul name resolution component gives the ability to register and resolve other "daprized" services registered on a consul estate. It is flexible in that it allows for complex to minimal configurations driving the behaviour on init and resolution.
+The consul name resolution component gives the ability to register and resolve other "daprized" services registered on a consul estate. It is flexible in that it allows for complex to minimal configurations driving the behavior on init and resolution.
 
 ## How To Use
@@ -35,7 +35,7 @@ spec:
 ```
 
-## Behaviour
+## Behavior
 
 On init the consul component will either validate the connection to the configured (or default) agent or register the service if configured to do so. The name resolution interface does not cater for an "on shutdown" pattern, so please consider this if using Dapr to register services to consul, as it will not deregister services.
 
@@ -54,9 +54,10 @@ As of writing the configuration spec is fixed to v1.3.0 of the consul api
 | Tags | `[]string` | Configures any tags to include if/when registering services |
 | Meta | `map[string]string` | Configures any additional metadata to include if/when registering services |
 | DaprPortMetaKey | `string` | The key used for getting the Dapr sidecar port from consul service metadata during service resolution; it will also be used to set the Dapr sidecar port in metadata during registration. If blank it will default to `DAPR_PORT` |
-| SelfRegister | `bool` | Controls if Dapr will register the service to consul. The name resolution interface does not cater for an "on shutdown" pattern, so please consider this if using Dapr to register services to consul, as it will not deregister services. |
+| SelfRegister | `bool` | Controls if Dapr will register the service to consul on startup. If unset it will default to `false` |
+| SelfDeregister | `bool` | Controls if Dapr will deregister the service from consul on shutdown. If unset it will default to `false` |
 | AdvancedRegistration | [*api.AgentServiceRegistration](https://pkg.go.dev/github.com/hashicorp/consul/api@v1.3.0#AgentServiceRegistration) | Gives full control of service registration through configuration. If configured the component will ignore any configuration of Checks, Tags, Meta and SelfRegister. |
-
+| UseCache | `bool` | Configures if Dapr will cache the resolved services in-memory. This is done using consul [blocking queries](https://www.consul.io/api-docs/features/blocking), which can be configured via the QueryOptions configuration. If unset it will default to `false` |
 ## Sample Configurations
 
 ### Basic
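Editor's note: for readers unfamiliar with the blocking-query mechanism the new `UseCache` option relies on, here is a minimal stand-alone sketch (not part of this diff; it assumes a reachable local consul agent and a registered service named `my-app`, both hypothetical):

```go
package main

import (
	"fmt"

	consul "github.com/hashicorp/consul/api"
)

func main() {
	client, err := consul.NewClient(consul.DefaultConfig())
	if err != nil {
		panic(err)
	}

	// A plain query returns immediately and reports the current Raft index.
	_, meta, err := client.Health().Service("my-app", "", true, nil)
	if err != nil {
		panic(err)
	}

	// Passing that index back as WaitIndex turns the call into a long poll:
	// consul holds the request open until the result set changes (or a
	// server-side timeout elapses), which is what lets a watcher keep an
	// in-memory cache fresh without polling in a tight loop.
	opts := &consul.QueryOptions{WaitIndex: meta.LastIndex}
	entries, _, err := client.Health().Service("my-app", "", true, opts)
	if err != nil {
		panic(err)
	}
	fmt.Printf("service list changed: %d healthy instances\n", len(entries))
}
```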
@@ -23,6 +23,8 @@ import (
 	"github.com/dapr/kit/config"
 )
 
+const defaultDaprPortMetaKey string = "DAPR_PORT" // default key for DaprPort in meta
+
 // The intermediateConfig is based on the consul api types. User configurations are
 // deserialized into this type before being converted to the equivalent consul types;
 // that way, breaking changes in future versions of the consul api cannot break user configuration.
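Editor's note: the comment above describes an insulation pattern worth spelling out. A minimal sketch with hypothetical names (`queryOptionsMirror` and `mapQueryOptionsMirror` are illustrative, not from this diff): user configuration is deserialized into a locally owned mirror type, then copied field-by-field into the vendored consul type, so a breaking change in the consul api surfaces as a compile error in the mapping function rather than a runtime failure of user configuration.

```go
package main

import consul "github.com/hashicorp/consul/api"

// queryOptionsMirror is a locally owned copy of the subset of
// consul.QueryOptions exposed to users (hypothetical example).
type queryOptionsMirror struct {
	Datacenter string
	Near       string
}

// mapQueryOptionsMirror copies the mirror into the vendored type; if consul
// renames or retypes a field, this function stops compiling, while user
// configs keep deserializing into the mirror unchanged.
func mapQueryOptionsMirror(m *queryOptionsMirror) *consul.QueryOptions {
	if m == nil {
		return nil
	}
	return &consul.QueryOptions{
		Datacenter: m.Datacenter,
		Near:       m.Near,
	}
}

func main() {
	_ = mapQueryOptionsMirror(&queryOptionsMirror{Datacenter: "dc1"})
}
```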
@@ -33,8 +35,10 @@ type intermediateConfig struct {
 	Meta                 map[string]string
 	QueryOptions         *QueryOptions
 	AdvancedRegistration *AgentServiceRegistration // advanced use-case
-	SelfRegister         bool
 	DaprPortMetaKey      string
+	SelfRegister         bool
+	SelfDeregister       bool
+	UseCache             bool
 }
 
 type configSpec struct {
@@ -44,8 +48,16 @@ type configSpec struct {
 	Meta                 map[string]string
 	QueryOptions         *consul.QueryOptions
 	AdvancedRegistration *consul.AgentServiceRegistration // advanced use-case
-	SelfRegister         bool
 	DaprPortMetaKey      string
+	SelfRegister         bool
+	SelfDeregister       bool
+	UseCache             bool
 }
 
+func newIntermediateConfig() intermediateConfig {
+	return intermediateConfig{
+		DaprPortMetaKey: defaultDaprPortMetaKey,
+	}
+}
+
 func parseConfig(rawConfig interface{}) (configSpec, error) {
@@ -60,7 +72,7 @@ func parseConfig(rawConfig interface{}) (configSpec, error) {
 		return result, fmt.Errorf("error serializing to json: %w", err)
 	}
 
-	var configuration intermediateConfig
+	configuration := newIntermediateConfig()
 	err = json.Unmarshal(data, &configuration)
 	if err != nil {
 		return result, fmt.Errorf("error deserializing to configSpec: %w", err)
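Editor's note on why `var configuration intermediateConfig` becomes `newIntermediateConfig()`: `json.Unmarshal` leaves struct fields untouched when the corresponding key is absent from the input, so seeding the struct first is what makes `DAPR_PORT` the effective default. A minimal self-contained sketch (field name borrowed from the diff):

```go
package main

import (
	"encoding/json"
	"fmt"
)

type intermediate struct {
	DaprPortMetaKey string
}

func main() {
	// Seed the default before deserializing, as newIntermediateConfig does.
	c := intermediate{DaprPortMetaKey: "DAPR_PORT"}

	// A user config that omits the key leaves the default in place...
	_ = json.Unmarshal([]byte(`{}`), &c)
	fmt.Println(c.DaprPortMetaKey) // DAPR_PORT

	// ...while an explicit value still overrides it.
	_ = json.Unmarshal([]byte(`{"DaprPortMetaKey":"CUSTOM_PORT"}`), &c)
	fmt.Println(c.DaprPortMetaKey) // CUSTOM_PORT
}
```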
@@ -80,7 +92,9 @@ func mapConfig(config intermediateConfig) configSpec {
 		QueryOptions:         mapQueryOptions(config.QueryOptions),
 		AdvancedRegistration: mapAdvancedRegistration(config.AdvancedRegistration),
 		SelfRegister:         config.SelfRegister,
+		SelfDeregister:       config.SelfDeregister,
 		DaprPortMetaKey:      config.DaprPortMetaKey,
+		UseCache:             config.UseCache,
 	}
 }
 
@@ -18,6 +18,8 @@ import (
 	"math/rand"
 	"net"
 	"strconv"
+	"sync"
+	"sync/atomic"
 
 	consul "github.com/hashicorp/consul/api"
 
@@ -25,8 +27,6 @@ import (
 	"github.com/dapr/kit/logger"
 )
 
-const daprMeta string = "DAPR_PORT" // default key for DAPR_PORT metadata
-
 type client struct {
 	*consul.Client
 }
@@ -59,34 +59,181 @@ type clientInterface interface {
 type agentInterface interface {
 	Self() (map[string]map[string]interface{}, error)
 	ServiceRegister(service *consul.AgentServiceRegistration) error
+	ServiceDeregister(serviceID string) error
 }
 
 type healthInterface interface {
 	Service(service, tag string, passingOnly bool, q *consul.QueryOptions) ([]*consul.ServiceEntry, *consul.QueryMeta, error)
+	State(state string, q *consul.QueryOptions) (consul.HealthChecks, *consul.QueryMeta, error)
 }
 
 type resolver struct {
-	config resolverConfig
-	logger logger.Logger
-	client clientInterface
+	config             resolverConfig
+	logger             logger.Logger
+	client             clientInterface
+	registry           registryInterface
+	watcherStarted     atomic.Bool
+	watcherStopChannel chan struct{}
 }
 
+type registryInterface interface {
+	getKeys() []string
+	get(service string) *registryEntry
+	expire(service string) // clears slice of instances
+	expireAll()            // clears slice of instances for all entries
+	remove(service string) // removes entry from registry
+	removeAll()            // removes all entries from registry
+	addOrUpdate(service string, services []*consul.ServiceEntry)
+	registrationChannel() chan string
+}
+
+type registry struct {
+	entries        sync.Map
+	serviceChannel chan string
+}
+
+type registryEntry struct {
+	services []*consul.ServiceEntry
+	mu       sync.RWMutex
+}
+
+func (r *registry) getKeys() []string {
+	var keys []string
+	r.entries.Range(func(key any, value any) bool {
+		k := key.(string)
+		keys = append(keys, k)
+		return true
+	})
+	return keys
+}
+
+func (r *registry) get(service string) *registryEntry {
+	if result, ok := r.entries.Load(service); ok {
+		return result.(*registryEntry)
+	}
+
+	return nil
+}
+
+func (e *registryEntry) next() *consul.ServiceEntry {
+	e.mu.Lock()
+	defer e.mu.Unlock()
+
+	if len(e.services) == 0 {
+		return nil
+	}
+
+	// gosec is complaining that we are using a non-crypto-safe PRNG. This is fine in this scenario since we are using it only for selecting a random address for load-balancing.
+	//nolint:gosec
+	return e.services[rand.Int()%len(e.services)]
+}
+
+func (r *resolver) getService(service string) (*consul.ServiceEntry, error) {
+	var services []*consul.ServiceEntry
+
+	if r.config.UseCache {
+		r.startWatcher()
+
+		entry := r.registry.get(service)
+		if entry != nil {
+			result := entry.next()
+
+			if result != nil {
+				return result, nil
+			}
+		} else {
+			r.registry.registrationChannel() <- service
+		}
+	}
+
+	options := *r.config.QueryOptions
+	options.WaitHash = ""
+	options.WaitIndex = 0
+	services, _, err := r.client.Health().Service(service, "", true, &options)
+
+	if err != nil {
+		return nil, fmt.Errorf("failed to query healthy consul services: %w", err)
+	} else if len(services) == 0 {
+		return nil, fmt.Errorf("no healthy services found with AppID '%s'", service)
+	}
+
+	//nolint:gosec
+	return services[rand.Int()%len(services)], nil
+}
+
+func (r *registry) addOrUpdate(service string, services []*consul.ServiceEntry) {
+	// update
+	entry := r.get(service)
+	if entry != nil {
+		entry.mu.Lock()
+		defer entry.mu.Unlock()
+
+		entry.services = services
+
+		return
+	}
+
+	// add
+	r.entries.Store(service, &registryEntry{
+		services: services,
+	})
+}
+
+func (r *registry) remove(service string) {
+	r.entries.Delete(service)
+}
+
+func (r *registry) removeAll() {
+	r.entries.Range(func(key any, value any) bool {
+		r.remove(key.(string))
+		return true
+	})
+}
+
+func (r *registry) expire(service string) {
+	entry := r.get(service)
+	if entry == nil {
+		return
+	}
+
+	entry.mu.Lock()
+	defer entry.mu.Unlock()
+
+	entry.services = nil
+}
+
+func (r *registry) expireAll() {
+	r.entries.Range(func(key any, value any) bool {
+		r.expire(key.(string))
+		return true
+	})
+}
+
+func (r *registry) registrationChannel() chan string {
+	return r.serviceChannel
+}
+
 type resolverConfig struct {
-	Client          *consul.Config
-	QueryOptions    *consul.QueryOptions
-	Registration    *consul.AgentServiceRegistration
-	DaprPortMetaKey string
+	Client            *consul.Config
+	QueryOptions      *consul.QueryOptions
+	Registration      *consul.AgentServiceRegistration
+	DeregisterOnClose bool
+	DaprPortMetaKey   string
+	UseCache          bool
 }
 
 // NewResolver creates Consul name resolver.
 func NewResolver(logger logger.Logger) nr.Resolver {
-	return newResolver(logger, &client{})
+	return newResolver(logger, resolverConfig{}, &client{}, &registry{serviceChannel: make(chan string, 100)}, make(chan struct{}))
 }
 
-func newResolver(logger logger.Logger, client clientInterface) *resolver {
+func newResolver(logger logger.Logger, resolverConfig resolverConfig, client clientInterface, registry registryInterface, watcherStopChannel chan struct{}) nr.Resolver {
 	return &resolver{
-		logger: logger,
-		client: client,
+		logger:             logger,
+		config:             resolverConfig,
+		client:             client,
+		registry:           registry,
+		watcherStopChannel: watcherStopChannel,
 	}
 }
@@ -129,23 +276,14 @@ func (r *resolver) Init(metadata nr.Metadata) (err error) {
 // ResolveID resolves name to address via consul.
 func (r *resolver) ResolveID(req nr.ResolveRequest) (addr string, err error) {
 	cfg := r.config
-	services, _, err := r.client.Health().Service(req.ID, "", true, cfg.QueryOptions)
+	svc, err := r.getService(req.ID)
 	if err != nil {
-		return "", fmt.Errorf("failed to query healthy consul services: %w", err)
+		return "", err
 	}
 
-	if len(services) == 0 {
-		return "", fmt.Errorf("no healthy services found with AppID '%s'", req.ID)
-	}
-
-	// Pick a random service from the result
-	// Note: we're using math/random here as PRNG and that's ok since we're just using this for selecting a random address from a list for load-balancing, so we don't need a CSPRNG
-	//nolint:gosec
-	svc := services[rand.Int()%len(services)]
-
 	port := svc.Service.Meta[cfg.DaprPortMetaKey]
 	if port == "" {
-		return "", fmt.Errorf("target service AppID '%s' found but DAPR_PORT missing from meta", req.ID)
+		return "", fmt.Errorf("target service AppID '%s' found but %s missing from meta", req.ID, cfg.DaprPortMetaKey)
 	}
 
 	if svc.Service.Address != "" {
@@ -159,6 +297,24 @@ func (r *resolver) ResolveID(req nr.ResolveRequest) (addr string, err error) {
 	return formatAddress(addr, port)
 }
 
+// Close will stop the watcher and deregister the app from consul
+func (r *resolver) Close() error {
+	if r.watcherStarted.Load() {
+		r.watcherStopChannel <- struct{}{}
+	}
+
+	if r.config.Registration != nil && r.config.DeregisterOnClose {
+		err := r.client.Agent().ServiceDeregister(r.config.Registration.ID)
+		if err != nil {
+			return fmt.Errorf("failed to deregister consul service: %w", err)
+		}
+
+		r.logger.Info("deregistered service from consul")
+	}
+
+	return nil
+}
+
 func formatAddress(address string, port string) (addr string, err error) {
 	if net.ParseIP(address).To4() != nil {
 		return address + ":" + port, nil
@@ -180,12 +336,9 @@ func getConfig(metadata nr.Metadata) (resolverCfg resolverConfig, err error) {
 		return resolverCfg, err
 	}
 
-	// set DaprPortMetaKey used for registring DaprPort and resolving from Consul
-	if cfg.DaprPortMetaKey == "" {
-		resolverCfg.DaprPortMetaKey = daprMeta
-	} else {
-		resolverCfg.DaprPortMetaKey = cfg.DaprPortMetaKey
-	}
+	resolverCfg.DaprPortMetaKey = cfg.DaprPortMetaKey
+	resolverCfg.DeregisterOnClose = cfg.SelfDeregister
+	resolverCfg.UseCache = cfg.UseCache
 
 	resolverCfg.Client = getClientConfig(cfg)
 	resolverCfg.Registration, err = getRegistrationConfig(cfg, metadata.Properties)
File diff suppressed because it is too large.
@@ -0,0 +1,364 @@
+package consul
+
+import (
+	"context"
+	"errors"
+	"strings"
+	"time"
+
+	backoff "github.com/cenkalti/backoff/v4"
+	consul "github.com/hashicorp/consul/api"
+)
+
+const (
+	// initial back off interval.
+	initialBackOffInternal = 5 * time.Second
+
+	// maximum back off time; this is to prevent exponential runaway.
+	maxBackOffInternal = 180 * time.Second
+)
+
+// A watchPlan contains all the state tracked in the loop
+// that keeps the consul service registry cache fresh.
+type watchPlan struct {
+	expired                  bool
+	lastParamVal             blockingParamVal
+	lastResult               map[serviceIdentifier]bool
+	options                  *consul.QueryOptions
+	healthServiceQueryFilter string
+	failing                  bool
+	backOff                  *backoff.ExponentialBackOff
+}
+
+type blockingParamVal interface {
+	equal(other blockingParamVal) bool
+	next(previous blockingParamVal) blockingParamVal
+}
+
+type waitIndexVal uint64
+
+// equal implements blockingParamVal.
+func (idx waitIndexVal) equal(other blockingParamVal) bool {
+	if otherIdx, ok := other.(waitIndexVal); ok {
+		return idx == otherIdx
+	}
+
+	return false
+}
+
+// next implements blockingParamVal.
+func (idx waitIndexVal) next(previous blockingParamVal) blockingParamVal {
+	if previous == nil {
+		return idx
+	}
+	prevIdx, ok := previous.(waitIndexVal)
+	if ok && prevIdx == idx {
+		// this value is the same as the previous index; reset
+		return waitIndexVal(0)
+	}
+
+	return idx
+}
+
+type serviceIdentifier struct {
+	serviceName string
+	serviceID   string
+	node        string
+}
+
+func getHealthByService(checks consul.HealthChecks) map[serviceIdentifier]bool {
+	healthByService := make(map[serviceIdentifier]bool)
+	for _, check := range checks {
+		// generate unique identifier for service
+		id := serviceIdentifier{
+			serviceID:   check.ServiceID,
+			serviceName: check.ServiceName,
+			node:        check.Node,
+		}
+
+		// if the service is not in the map - add and init to healthy
+		if state, ok := healthByService[id]; !ok {
+			healthByService[id] = true
+		} else if !state {
+			// service exists and is already unhealthy - skip
+			continue
+		}
+
+		// if the check is not healthy then set service to unhealthy
+		if check.Status != consul.HealthPassing {
+			healthByService[id] = false
+		}
+	}
+
+	return healthByService
+}
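Editor's note on the aggregation above: a service instance is keyed by name/id/node, and any non-passing check flips it to unhealthy no matter how many other checks pass. A stand-alone illustration of that rule (hypothetical check data; the loop mirrors getHealthByService for a single instance):

```go
package main

import (
	"fmt"

	consul "github.com/hashicorp/consul/api"
)

func main() {
	// Two checks for the same service instance: one passing, one critical.
	checks := consul.HealthChecks{
		{Node: "node1", ServiceID: "app-1", ServiceName: "app", Status: consul.HealthPassing},
		{Node: "node1", ServiceID: "app-1", ServiceName: "app", Status: consul.HealthCritical},
	}

	// Mirror of the loop above: start healthy, let any failing check win.
	healthy := true
	for _, c := range checks {
		if c.Status != consul.HealthPassing {
			healthy = false
		}
	}
	fmt.Println(healthy) // false: one critical check marks the instance unhealthy
}
```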
+func (p *watchPlan) getChangedServices(newResult map[serviceIdentifier]bool) map[string]struct{} {
+	changedServices := make(map[string]struct{}) // service name set
+
+	// foreach new result
+	for newKey, newValue := range newResult {
+		// if the service exists in the old result and has the same value - skip
+		if oldValue, ok := p.lastResult[newKey]; ok && newValue == oldValue {
+			continue
+		}
+
+		// service is new or changed - add to set
+		changedServices[newKey.serviceName] = struct{}{}
+	}
+
+	// foreach old result
+	for oldKey := range p.lastResult {
+		// if the service does not exist in the new result - add to set
+		if _, ok := newResult[oldKey]; !ok {
+			changedServices[oldKey.serviceName] = struct{}{}
+		}
+	}
+
+	return changedServices
+}
+
+func getServiceNameFilter(services []string) string {
+	nameFilters := make([]string, len(services))
+
+	for i, v := range services {
+		nameFilters[i] = `ServiceName=="` + v + `"`
+	}
+
+	return strings.Join(nameFilters, " or ")
+}
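Editor's note: the generated filter is just an `or`-joined list of exact ServiceName matches. A runnable illustration (app IDs are hypothetical; the helper is reproduced verbatim from the diff):

```go
package main

import (
	"fmt"
	"strings"
)

// reproduced from getServiceNameFilter above
func getServiceNameFilter(services []string) string {
	nameFilters := make([]string, len(services))
	for i, v := range services {
		nameFilters[i] = `ServiceName=="` + v + `"`
	}
	return strings.Join(nameFilters, " or ")
}

func main() {
	// prints: ServiceName=="storefront" or ServiceName=="checkout"
	fmt.Println(getServiceNameFilter([]string{"storefront", "checkout"}))
}
```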
+func (r *resolver) watch(ctx context.Context, p *watchPlan, services []string) (blockingParamVal, consul.HealthChecks, error) {
+	p.options = p.options.WithContext(ctx)
+
+	if p.lastParamVal != nil {
+		p.options.WaitIndex = uint64(p.lastParamVal.(waitIndexVal))
+	}
+
+	// build service name filter for all keys
+	p.options.Filter = getServiceNameFilter(services)
+
+	// request health checks for target services using blocking query
+	checks, meta, err := r.client.Health().State(consul.HealthAny, p.options)
+	if err != nil {
+		// if it failed during long poll try again with no wait
+		if p.options.WaitIndex != uint64(0) {
+			p.options.WaitIndex = 0
+			checks, meta, err = r.client.Health().State(consul.HealthAny, p.options)
+		}
+
+		if err != nil {
+			// if the context was canceled
+			if errors.Is(err, context.Canceled) {
+				return nil, nil, err
+			}
+
+			// if it failed with no wait and plan is not expired
+			if p.options.WaitIndex == uint64(0) && !p.expired {
+				p.lastResult = nil
+				p.expired = true
+				r.registry.expireAll()
+			}
+
+			return nil, nil, err
+		}
+	}
+
+	p.expired = false
+	return waitIndexVal(meta.LastIndex), checks, err
+}
+
+// runWatchPlan executes the following steps:
+// - requests health check changes for the target keys from the consul agent using http long polling
+// - compares the results to the previous
+// - if there is a change for a given serviceName/appId it invokes the health/service api to get a list of healthy targets
+// - signals completion of the watch plan
+func (r *resolver) runWatchPlan(ctx context.Context, p *watchPlan, services []string, watchPlanComplete chan struct{}) {
+	defer func() {
+		// signal completion of the watch plan to unblock the watch plan loop
+		watchPlanComplete <- struct{}{}
+	}()
+
+	// invoke blocking call
+	blockParam, result, err := r.watch(ctx, p, services)
+
+	// if the ctx was canceled then do nothing
+	if errors.Is(err, context.Canceled) {
+		return
+	}
+
+	// handle an error in the watch function
+	if err != nil {
+		// reset the query index so the next attempt does not use a stale index
+		p.lastParamVal = waitIndexVal(0)
+
+		// perform an exponential backoff
+		if !p.failing {
+			p.failing = true
+			p.backOff.Reset()
+		}
+
+		retry := p.backOff.NextBackOff()
+
+		// pause watcher routine until ctx is canceled or retry timer finishes
+		r.logger.Errorf("consul service-watcher error: %v, retry in %s", err, retry.Round(time.Second))
+		sleepTimer := time.NewTimer(retry)
+		select {
+		case <-ctx.Done():
+			sleepTimer.Stop()
+			r.logger.Debug("consul service-watcher retry throttling canceled")
+		case <-sleepTimer.C:
+		}
+
+		return
+	} else {
+		// reset the plan failure flag
+		p.failing = false
+	}
+
+	// if the result index is unchanged do nothing
+	if p.lastParamVal != nil && p.lastParamVal.equal(blockParam) {
+		return
+	} else {
+		// update the plan index
+		oldParamVal := p.lastParamVal
+		p.lastParamVal = blockParam.next(oldParamVal)
+	}
+
+	// compare last and new result to get changed services
+	healthByService := getHealthByService(result)
+	changedServices := p.getChangedServices(healthByService)
+
+	// update the plan last result
+	p.lastResult = healthByService
+
+	// call agent to get updated healthy nodes for each changed service
+	for k := range changedServices {
+		p.options.WaitIndex = 0
+		p.options.Filter = p.healthServiceQueryFilter
+		p.options = p.options.WithContext(ctx)
+		result, meta, err := r.client.Health().Service(k, "", true, p.options)
+
+		if err != nil {
+			// on failure, expire service from cache; resolver will fall back to agent
+			r.logger.Errorf("error invoking health service: %v, for service %s", err, k)
+			r.registry.expire(k)
+
+			// remove health checks for service from last result
+			for key := range p.lastResult {
+				if k == key.serviceName {
+					delete(p.lastResult, key)
+				}
+			}
+
+			// reset plan query index
+			p.lastParamVal = waitIndexVal(0)
+		} else {
+			// update service entries in registry
+			r.logger.Debugf("updating consul nr registry for service:%s last-index:%d", k, meta.LastIndex)
+			r.registry.addOrUpdate(k, result)
+		}
+	}
+}
+
+// runWatchLoop executes the following steps in a forever loop:
+// - gets the keys from the registry
+// - executes the watch plan with the target keys
+// - waits for (the watch plan to signal completion) or (the resolver to register a new key)
+func (r *resolver) runWatchLoop(p *watchPlan) {
+	defer func() {
+		r.registry.removeAll()
+		r.watcherStarted.Store(false)
+	}()
+
+	watchPlanComplete := make(chan struct{}, 1)
+
+watchLoop:
+	for {
+		ctx, cancel := context.WithCancel(context.Background())
+
+		// get target keys/app-ids from registry
+		services := r.registry.getKeys()
+		watching := false
+
+		if len(services) > 0 {
+			// run watch plan for target services with channel to signal completion
+			go r.runWatchPlan(ctx, p, services, watchPlanComplete)
+			watching = true
+		}
+
+		select {
+		case <-watchPlanComplete:
+			cancel()
+
+		// wait on channel for new services to track
+		case service := <-r.registry.registrationChannel():
+			// cancel watch plan i.e. blocking query to consul agent
+			cancel()
+
+			// generate set of keys
+			serviceKeys := make(map[string]any)
+			for i := 0; i < len(services); i++ {
+				serviceKeys[services[i]] = nil
+			}
+
+			// add service if it's not in the registry
+			if _, ok := serviceKeys[service]; !ok {
+				r.registry.addOrUpdate(service, nil)
+			}
+
+			// check for any more new services in channel and do the same
+			moreServices := true
+			for moreServices {
+				select {
+				case service := <-r.registry.registrationChannel():
+					if _, ok := serviceKeys[service]; !ok {
+						r.registry.addOrUpdate(service, nil)
+					}
+				default:
+					moreServices = false
+				}
+			}
+
+			if watching {
+				// ensure previous watch plan routine completed before next iteration
+				<-watchPlanComplete
+			}
+
+			// reset plan failure count and query index
+			p.failing = false
+			p.lastParamVal = waitIndexVal(0)
+
+		// resolver closing
+		case <-r.watcherStopChannel:
+			cancel()
+			break watchLoop
+		}
+	}
+}
+
+// startWatcher will configure the watch plan and start the watch loop in a separate routine
+func (r *resolver) startWatcher() {
+	if !r.watcherStarted.CompareAndSwap(false, true) {
+		return
+	}
+
+	options := *r.config.QueryOptions
+	options.UseCache = false // always ignore consul agent cache for watcher
+	options.Filter = ""      // don't use configured filter for State() calls
+
+	// Configure exponential backoff
+	ebo := backoff.NewExponentialBackOff()
+	ebo.InitialInterval = initialBackOffInternal
+	ebo.MaxInterval = maxBackOffInternal
+	ebo.MaxElapsedTime = 0
+
+	plan := &watchPlan{
+		options:                  &options,
+		healthServiceQueryFilter: r.config.QueryOptions.Filter,
+		lastResult:               make(map[serviceIdentifier]bool),
+		backOff:                  ebo,
+	}
+
+	go r.runWatchLoop(plan)
+}
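Editor's note: a sketch of the retry curve the watcher gets from this backoff configuration (interval values taken from the constants above; output is approximate because the library adds jitter):

```go
package main

import (
	"fmt"
	"time"

	backoff "github.com/cenkalti/backoff/v4"
)

func main() {
	ebo := backoff.NewExponentialBackOff()
	ebo.InitialInterval = 5 * time.Second // initialBackOffInternal
	ebo.MaxInterval = 180 * time.Second   // maxBackOffInternal
	ebo.MaxElapsedTime = 0                // never expire: NextBackOff never returns backoff.Stop

	for i := 0; i < 8; i++ {
		fmt.Println(ebo.NextBackOff().Round(time.Second))
	}
	// roughly 5s, 8s, 11s, 17s, 25s, ... climbing toward the 180s ceiling (jittered)
}
```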