456 lines
12 KiB
Go
456 lines
12 KiB
Go
/*
|
|
Copyright 2021 The Dapr Authors
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package consul
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"math/rand"
|
|
"net"
|
|
"strconv"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
consul "github.com/hashicorp/consul/api"
|
|
|
|
nr "github.com/dapr/components-contrib/nameresolution"
|
|
"github.com/dapr/kit/logger"
|
|
)
|
|
|
|
type client struct {
|
|
*consul.Client
|
|
}
|
|
|
|
func (c *client) InitClient(config *consul.Config) error {
|
|
var err error
|
|
|
|
c.Client, err = consul.NewClient(config)
|
|
if err != nil {
|
|
return fmt.Errorf("consul api error initing client: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *client) Agent() agentInterface {
|
|
return c.Client.Agent()
|
|
}
|
|
|
|
func (c *client) Health() healthInterface {
|
|
return c.Client.Health()
|
|
}
|
|
|
|
type clientInterface interface {
|
|
InitClient(config *consul.Config) error
|
|
Agent() agentInterface
|
|
Health() healthInterface
|
|
}
|
|
|
|
type agentInterface interface {
|
|
Self() (map[string]map[string]interface{}, error)
|
|
ServiceRegister(service *consul.AgentServiceRegistration) error
|
|
ServiceDeregister(serviceID string) error
|
|
}
|
|
|
|
type healthInterface interface {
|
|
Service(service, tag string, passingOnly bool, q *consul.QueryOptions) ([]*consul.ServiceEntry, *consul.QueryMeta, error)
|
|
State(state string, q *consul.QueryOptions) (consul.HealthChecks, *consul.QueryMeta, error)
|
|
}
|
|
|
|
type resolver struct {
|
|
config resolverConfig
|
|
logger logger.Logger
|
|
client clientInterface
|
|
registry registryInterface
|
|
watcherStarted atomic.Bool
|
|
watcherStopChannel chan struct{}
|
|
}
|
|
|
|
type registryInterface interface {
|
|
getKeys() []string
|
|
get(service string) *registryEntry
|
|
expire(service string) // clears slice of instances
|
|
expireAll() // clears slice of instances for all entries
|
|
remove(service string) // removes entry from registry
|
|
removeAll() // removes all entries from registry
|
|
addOrUpdate(service string, services []*consul.ServiceEntry)
|
|
registrationChannel() chan string
|
|
}
|
|
|
|
type registry struct {
|
|
entries sync.Map
|
|
serviceChannel chan string
|
|
}
|
|
|
|
type registryEntry struct {
|
|
services []*consul.ServiceEntry
|
|
mu sync.RWMutex
|
|
}
|
|
|
|
func (r *registry) getKeys() []string {
|
|
var keys []string
|
|
r.entries.Range(func(key any, value any) bool {
|
|
k := key.(string)
|
|
keys = append(keys, k)
|
|
return true
|
|
})
|
|
return keys
|
|
}
|
|
|
|
func (r *registry) get(service string) *registryEntry {
|
|
if result, ok := r.entries.Load(service); ok {
|
|
return result.(*registryEntry)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (e *registryEntry) next() *consul.ServiceEntry {
|
|
e.mu.Lock()
|
|
defer e.mu.Unlock()
|
|
|
|
if len(e.services) == 0 {
|
|
return nil
|
|
}
|
|
|
|
// gosec is complaining that we are using a non-crypto-safe PRNG. This is fine in this scenario since we are using it only for selecting a random address for load-balancing.
|
|
//nolint:gosec
|
|
return e.services[rand.Int()%len(e.services)]
|
|
}
|
|
|
|
func (r *resolver) getService(service string) (*consul.ServiceEntry, error) {
|
|
var services []*consul.ServiceEntry
|
|
|
|
if r.config.UseCache {
|
|
r.startWatcher()
|
|
|
|
entry := r.registry.get(service)
|
|
if entry != nil {
|
|
result := entry.next()
|
|
|
|
if result != nil {
|
|
return result, nil
|
|
}
|
|
} else {
|
|
r.registry.registrationChannel() <- service
|
|
}
|
|
}
|
|
|
|
options := *r.config.QueryOptions
|
|
options.WaitHash = ""
|
|
options.WaitIndex = 0
|
|
services, _, err := r.client.Health().Service(service, "", true, &options)
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to query healthy consul services: %w", err)
|
|
} else if len(services) == 0 {
|
|
return nil, fmt.Errorf("no healthy services found with AppID '%s'", service)
|
|
}
|
|
|
|
//nolint:gosec
|
|
return services[rand.Int()%len(services)], nil
|
|
}
|
|
|
|
func (r *registry) addOrUpdate(service string, services []*consul.ServiceEntry) {
|
|
// update
|
|
entry := r.get(service)
|
|
if entry != nil {
|
|
entry.mu.Lock()
|
|
defer entry.mu.Unlock()
|
|
|
|
entry.services = services
|
|
|
|
return
|
|
}
|
|
|
|
// add
|
|
r.entries.Store(service, ®istryEntry{
|
|
services: services,
|
|
})
|
|
}
|
|
|
|
func (r *registry) remove(service string) {
|
|
r.entries.Delete(service)
|
|
}
|
|
|
|
func (r *registry) removeAll() {
|
|
r.entries.Range(func(key any, value any) bool {
|
|
r.remove(key.(string))
|
|
return true
|
|
})
|
|
}
|
|
|
|
func (r *registry) expire(service string) {
|
|
entry := r.get(service)
|
|
if entry == nil {
|
|
return
|
|
}
|
|
|
|
entry.mu.Lock()
|
|
defer entry.mu.Unlock()
|
|
|
|
entry.services = nil
|
|
}
|
|
|
|
func (r *registry) expireAll() {
|
|
r.entries.Range(func(key any, value any) bool {
|
|
r.expire(key.(string))
|
|
return true
|
|
})
|
|
}
|
|
|
|
func (r *registry) registrationChannel() chan string {
|
|
return r.serviceChannel
|
|
}
|
|
|
|
type resolverConfig struct {
|
|
Client *consul.Config
|
|
QueryOptions *consul.QueryOptions
|
|
Registration *consul.AgentServiceRegistration
|
|
DeregisterOnClose bool
|
|
DaprPortMetaKey string
|
|
UseCache bool
|
|
}
|
|
|
|
// NewResolver creates Consul name resolver.
|
|
func NewResolver(logger logger.Logger) nr.Resolver {
|
|
return newResolver(logger, resolverConfig{}, &client{}, ®istry{serviceChannel: make(chan string, 100)}, make(chan struct{}))
|
|
}
|
|
|
|
func newResolver(logger logger.Logger, resolverConfig resolverConfig, client clientInterface, registry registryInterface, watcherStopChannel chan struct{}) nr.Resolver {
|
|
return &resolver{
|
|
logger: logger,
|
|
config: resolverConfig,
|
|
client: client,
|
|
registry: registry,
|
|
watcherStopChannel: watcherStopChannel,
|
|
}
|
|
}
|
|
|
|
// Init will configure component. It will also register service or validate client connection based on config.
|
|
func (r *resolver) Init(ctx context.Context, metadata nr.Metadata) (err error) {
|
|
r.config, err = getConfig(metadata)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if r.config.Client.TLSConfig.InsecureSkipVerify {
|
|
r.logger.Infof("hashicorp consul: you are using 'insecureSkipVerify' to skip server config verify which is unsafe!")
|
|
}
|
|
|
|
err = r.client.InitClient(r.config.Client)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to init consul client: %w", err)
|
|
}
|
|
|
|
// Register service to consul
|
|
if r.config.Registration != nil {
|
|
agent := r.client.Agent()
|
|
|
|
err = agent.ServiceRegister(r.config.Registration)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to register consul service: %w", err)
|
|
}
|
|
|
|
r.logger.Infof("service:%s registered on consul agent", r.config.Registration.Name)
|
|
} else {
|
|
_, err = r.client.Agent().Self()
|
|
if err != nil {
|
|
return fmt.Errorf("failed check on consul agent: %w", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ResolveID resolves name to address via consul.
|
|
func (r *resolver) ResolveID(ctx context.Context, req nr.ResolveRequest) (addr string, err error) {
|
|
cfg := r.config
|
|
svc, err := r.getService(req.ID)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
port := svc.Service.Meta[cfg.DaprPortMetaKey]
|
|
if port == "" {
|
|
return "", fmt.Errorf("target service AppID '%s' found but %s missing from meta", req.ID, cfg.DaprPortMetaKey)
|
|
}
|
|
|
|
if svc.Service.Address != "" {
|
|
addr = svc.Service.Address
|
|
} else if svc.Node.Address != "" {
|
|
addr = svc.Node.Address
|
|
} else {
|
|
return "", fmt.Errorf("no healthy services found with AppID '%s'", req.ID)
|
|
}
|
|
|
|
return formatAddress(addr, port)
|
|
}
|
|
|
|
// Close will stop the watcher and deregister app from consul
|
|
func (r *resolver) Close() error {
|
|
if r.watcherStarted.Load() {
|
|
r.watcherStopChannel <- struct{}{}
|
|
}
|
|
|
|
if r.config.Registration != nil && r.config.DeregisterOnClose {
|
|
err := r.client.Agent().ServiceDeregister(r.config.Registration.ID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to deregister consul service: %w", err)
|
|
}
|
|
|
|
r.logger.Info("deregistered service from consul")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func formatAddress(address string, port string) (addr string, err error) {
|
|
if net.ParseIP(address).To4() != nil {
|
|
return address + ":" + port, nil
|
|
} else if net.ParseIP(address).To16() != nil {
|
|
return fmt.Sprintf("[%s]:%s", address, port), nil
|
|
}
|
|
|
|
// addr is not a valid IP address
|
|
// use net.JoinHostPort to format address if address is a valid hostname
|
|
if _, err := net.LookupHost(address); err == nil {
|
|
return net.JoinHostPort(address, port), nil
|
|
}
|
|
|
|
return "", fmt.Errorf("invalid ip address or unreachable hostname: %s", address)
|
|
}
|
|
|
|
// getConfig configuration from metadata, defaults are best suited for self-hosted mode.
|
|
func getConfig(metadata nr.Metadata) (resolverCfg resolverConfig, err error) {
|
|
props := metadata.GetPropertiesMap()
|
|
if props[nr.DaprPort] == "" {
|
|
return resolverCfg, fmt.Errorf("metadata property missing: %s", nr.DaprPort)
|
|
}
|
|
|
|
cfg, err := parseConfig(metadata.Configuration)
|
|
if err != nil {
|
|
return resolverCfg, err
|
|
}
|
|
|
|
resolverCfg.DaprPortMetaKey = cfg.DaprPortMetaKey
|
|
resolverCfg.DeregisterOnClose = cfg.SelfDeregister
|
|
resolverCfg.UseCache = cfg.UseCache
|
|
|
|
resolverCfg.Client = getClientConfig(cfg)
|
|
resolverCfg.Registration, err = getRegistrationConfig(cfg, props)
|
|
if err != nil {
|
|
return resolverCfg, err
|
|
}
|
|
resolverCfg.QueryOptions = getQueryOptionsConfig(cfg)
|
|
|
|
// if registering, set DaprPort in meta, needed for resolution
|
|
if resolverCfg.Registration != nil {
|
|
if resolverCfg.Registration.Meta == nil {
|
|
resolverCfg.Registration.Meta = map[string]string{}
|
|
}
|
|
|
|
resolverCfg.Registration.Meta[resolverCfg.DaprPortMetaKey] = props[nr.DaprPort]
|
|
}
|
|
|
|
return resolverCfg, nil
|
|
}
|
|
|
|
func getClientConfig(cfg configSpec) *consul.Config {
|
|
// If no client config use library defaults
|
|
if cfg.Client != nil {
|
|
return cfg.Client
|
|
}
|
|
|
|
return consul.DefaultConfig()
|
|
}
|
|
|
|
func getRegistrationConfig(cfg configSpec, props map[string]string) (*consul.AgentServiceRegistration, error) {
|
|
// if advanced registration configured ignore other registration related configs
|
|
if cfg.AdvancedRegistration != nil {
|
|
return cfg.AdvancedRegistration, nil
|
|
}
|
|
if !cfg.SelfRegister {
|
|
return nil, nil
|
|
}
|
|
|
|
var (
|
|
appID string
|
|
appPort string
|
|
host string
|
|
httpPort string
|
|
)
|
|
|
|
appID = props[nr.AppID]
|
|
if appID == "" {
|
|
return nil, fmt.Errorf("metadata property missing: %s", nr.AppID)
|
|
}
|
|
|
|
appPort = props[nr.AppPort]
|
|
if appPort == "" {
|
|
return nil, fmt.Errorf("metadata property missing: %s", nr.AppPort)
|
|
}
|
|
|
|
host = props[nr.HostAddress]
|
|
if host == "" {
|
|
return nil, fmt.Errorf("metadata property missing: %s", nr.HostAddress)
|
|
}
|
|
|
|
httpPort = props[nr.DaprHTTPPort]
|
|
if httpPort == "" {
|
|
return nil, fmt.Errorf("metadata property missing: %s", nr.DaprHTTPPort)
|
|
} else if _, err := strconv.ParseUint(httpPort, 10, 0); err != nil {
|
|
return nil, fmt.Errorf("error parsing %s: %w", nr.DaprHTTPPort, err)
|
|
}
|
|
|
|
id := appID + "-" + host + "-" + httpPort
|
|
// if no health checks configured add dapr sidecar health check by default
|
|
if len(cfg.Checks) == 0 {
|
|
cfg.Checks = []*consul.AgentServiceCheck{
|
|
{
|
|
Name: "Dapr Health Status",
|
|
CheckID: fmt.Sprintf("daprHealth:%s", id),
|
|
Interval: "15s",
|
|
HTTP: fmt.Sprintf("http://%s/v1.0/healthz?appid=%s", net.JoinHostPort(host, httpPort), appID),
|
|
},
|
|
}
|
|
}
|
|
|
|
appPortInt, err := strconv.Atoi(appPort)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error parsing %s: %w", nr.AppPort, err)
|
|
}
|
|
|
|
return &consul.AgentServiceRegistration{
|
|
ID: id,
|
|
Name: appID,
|
|
Address: host,
|
|
Port: appPortInt,
|
|
Checks: cfg.Checks,
|
|
Tags: cfg.Tags,
|
|
Meta: cfg.Meta,
|
|
}, nil
|
|
}
|
|
|
|
func getQueryOptionsConfig(cfg configSpec) *consul.QueryOptions {
|
|
// if no query options configured add default filter matching every tag in config
|
|
if cfg.QueryOptions == nil {
|
|
return &consul.QueryOptions{
|
|
UseCache: true,
|
|
}
|
|
}
|
|
|
|
return cfg.QueryOptions
|
|
}
|