grpc-go/xds/internal/xdsclient/pubsub/update.go

/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package pubsub

import (
	"google.golang.org/grpc/internal/pretty"
	"google.golang.org/grpc/xds/internal/xdsclient/xdsresource"
	"google.golang.org/protobuf/proto"
)
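
// watcherInfoWithUpdate pairs a watcher with the update (or error) that should
// be delivered to it. Entries are queued by scheduleCallback and handed to the
// watcher's callback by callCallback.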
type watcherInfoWithUpdate struct {
	wi     *watchInfo
	update interface{}
	err    error
}

// scheduleCallback should only be called by methods of watchInfo, which check
// the watcher's state and maintain consistency.
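// The update is not delivered inline; it is queued on pb.updateCh and later
// handed to the watcher's callback by callCallback.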
func (pb *Pubsub) scheduleCallback(wi *watchInfo, update interface{}, err error) {
	pb.updateCh.Put(&watcherInfoWithUpdate{
		wi:     wi,
		update: update,
		err:    err,
	})
}
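
// callCallback delivers a queued update to its watcher. It verifies, while
// holding pb.mu, that the watcher is still registered for the resource, and
// then invokes the type-specific callback without the lock held.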
func (pb *Pubsub) callCallback(wiu *watcherInfoWithUpdate) {
	pb.mu.Lock()
	// Use a closure to capture the callback and type assertion, to save one
	// more switch case.
	//
	// The callback must be called without holding pb.mu. Otherwise, if the
	// callback calls another watch() inline, it will cause a deadlock. This
	// leaves a small window in which a watcher's callback could be called
	// after the watcher is canceled, and the user needs to take care of it.
	var ccb func()
	switch wiu.wi.rType {
	case xdsresource.ListenerResource:
		if s, ok := pb.ldsWatchers[wiu.wi.target]; ok && s[wiu.wi] {
			ccb = func() { wiu.wi.ldsCallback(wiu.update.(xdsresource.ListenerUpdate), wiu.err) }
		}
	case xdsresource.RouteConfigResource:
		if s, ok := pb.rdsWatchers[wiu.wi.target]; ok && s[wiu.wi] {
			ccb = func() { wiu.wi.rdsCallback(wiu.update.(xdsresource.RouteConfigUpdate), wiu.err) }
		}
	case xdsresource.ClusterResource:
		if s, ok := pb.cdsWatchers[wiu.wi.target]; ok && s[wiu.wi] {
			ccb = func() { wiu.wi.cdsCallback(wiu.update.(xdsresource.ClusterUpdate), wiu.err) }
		}
	case xdsresource.EndpointsResource:
		if s, ok := pb.edsWatchers[wiu.wi.target]; ok && s[wiu.wi] {
			ccb = func() { wiu.wi.edsCallback(wiu.update.(xdsresource.EndpointsUpdate), wiu.err) }
		}
	}
	pb.mu.Unlock()
	if ccb != nil {
		ccb()
	}
}

// NewListeners is called when there's a new LDS update.
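// A resource-level error in updates (a tuple whose Err field is set) means the
// resource was NACKed; in that case the previously cached version, if any, is
// kept and only the resource's metadata is updated.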
func (pb *Pubsub) NewListeners(updates map[string]xdsresource.ListenerUpdateErrTuple, metadata xdsresource.UpdateMetadata) {
	pb.mu.Lock()
	defer pb.mu.Unlock()
	for name, uErr := range updates {
		if s, ok := pb.ldsWatchers[name]; ok {
			if uErr.Err != nil {
				// On error, keep the previous version of the resource, but
				// update its status and error state.
				mdCopy := pb.ldsMD[name]
				mdCopy.ErrState = metadata.ErrState
				mdCopy.Status = metadata.Status
				pb.ldsMD[name] = mdCopy
				for wi := range s {
					wi.newError(uErr.Err)
				}
				continue
			}
			// If we get here, the update is valid. Notify watchers only if
			// this is the first update for the resource or it differs from
			// the one currently cached.
			if cur, ok := pb.ldsCache[name]; !ok || !proto.Equal(cur.Raw, uErr.Update.Raw) {
				for wi := range s {
					wi.newUpdate(uErr.Update)
				}
			}
			// Sync cache.
			pb.logger.Debugf("LDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(uErr))
			pb.ldsCache[name] = uErr.Update
			// Set status to ACK, and clear the error state. The metadata
			// might be NACK metadata because some other resources in the
			// same response are invalid.
			mdCopy := metadata
			mdCopy.Status = xdsresource.ServiceStatusACKed
			mdCopy.ErrState = nil
			if metadata.ErrState != nil {
				mdCopy.Version = metadata.ErrState.Version
			}
			pb.ldsMD[name] = mdCopy
		}
	}
	// Resources not in the new update were removed by the server, so delete
	// them.
	for name := range pb.ldsCache {
		if _, ok := updates[name]; !ok {
			// If a resource exists in the cache but not in the new update,
			// delete it from the cache and send a resource-not-found error
			// to indicate that it was removed.
			delete(pb.ldsCache, name)
			pb.ldsMD[name] = xdsresource.UpdateMetadata{Status: xdsresource.ServiceStatusNotExist}
			for wi := range pb.ldsWatchers[name] {
				wi.resourceNotFound()
			}
		}
	}
	// When an LDS resource is removed, we don't delete the corresponding RDS
	// cached data. The RDS watch will be canceled, and its cache entry is
	// removed when the last watch is canceled.
}

// NewRouteConfigs is called when there's a new RDS update.
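// Unlike LDS and CDS, an RDS response is not required to contain all requested
// resources, so resources missing from an update are not treated as deleted.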
func (pb *Pubsub) NewRouteConfigs(updates map[string]xdsresource.RouteConfigUpdateErrTuple, metadata xdsresource.UpdateMetadata) {
	pb.mu.Lock()
	defer pb.mu.Unlock()
	// If no error is received, the status is ACK.
	for name, uErr := range updates {
		if s, ok := pb.rdsWatchers[name]; ok {
			if uErr.Err != nil {
				// On error, keep the previous version of the resource, but
				// update its status and error state.
				mdCopy := pb.rdsMD[name]
				mdCopy.ErrState = metadata.ErrState
				mdCopy.Status = metadata.Status
				pb.rdsMD[name] = mdCopy
				for wi := range s {
					wi.newError(uErr.Err)
				}
				continue
			}
			// If we get here, the update is valid. Notify watchers only if
			// this is the first update for the resource or it differs from
			// the one currently cached.
			if cur, ok := pb.rdsCache[name]; !ok || !proto.Equal(cur.Raw, uErr.Update.Raw) {
				for wi := range s {
					wi.newUpdate(uErr.Update)
				}
			}
			// Sync cache.
			pb.logger.Debugf("RDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(uErr))
			pb.rdsCache[name] = uErr.Update
			// Set status to ACK, and clear the error state. The metadata
			// might be NACK metadata because some other resources in the
			// same response are invalid.
			mdCopy := metadata
			mdCopy.Status = xdsresource.ServiceStatusACKed
			mdCopy.ErrState = nil
			if metadata.ErrState != nil {
				mdCopy.Version = metadata.ErrState.Version
			}
			pb.rdsMD[name] = mdCopy
		}
	}
}

// NewClusters is called when there's a new CDS update.
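// Like LDS, a CDS response is expected to carry the full set of cluster
// resources, so cached clusters missing from updates are treated as removed.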
func (pb *Pubsub) NewClusters(updates map[string]xdsresource.ClusterUpdateErrTuple, metadata xdsresource.UpdateMetadata) {
	pb.mu.Lock()
	defer pb.mu.Unlock()
	for name, uErr := range updates {
		if s, ok := pb.cdsWatchers[name]; ok {
			if uErr.Err != nil {
				// On error, keep the previous version of the resource, but
				// update its status and error state.
				mdCopy := pb.cdsMD[name]
				mdCopy.ErrState = metadata.ErrState
				mdCopy.Status = metadata.Status
				pb.cdsMD[name] = mdCopy
				for wi := range s {
					// Send the watcher the individual error, instead of the
					// overall combined error from metadata.ErrState.
					wi.newError(uErr.Err)
				}
				continue
			}
			// If we get here, the update is valid. Notify watchers only if
			// this is the first update for the resource or it differs from
			// the one currently cached.
			if cur, ok := pb.cdsCache[name]; !ok || !proto.Equal(cur.Raw, uErr.Update.Raw) {
				for wi := range s {
					wi.newUpdate(uErr.Update)
				}
			}
			// Sync cache.
			pb.logger.Debugf("CDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(uErr))
			pb.cdsCache[name] = uErr.Update
			// Set status to ACK, and clear the error state. The metadata
			// might be NACK metadata because some other resources in the
			// same response are invalid.
			mdCopy := metadata
			mdCopy.Status = xdsresource.ServiceStatusACKed
			mdCopy.ErrState = nil
			if metadata.ErrState != nil {
				mdCopy.Version = metadata.ErrState.Version
			}
			pb.cdsMD[name] = mdCopy
		}
	}
	// Resources not in the new update were removed by the server, so delete
	// them.
	for name := range pb.cdsCache {
		if _, ok := updates[name]; !ok {
			// If a resource exists in the cache but not in the new update,
			// delete it from the cache and send a resource-not-found error
			// to indicate that it was removed.
			delete(pb.cdsCache, name)
			pb.cdsMD[name] = xdsresource.UpdateMetadata{Status: xdsresource.ServiceStatusNotExist}
			for wi := range pb.cdsWatchers[name] {
				wi.resourceNotFound()
			}
		}
	}
	// When a CDS resource is removed, we don't delete the corresponding EDS
	// cached data. The EDS watch will be canceled, and its cache entry is
	// removed when the last watch is canceled.
}

// NewEndpoints is called when there's a new EDS update.
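// Like RDS, an EDS response is not required to contain all requested
// resources, so resources missing from an update are not treated as deleted.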
func (pb *Pubsub) NewEndpoints(updates map[string]xdsresource.EndpointsUpdateErrTuple, metadata xdsresource.UpdateMetadata) {
	pb.mu.Lock()
	defer pb.mu.Unlock()
	for name, uErr := range updates {
		if s, ok := pb.edsWatchers[name]; ok {
			if uErr.Err != nil {
				// On error, keep the previous version of the resource, but
				// update its status and error state.
				mdCopy := pb.edsMD[name]
				mdCopy.ErrState = metadata.ErrState
				mdCopy.Status = metadata.Status
				pb.edsMD[name] = mdCopy
				for wi := range s {
					// Send the watcher the individual error, instead of the
					// overall combined error from metadata.ErrState.
					wi.newError(uErr.Err)
				}
				continue
			}
			// If we get here, the update is valid. Notify watchers only if
			// this is the first update for the resource or it differs from
			// the one currently cached.
			if cur, ok := pb.edsCache[name]; !ok || !proto.Equal(cur.Raw, uErr.Update.Raw) {
				for wi := range s {
					wi.newUpdate(uErr.Update)
				}
			}
			// Sync cache.
			pb.logger.Debugf("EDS resource with name %v, value %+v added to cache", name, pretty.ToJSON(uErr))
			pb.edsCache[name] = uErr.Update
			// Set status to ACK, and clear the error state. The metadata
			// might be NACK metadata because some other resources in the
			// same response are invalid.
			mdCopy := metadata
			mdCopy.Status = xdsresource.ServiceStatusACKed
			mdCopy.ErrState = nil
			if metadata.ErrState != nil {
				mdCopy.Version = metadata.ErrState.Version
			}
			pb.edsMD[name] = mdCopy
		}
	}
}

// NewConnectionError is called by the underlying xdsAPIClient when it receives
// a connection error. The error will be forwarded to all the resource watchers.
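// Connection errors do not modify the caches or the resource metadata; they
// are only delivered to the registered watchers.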
func (pb *Pubsub) NewConnectionError(err error) {
	pb.mu.Lock()
	defer pb.mu.Unlock()
	for _, s := range pb.ldsWatchers {
		for wi := range s {
			wi.newError(xdsresource.NewErrorf(xdsresource.ErrorTypeConnection, "xds: error received from xDS stream: %v", err))
		}
	}
	for _, s := range pb.rdsWatchers {
		for wi := range s {
			wi.newError(xdsresource.NewErrorf(xdsresource.ErrorTypeConnection, "xds: error received from xDS stream: %v", err))
		}
	}
	for _, s := range pb.cdsWatchers {
		for wi := range s {
			wi.newError(xdsresource.NewErrorf(xdsresource.ErrorTypeConnection, "xds: error received from xDS stream: %v", err))
		}
	}
	for _, s := range pb.edsWatchers {
		for wi := range s {
			wi.newError(xdsresource.NewErrorf(xdsresource.ErrorTypeConnection, "xds: error received from xDS stream: %v", err))
		}
	}
}