mirror of https://github.com/grpc/grpc-go.git
200 lines
6.3 KiB
Go
200 lines
6.3 KiB
Go
// +build go1.10
|
|
|
|
/*
|
|
*
|
|
* Copyright 2020 gRPC authors.
|
|
*
|
|
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
* you may not use this file except in compliance with the License.
|
|
* You may obtain a copy of the License at
|
|
*
|
|
* http://www.apache.org/licenses/LICENSE-2.0
|
|
*
|
|
* Unless required by applicable law or agreed to in writing, software
|
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
* See the License for the specific language governing permissions and
|
|
* limitations under the License.
|
|
*
|
|
*/
|
|
|
|
package rls
|
|
|
|
import (
|
|
"sync"
|
|
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/balancer"
|
|
"google.golang.org/grpc/grpclog"
|
|
"google.golang.org/grpc/internal/grpcsync"
|
|
)
|
|
|
|
var (
|
|
_ balancer.Balancer = (*rlsBalancer)(nil)
|
|
|
|
// For overriding in tests.
|
|
newRLSClientFunc = newRLSClient
|
|
logger = grpclog.Component("rls")
|
|
)
|
|
|
|
// rlsBalancer implements the RLS LB policy.
|
|
type rlsBalancer struct {
|
|
done *grpcsync.Event
|
|
cc balancer.ClientConn
|
|
opts balancer.BuildOptions
|
|
|
|
// Mutex protects all the state maintained by the LB policy.
|
|
// TODO(easwars): Once we add the cache, we will also have another lock for
|
|
// the cache alone.
|
|
mu sync.Mutex
|
|
lbCfg *lbConfig // Most recently received service config.
|
|
rlsCC *grpc.ClientConn // ClientConn to the RLS server.
|
|
rlsC *rlsClient // RLS client wrapper.
|
|
|
|
ccUpdateCh chan *balancer.ClientConnState
|
|
}
|
|
|
|
// run is a long running goroutine which handles all the updates that the
|
|
// balancer wishes to handle. The appropriate updateHandler will push the update
|
|
// on to a channel that this goroutine will select on, thereby the handling of
|
|
// the update will happen asynchronously.
|
|
func (lb *rlsBalancer) run() {
|
|
for {
|
|
// TODO(easwars): Handle other updates like subConn state changes, RLS
|
|
// responses from the server etc.
|
|
select {
|
|
case u := <-lb.ccUpdateCh:
|
|
lb.handleClientConnUpdate(u)
|
|
case <-lb.done.Done():
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// handleClientConnUpdate handles updates to the service config.
|
|
// If the RLS server name or the RLS RPC timeout changes, it updates the control
|
|
// channel accordingly.
|
|
// TODO(easwars): Handle updates to other fields in the service config.
|
|
func (lb *rlsBalancer) handleClientConnUpdate(ccs *balancer.ClientConnState) {
|
|
logger.Infof("rls: service config: %+v", ccs.BalancerConfig)
|
|
lb.mu.Lock()
|
|
defer lb.mu.Unlock()
|
|
|
|
if lb.done.HasFired() {
|
|
logger.Warning("rls: received service config after balancer close")
|
|
return
|
|
}
|
|
|
|
newCfg := ccs.BalancerConfig.(*lbConfig)
|
|
if lb.lbCfg.Equal(newCfg) {
|
|
logger.Info("rls: new service config matches existing config")
|
|
return
|
|
}
|
|
|
|
lb.updateControlChannel(newCfg)
|
|
lb.lbCfg = newCfg
|
|
}
|
|
|
|
// UpdateClientConnState pushes the received ClientConnState update on the
|
|
// update channel which will be processed asynchronously by the run goroutine.
|
|
// Implements balancer.Balancer interface.
|
|
func (lb *rlsBalancer) UpdateClientConnState(ccs balancer.ClientConnState) error {
|
|
select {
|
|
case lb.ccUpdateCh <- &ccs:
|
|
case <-lb.done.Done():
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ResolverErr implements balancer.Balancer interface.
|
|
func (lb *rlsBalancer) ResolverError(error) {
|
|
// ResolverError is called by gRPC when the name resolver reports an error.
|
|
// TODO(easwars): How do we handle this?
|
|
logger.Fatal("rls: ResolverError is not yet unimplemented")
|
|
}
|
|
|
|
// UpdateSubConnState implements balancer.Balancer interface.
|
|
func (lb *rlsBalancer) UpdateSubConnState(_ balancer.SubConn, _ balancer.SubConnState) {
|
|
logger.Fatal("rls: UpdateSubConnState is not yet implemented")
|
|
}
|
|
|
|
// Cleans up the resources allocated by the LB policy including the clientConn
|
|
// to the RLS server.
|
|
// Implements balancer.Balancer.
|
|
func (lb *rlsBalancer) Close() {
|
|
lb.mu.Lock()
|
|
defer lb.mu.Unlock()
|
|
|
|
lb.done.Fire()
|
|
if lb.rlsCC != nil {
|
|
lb.rlsCC.Close()
|
|
}
|
|
}
|
|
|
|
// updateControlChannel updates the RLS client if required.
|
|
// Caller must hold lb.mu.
|
|
func (lb *rlsBalancer) updateControlChannel(newCfg *lbConfig) {
|
|
oldCfg := lb.lbCfg
|
|
if newCfg.lookupService == oldCfg.lookupService && newCfg.lookupServiceTimeout == oldCfg.lookupServiceTimeout {
|
|
return
|
|
}
|
|
|
|
// Use RPC timeout from new config, if different from existing one.
|
|
timeout := oldCfg.lookupServiceTimeout
|
|
if timeout != newCfg.lookupServiceTimeout {
|
|
timeout = newCfg.lookupServiceTimeout
|
|
}
|
|
|
|
if newCfg.lookupService == oldCfg.lookupService {
|
|
// This is the case where only the timeout has changed. We will continue
|
|
// to use the existing clientConn. but will create a new rlsClient with
|
|
// the new timeout.
|
|
lb.rlsC = newRLSClientFunc(lb.rlsCC, lb.opts.Target.Endpoint, timeout)
|
|
return
|
|
}
|
|
|
|
// This is the case where the RLS server name has changed. We need to create
|
|
// a new clientConn and close the old one.
|
|
var dopts []grpc.DialOption
|
|
if dialer := lb.opts.Dialer; dialer != nil {
|
|
dopts = append(dopts, grpc.WithContextDialer(dialer))
|
|
}
|
|
dopts = append(dopts, dialCreds(lb.opts))
|
|
|
|
cc, err := grpc.Dial(newCfg.lookupService, dopts...)
|
|
if err != nil {
|
|
logger.Errorf("rls: dialRLS(%s, %v): %v", newCfg.lookupService, lb.opts, err)
|
|
// An error from a non-blocking dial indicates something serious. We
|
|
// should continue to use the old control channel if one exists, and
|
|
// return so that the rest of the config updates can be processes.
|
|
return
|
|
}
|
|
if lb.rlsCC != nil {
|
|
lb.rlsCC.Close()
|
|
}
|
|
lb.rlsCC = cc
|
|
lb.rlsC = newRLSClientFunc(cc, lb.opts.Target.Endpoint, timeout)
|
|
}
|
|
|
|
func dialCreds(opts balancer.BuildOptions) grpc.DialOption {
|
|
// The control channel should use the same authority as that of the parent
|
|
// channel. This ensures that the identify of the RLS server and that of the
|
|
// backend is the same, so if the RLS config is injected by an attacker, it
|
|
// cannot cause leakage of private information contained in headers set by
|
|
// the application.
|
|
server := opts.Target.Authority
|
|
switch {
|
|
case opts.DialCreds != nil:
|
|
if err := opts.DialCreds.OverrideServerName(server); err != nil {
|
|
logger.Warningf("rls: OverrideServerName(%s) = (%v), using Insecure", server, err)
|
|
return grpc.WithInsecure()
|
|
}
|
|
return grpc.WithTransportCredentials(opts.DialCreds)
|
|
case opts.CredsBundle != nil:
|
|
return grpc.WithTransportCredentials(opts.CredsBundle.TransportCredentials())
|
|
default:
|
|
logger.Warning("rls: no credentials available, using Insecure")
|
|
return grpc.WithInsecure()
|
|
}
|
|
}
|