mirror of https://github.com/grpc/grpc-go.git
494 lines
17 KiB
Go
494 lines
17 KiB
Go
/*
|
|
*
|
|
* Copyright 2019 gRPC authors.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*
|
|
*/
|
|
|
|
// Package client implements a full fledged gRPC client for the xDS API
|
|
// used by the xds resolver and balancer implementations.
|
|
package client
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
v2corepb "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
|
|
v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
|
|
"github.com/golang/protobuf/proto"
|
|
|
|
"google.golang.org/grpc/xds/internal/client/load"
|
|
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/internal/backoff"
|
|
"google.golang.org/grpc/internal/buffer"
|
|
"google.golang.org/grpc/internal/grpclog"
|
|
"google.golang.org/grpc/internal/grpcsync"
|
|
"google.golang.org/grpc/keepalive"
|
|
"google.golang.org/grpc/xds/internal"
|
|
"google.golang.org/grpc/xds/internal/client/bootstrap"
|
|
"google.golang.org/grpc/xds/internal/version"
|
|
)
|
|
|
|
var (
|
|
m = make(map[version.TransportAPI]APIClientBuilder)
|
|
)
|
|
|
|
// RegisterAPIClientBuilder registers a client builder for xDS transport protocol
|
|
// version specified by b.Version().
|
|
//
|
|
// NOTE: this function must only be called during initialization time (i.e. in
|
|
// an init() function), and is not thread-safe. If multiple builders are
|
|
// registered for the same version, the one registered last will take effect.
|
|
func RegisterAPIClientBuilder(b APIClientBuilder) {
|
|
m[b.Version()] = b
|
|
}
|
|
|
|
// getAPIClientBuilder returns the client builder registered for the provided
|
|
// xDS transport API version.
|
|
func getAPIClientBuilder(version version.TransportAPI) APIClientBuilder {
|
|
if b, ok := m[version]; ok {
|
|
return b
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// BuildOptions contains options to be passed to client builders.
|
|
type BuildOptions struct {
|
|
// Parent is a top-level xDS client or server which has the intelligence to
|
|
// take appropriate action based on xDS responses received from the
|
|
// management server.
|
|
Parent UpdateHandler
|
|
// NodeProto contains the Node proto to be used in xDS requests. The actual
|
|
// type depends on the transport protocol version used.
|
|
NodeProto proto.Message
|
|
// Backoff returns the amount of time to backoff before retrying broken
|
|
// streams.
|
|
Backoff func(int) time.Duration
|
|
// Logger provides enhanced logging capabilities.
|
|
Logger *grpclog.PrefixLogger
|
|
}
|
|
|
|
// APIClientBuilder creates an xDS client for a specific xDS transport protocol
|
|
// version.
|
|
type APIClientBuilder interface {
|
|
// Build builds a transport protocol specific implementation of the xDS
|
|
// client based on the provided clientConn to the management server and the
|
|
// provided options.
|
|
Build(*grpc.ClientConn, BuildOptions) (APIClient, error)
|
|
// Version returns the xDS transport protocol version used by clients build
|
|
// using this builder.
|
|
Version() version.TransportAPI
|
|
}
|
|
|
|
// APIClient represents the functionality provided by transport protocol
|
|
// version specific implementations of the xDS client.
|
|
//
|
|
// TODO: unexport this interface and all the methods after the PR to make
|
|
// xdsClient sharable by clients. AddWatch and RemoveWatch are exported for
|
|
// v2/v3 to override because they need to keep track of LDS name for RDS to use.
|
|
// After the share xdsClient change, that's no longer necessary. After that, we
|
|
// will still keep this interface for testing purposes.
|
|
type APIClient interface {
|
|
// AddWatch adds a watch for an xDS resource given its type and name.
|
|
AddWatch(ResourceType, string)
|
|
|
|
// RemoveWatch cancels an already registered watch for an xDS resource
|
|
// given its type and name.
|
|
RemoveWatch(ResourceType, string)
|
|
|
|
// reportLoad starts an LRS stream to periodically report load using the
|
|
// provided ClientConn, which represent a connection to the management
|
|
// server.
|
|
reportLoad(ctx context.Context, cc *grpc.ClientConn, opts loadReportingOptions)
|
|
|
|
// Close cleans up resources allocated by the API client.
|
|
Close()
|
|
}
|
|
|
|
// loadReportingOptions contains configuration knobs for reporting load data.
|
|
type loadReportingOptions struct {
|
|
loadStore *load.Store
|
|
}
|
|
|
|
// UpdateHandler receives and processes (by taking appropriate actions) xDS
|
|
// resource updates from an APIClient for a specific version.
|
|
type UpdateHandler interface {
|
|
// NewListeners handles updates to xDS listener resources.
|
|
NewListeners(map[string]ListenerUpdate)
|
|
// NewRouteConfigs handles updates to xDS RouteConfiguration resources.
|
|
NewRouteConfigs(map[string]RouteConfigUpdate)
|
|
// NewClusters handles updates to xDS Cluster resources.
|
|
NewClusters(map[string]ClusterUpdate)
|
|
// NewEndpoints handles updates to xDS ClusterLoadAssignment (or tersely
|
|
// referred to as Endpoints) resources.
|
|
NewEndpoints(map[string]EndpointsUpdate)
|
|
}
|
|
|
|
// ListenerUpdate carries the part of an LDS response that is of interest to
// the registered LDS watcher.
type ListenerUpdate struct {
	// RouteConfigName names the route configuration which corresponds to the
	// target being watched through LDS.
	RouteConfigName string
}
|
|
|
|
// RouteConfigUpdate contains information received in an RDS response, which is
|
|
// of interest to the registered RDS watcher.
|
|
type RouteConfigUpdate struct {
|
|
VirtualHosts []*VirtualHost
|
|
}
|
|
|
|
// VirtualHost contains the routes for a list of Domains.
|
|
//
|
|
// Note that the domains in this slice can be a wildcard, not an exact string.
|
|
// The consumer of this struct needs to find the best match for its hostname.
|
|
type VirtualHost struct {
|
|
Domains []string
|
|
// Routes contains a list of routes, each containing matchers and
|
|
// corresponding action.
|
|
Routes []*Route
|
|
}
|
|
|
|
// Route is both a specification of how to match a request as well as an
|
|
// indication of the action to take upon match.
|
|
type Route struct {
|
|
Path, Prefix, Regex *string
|
|
// Indicates if prefix/path matching should be case insensitive. The default
|
|
// is false (case sensitive).
|
|
CaseInsensitive bool
|
|
Headers []*HeaderMatcher
|
|
Fraction *uint32
|
|
Action map[string]uint32 // action is weighted clusters.
|
|
}
|
|
|
|
// HeaderMatcher represents header matchers.
|
|
type HeaderMatcher struct {
|
|
Name string `json:"name"`
|
|
InvertMatch *bool `json:"invertMatch,omitempty"`
|
|
ExactMatch *string `json:"exactMatch,omitempty"`
|
|
RegexMatch *string `json:"regexMatch,omitempty"`
|
|
PrefixMatch *string `json:"prefixMatch,omitempty"`
|
|
SuffixMatch *string `json:"suffixMatch,omitempty"`
|
|
RangeMatch *Int64Range `json:"rangeMatch,omitempty"`
|
|
PresentMatch *bool `json:"presentMatch,omitempty"`
|
|
}
|
|
|
|
// Int64Range is the range used by a header range match.
// NOTE(review): whether End is inclusive is not visible here; confirm
// against the matcher implementation.
type Int64Range struct {
	Start int64 `json:"start"`
	End   int64 `json:"end"`
}
|
|
|
|
// SecurityConfig holds the security configuration received as part of the
// Cluster resource.
type SecurityConfig struct {
	// RootInstanceName identifies the certProvider plugin used to fetch root
	// certificates. The instance name is resolved to the plugin name and its
	// associated configuration through the certificate_providers field of
	// the bootstrap file.
	RootInstanceName string
	// RootCertName is the certificate name passed to the plugin (looked up
	// from the bootstrap file) when fetching root certificates.
	RootCertName string
	// IdentityInstanceName identifies the certProvider plugin used to fetch
	// identity certificates. The instance name is resolved to the plugin
	// name and its associated configuration through the
	// certificate_providers field of the bootstrap file.
	IdentityInstanceName string
	// IdentityCertName is the certificate name passed to the plugin (looked
	// up from the bootstrap file) when fetching identity certificates.
	IdentityCertName string
	// AcceptedSANs is a list of Subject Alternative Names. During the TLS
	// handshake, the SAN present in the peer certificate is compared against
	// this list, and the handshake succeeds only if a match is found.
	AcceptedSANs []string
}
|
|
|
|
// ClusterUpdate contains information from a received CDS response, which is of
|
|
// interest to the registered CDS watcher.
|
|
type ClusterUpdate struct {
|
|
// ServiceName is the service name corresponding to the clusterName which
|
|
// is being watched for through CDS.
|
|
ServiceName string
|
|
// EnableLRS indicates whether or not load should be reported through LRS.
|
|
EnableLRS bool
|
|
// SecurityCfg contains security configuration sent by the xDS server.
|
|
SecurityCfg *SecurityConfig
|
|
}
|
|
|
|
// OverloadDropConfig is the configuration used to drop overloads.
// NOTE(review): presumably Numerator/Denominator express the drop rate for
// Category; confirm against the code that consumes it.
type OverloadDropConfig struct {
	Category    string
	Numerator   uint32
	Denominator uint32
}
|
|
|
|
// EndpointHealthStatus is the health status of an endpoint, stored as an
// int32.
type EndpointHealthStatus int32
|
|
|
|
const (
|
|
// EndpointHealthStatusUnknown represents HealthStatus UNKNOWN.
|
|
EndpointHealthStatusUnknown EndpointHealthStatus = iota
|
|
// EndpointHealthStatusHealthy represents HealthStatus HEALTHY.
|
|
EndpointHealthStatusHealthy
|
|
// EndpointHealthStatusUnhealthy represents HealthStatus UNHEALTHY.
|
|
EndpointHealthStatusUnhealthy
|
|
// EndpointHealthStatusDraining represents HealthStatus DRAINING.
|
|
EndpointHealthStatusDraining
|
|
// EndpointHealthStatusTimeout represents HealthStatus TIMEOUT.
|
|
EndpointHealthStatusTimeout
|
|
// EndpointHealthStatusDegraded represents HealthStatus DEGRADED.
|
|
EndpointHealthStatusDegraded
|
|
)
|
|
|
|
// Endpoint contains information of an endpoint.
|
|
type Endpoint struct {
|
|
Address string
|
|
HealthStatus EndpointHealthStatus
|
|
Weight uint32
|
|
}
|
|
|
|
// Locality contains information of a locality.
|
|
type Locality struct {
|
|
Endpoints []Endpoint
|
|
ID internal.LocalityID
|
|
Priority uint32
|
|
Weight uint32
|
|
}
|
|
|
|
// EndpointsUpdate contains an EDS update.
|
|
type EndpointsUpdate struct {
|
|
Drops []OverloadDropConfig
|
|
Localities []Locality
|
|
}
|
|
|
|
// Function to be overridden in tests.
|
|
var newAPIClient = func(apiVersion version.TransportAPI, cc *grpc.ClientConn, opts BuildOptions) (APIClient, error) {
|
|
cb := getAPIClientBuilder(apiVersion)
|
|
if cb == nil {
|
|
return nil, fmt.Errorf("no client builder for xDS API version: %v", apiVersion)
|
|
}
|
|
return cb.Build(cc, opts)
|
|
}
|
|
|
|
// clientImpl is the real implementation of the xds client. The exported Client
|
|
// is a wrapper of this struct with a ref count.
|
|
//
|
|
// Implements UpdateHandler interface.
|
|
// TODO(easwars): Make a wrapper struct which implements this interface in the
|
|
// style of ccBalancerWrapper so that the Client type does not implement these
|
|
// exported methods.
|
|
type clientImpl struct {
|
|
done *grpcsync.Event
|
|
config *bootstrap.Config
|
|
cc *grpc.ClientConn // Connection to the management server.
|
|
apiClient APIClient
|
|
watchExpiryTimeout time.Duration
|
|
|
|
logger *grpclog.PrefixLogger
|
|
|
|
updateCh *buffer.Unbounded // chan *watcherInfoWithUpdate
|
|
mu sync.Mutex
|
|
ldsWatchers map[string]map[*watchInfo]bool
|
|
ldsCache map[string]ListenerUpdate
|
|
rdsWatchers map[string]map[*watchInfo]bool
|
|
rdsCache map[string]RouteConfigUpdate
|
|
cdsWatchers map[string]map[*watchInfo]bool
|
|
cdsCache map[string]ClusterUpdate
|
|
edsWatchers map[string]map[*watchInfo]bool
|
|
edsCache map[string]EndpointsUpdate
|
|
|
|
// Changes to map lrsClients and the lrsClient inside the map need to be
|
|
// protected by lrsMu.
|
|
lrsMu sync.Mutex
|
|
lrsClients map[string]*lrsClient
|
|
}
|
|
|
|
// newWithConfig returns a new xdsClient with the given config.
|
|
func newWithConfig(config *bootstrap.Config, watchExpiryTimeout time.Duration) (*clientImpl, error) {
|
|
switch {
|
|
case config.BalancerName == "":
|
|
return nil, errors.New("xds: no xds_server name provided in options")
|
|
case config.Creds == nil:
|
|
return nil, errors.New("xds: no credentials provided in options")
|
|
case config.NodeProto == nil:
|
|
return nil, errors.New("xds: no node_proto provided in options")
|
|
}
|
|
|
|
switch config.TransportAPI {
|
|
case version.TransportV2:
|
|
if _, ok := config.NodeProto.(*v2corepb.Node); !ok {
|
|
return nil, fmt.Errorf("xds: Node proto type (%T) does not match API version: %v", config.NodeProto, config.TransportAPI)
|
|
}
|
|
case version.TransportV3:
|
|
if _, ok := config.NodeProto.(*v3corepb.Node); !ok {
|
|
return nil, fmt.Errorf("xds: Node proto type (%T) does not match API version: %v", config.NodeProto, config.TransportAPI)
|
|
}
|
|
}
|
|
|
|
dopts := []grpc.DialOption{
|
|
config.Creds,
|
|
grpc.WithKeepaliveParams(keepalive.ClientParameters{
|
|
Time: 5 * time.Minute,
|
|
Timeout: 20 * time.Second,
|
|
}),
|
|
}
|
|
|
|
c := &clientImpl{
|
|
done: grpcsync.NewEvent(),
|
|
config: config,
|
|
watchExpiryTimeout: watchExpiryTimeout,
|
|
|
|
updateCh: buffer.NewUnbounded(),
|
|
ldsWatchers: make(map[string]map[*watchInfo]bool),
|
|
ldsCache: make(map[string]ListenerUpdate),
|
|
rdsWatchers: make(map[string]map[*watchInfo]bool),
|
|
rdsCache: make(map[string]RouteConfigUpdate),
|
|
cdsWatchers: make(map[string]map[*watchInfo]bool),
|
|
cdsCache: make(map[string]ClusterUpdate),
|
|
edsWatchers: make(map[string]map[*watchInfo]bool),
|
|
edsCache: make(map[string]EndpointsUpdate),
|
|
lrsClients: make(map[string]*lrsClient),
|
|
}
|
|
|
|
cc, err := grpc.Dial(config.BalancerName, dopts...)
|
|
if err != nil {
|
|
// An error from a non-blocking dial indicates something serious.
|
|
return nil, fmt.Errorf("xds: failed to dial balancer {%s}: %v", config.BalancerName, err)
|
|
}
|
|
c.cc = cc
|
|
c.logger = prefixLogger((c))
|
|
c.logger.Infof("Created ClientConn to xDS management server: %s", config.BalancerName)
|
|
|
|
apiClient, err := newAPIClient(config.TransportAPI, cc, BuildOptions{
|
|
Parent: c,
|
|
NodeProto: config.NodeProto,
|
|
Backoff: backoff.DefaultExponential.Backoff,
|
|
Logger: c.logger,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
c.apiClient = apiClient
|
|
c.logger.Infof("Created")
|
|
go c.run()
|
|
return c, nil
|
|
}
|
|
|
|
// BootstrapConfig returns the configuration read from the bootstrap file.
|
|
// Callers must treat the return value as read-only.
|
|
func (c *Client) BootstrapConfig() *bootstrap.Config {
|
|
return c.config
|
|
}
|
|
|
|
// run is a goroutine for all the callbacks.
|
|
//
|
|
// Callback can be called in watch(), if an item is found in cache. Without this
|
|
// goroutine, the callback will be called inline, which might cause a deadlock
|
|
// in user's code. Callbacks also cannot be simple `go callback()` because the
|
|
// order matters.
|
|
func (c *clientImpl) run() {
|
|
for {
|
|
select {
|
|
case t := <-c.updateCh.Get():
|
|
c.updateCh.Load()
|
|
if c.done.HasFired() {
|
|
return
|
|
}
|
|
c.callCallback(t.(*watcherInfoWithUpdate))
|
|
case <-c.done.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// Close closes the gRPC connection to the management server.
|
|
func (c *clientImpl) Close() {
|
|
if c.done.HasFired() {
|
|
return
|
|
}
|
|
c.done.Fire()
|
|
// TODO: Should we invoke the registered callbacks here with an error that
|
|
// the client is closed?
|
|
c.apiClient.Close()
|
|
c.cc.Close()
|
|
c.logger.Infof("Shutdown")
|
|
}
|
|
|
|
// ResourceType names a kind of xDS resource independently of the transport
// protocol version. Transport version agnostic code uses these values, and the
// versioned API clients map them to the appropriate version URLs.
type ResourceType int
|
|
|
|
// Version agnostic resource type constants.
|
|
const (
|
|
UnknownResource ResourceType = iota
|
|
ListenerResource
|
|
HTTPConnManagerResource
|
|
RouteConfigResource
|
|
ClusterResource
|
|
EndpointsResource
|
|
)
|
|
|
|
func (r ResourceType) String() string {
|
|
switch r {
|
|
case ListenerResource:
|
|
return "ListenerResource"
|
|
case HTTPConnManagerResource:
|
|
return "HTTPConnManagerResource"
|
|
case RouteConfigResource:
|
|
return "RouteConfigResource"
|
|
case ClusterResource:
|
|
return "ClusterResource"
|
|
case EndpointsResource:
|
|
return "EndpointsResource"
|
|
default:
|
|
return "UnknownResource"
|
|
}
|
|
}
|
|
|
|
// IsListenerResource returns true if the provider URL corresponds to an xDS
|
|
// Listener resource.
|
|
func IsListenerResource(url string) bool {
|
|
return url == version.V2ListenerURL || url == version.V3ListenerURL
|
|
}
|
|
|
|
// IsHTTPConnManagerResource returns true if the provider URL corresponds to an xDS
|
|
// HTTPConnManager resource.
|
|
func IsHTTPConnManagerResource(url string) bool {
|
|
return url == version.V2HTTPConnManagerURL || url == version.V3HTTPConnManagerURL
|
|
}
|
|
|
|
// IsRouteConfigResource returns true if the provider URL corresponds to an xDS
|
|
// RouteConfig resource.
|
|
func IsRouteConfigResource(url string) bool {
|
|
return url == version.V2RouteConfigURL || url == version.V3RouteConfigURL
|
|
}
|
|
|
|
// IsClusterResource returns true if the provider URL corresponds to an xDS
|
|
// Cluster resource.
|
|
func IsClusterResource(url string) bool {
|
|
return url == version.V2ClusterURL || url == version.V3ClusterURL
|
|
}
|
|
|
|
// IsEndpointsResource returns true if the provider URL corresponds to an xDS
|
|
// Endpoints resource.
|
|
func IsEndpointsResource(url string) bool {
|
|
return url == version.V2EndpointsURL || url == version.V3EndpointsURL
|
|
}
|