deps: bump go-control-plane with envoy 1.28 support

Updates go-control-plane to the latest version sync'd and tested against
Envoy 1.28.

Signed-off-by: Lance Austin <laustin@dataiwre.io>
This commit is contained in:
Lance Austin 2023-11-16 19:46:22 +00:00 committed by Alice Wasko
parent 2d36cf21e0
commit 9292c47470
10 changed files with 175 additions and 117 deletions

View File

@ -35,7 +35,7 @@ ENVOY_DOCKER_TAG ?= $(ENVOY_DOCKER_REPO):envoy-$(ENVOY_DOCKER_VERSION)
# which commits are ancestors, I added `make guess-envoy-go-control-plane-commit` to do that in an
# automated way! Still look at the commit yourself to make sure it seems sane; blindly trusting
# machines is bad, mmkay?
ENVOY_GO_CONTROL_PLANE_COMMIT = b501c94cb61e3235b9156629377fba229d9571d8
ENVOY_GO_CONTROL_PLANE_COMMIT = 6e4589f570e19a3d17087cf80d40bacdc6356de6
# Set ENVOY_DOCKER_REPO to the list of mirrors to check
ENVOY_DOCKER_REPOS = docker.io/emissaryingress/base-envoy

View File

@ -22,14 +22,11 @@ import (
"google.golang.org/protobuf/proto"
cluster "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/config/cluster/v3"
core "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/config/core/v3"
endpoint "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/config/endpoint/v3"
listener "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/config/listener/v3"
route "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/config/route/v3"
auth "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/extensions/transport_sockets/tls/v3"
runtime "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/service/runtime/v3"
"github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/cache/types"
ratelimit "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/ratelimit/config/ratelimit/v3"
"github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/resource/v3"
)
@ -93,24 +90,6 @@ func GetResourceName(res types.Resource) string {
switch v := res.(type) {
case *endpoint.ClusterLoadAssignment:
return v.GetClusterName()
case *cluster.Cluster:
return v.GetName()
case *route.RouteConfiguration:
return v.GetName()
case *route.ScopedRouteConfiguration:
return v.GetName()
case *route.VirtualHost:
return v.GetName()
case *listener.Listener:
return v.GetName()
case *auth.Secret:
return v.GetName()
case *runtime.Runtime:
return v.GetName()
case *core.TypedExtensionConfig:
return v.GetName()
case *ratelimit.RateLimitConfig:
return v.GetName()
case types.ResourceWithName:
return v.GetName()
default:

View File

@ -233,54 +233,95 @@ func (cache *snapshotCache) SetSnapshot(ctx context.Context, node string, snapsh
info.mu.Lock()
defer info.mu.Unlock()
// responder callback for SOTW watches
respond := func(watch ResponseWatch, id int64) error {
version := snapshot.GetVersion(watch.Request.TypeUrl)
if version != watch.Request.VersionInfo {
cache.log.Debugf("respond open watch %d %s%v with new version %q", id, watch.Request.TypeUrl, watch.Request.ResourceNames, version)
resources := snapshot.GetResourcesAndTTL(watch.Request.TypeUrl)
err := cache.respond(ctx, watch.Request, watch.Response, resources, version, false)
if err != nil {
return err
}
// discard the watch
delete(info.watches, id)
}
return nil
// Respond to SOTW watches for the node.
if err := cache.respondSOTWWatches(ctx, info, snapshot); err != nil {
return err
}
// If ADS is enabled we need to order response watches so we guarantee
// sending them in the correct order. Go's default implementation
// of maps are randomized order when ranged over.
if cache.ads {
info.orderResponseWatches()
for _, key := range info.orderedWatches {
err := respond(info.watches[key.ID], key.ID)
if err != nil {
return err
}
}
} else {
for id, watch := range info.watches {
err := respond(watch, id)
if err != nil {
return err
}
}
}
// Respond to delta watches for the node.
return cache.respondDeltaWatches(ctx, info, snapshot)
}
// We only calculate version hashes when using delta. We don't
// want to do this when using SOTW so we can avoid unnecessary
// computational cost if not using delta.
if len(info.deltaWatches) > 0 {
err := snapshot.ConstructVersionMap()
return nil
}
func (cache *snapshotCache) respondSOTWWatches(ctx context.Context, info *statusInfo, snapshot ResourceSnapshot) error {
// responder callback for SOTW watches
respond := func(watch ResponseWatch, id int64) error {
version := snapshot.GetVersion(watch.Request.TypeUrl)
if version != watch.Request.VersionInfo {
cache.log.Debugf("respond open watch %d %s%v with new version %q", id, watch.Request.TypeUrl, watch.Request.ResourceNames, version)
resources := snapshot.GetResourcesAndTTL(watch.Request.TypeUrl)
err := cache.respond(ctx, watch.Request, watch.Response, resources, version, false)
if err != nil {
return err
}
// discard the watch
delete(info.watches, id)
}
return nil
}
// If ADS is enabled we need to order response watches so we guarantee
// sending them in the correct order. Go's default implementation
// of maps are randomized order when ranged over.
if cache.ads {
info.orderResponseWatches()
for _, key := range info.orderedWatches {
err := respond(info.watches[key.ID], key.ID)
if err != nil {
return err
}
}
} else {
for id, watch := range info.watches {
err := respond(watch, id)
if err != nil {
return err
}
}
}
// this won't run if there are no delta watches
// to process.
return nil
}
func (cache *snapshotCache) respondDeltaWatches(ctx context.Context, info *statusInfo, snapshot ResourceSnapshot) error {
// We only calculate version hashes when using delta. We don't
// want to do this when using SOTW so we can avoid unnecessary
// computational cost if not using delta.
if len(info.deltaWatches) == 0 {
return nil
}
err := snapshot.ConstructVersionMap()
if err != nil {
return err
}
// If ADS is enabled we need to order response delta watches so we guarantee
// sending them in the correct order. Go's default implementation
// of maps are randomized order when ranged over.
if cache.ads {
info.orderResponseDeltaWatches()
for _, key := range info.orderedDeltaWatches {
watch := info.deltaWatches[key.ID]
res, err := cache.respondDelta(
ctx,
snapshot,
watch.Request,
watch.Response,
watch.StreamState,
)
if err != nil {
return err
}
// If we detect a nil response here, that means there has been no state change
// so we don't want to respond or remove any existing resource watches
if res != nil {
delete(info.deltaWatches, key.ID)
}
}
} else {
for id, watch := range info.deltaWatches {
res, err := cache.respondDelta(
ctx,
@ -299,7 +340,6 @@ func (cache *snapshotCache) SetSnapshot(ctx context.Context, node string, snapsh
}
}
}
return nil
}

View File

@ -70,7 +70,8 @@ type statusInfo struct {
orderedWatches keys
// deltaWatches are indexed channels for the delta response watches and the original requests
deltaWatches map[int64]DeltaResponseWatch
deltaWatches map[int64]DeltaResponseWatch
orderedDeltaWatches keys
// the timestamp of the last watch request
lastWatchRequestTime time.Time
@ -177,3 +178,22 @@ func (info *statusInfo) orderResponseWatches() {
// This is only run when we enable ADS on the cache.
sort.Sort(info.orderedWatches)
}
// orderResponseDeltaWatches will track a list of delta watch keys and order them if
// true is passed.
func (info *statusInfo) orderResponseDeltaWatches() {
info.orderedDeltaWatches = make(keys, len(info.deltaWatches))
var index int
for id, deltaWatch := range info.deltaWatches {
info.orderedDeltaWatches[index] = key{
ID: id,
TypeURL: deltaWatch.Request.TypeUrl,
}
index++
}
// Sort our list which we can use in the SetSnapshot functions.
// This is only run when we enable ADS on the cache.
sort.Sort(info.orderedDeltaWatches)
}

View File

@ -31,7 +31,7 @@ type Callbacks interface {
// OnStreamDeltaRequest is called once a request is received on a stream.
// Returning an error will end processing and close the stream. OnStreamClosed will still be called.
OnStreamDeltaRequest(int64, *discovery.DeltaDiscoveryRequest) error
// OnStreamDelatResponse is called immediately prior to sending a response on a stream.
// OnStreamDeltaResponse is called immediately prior to sending a response on a stream.
OnStreamDeltaResponse(int64, *discovery.DeltaDiscoveryRequest, *discovery.DeltaDiscoveryResponse)
}
@ -83,7 +83,7 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
}
}()
// Sends a response, returns the new stream nonce
// sends a response, returns the new stream nonce
send := func(resp cache.DeltaResponse) (string, error) {
if resp == nil {
return "", errors.New("missing response")
@ -103,6 +103,44 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
return response.Nonce, str.Send(response)
}
// process a single delta response
process := func(resp cache.DeltaResponse) error {
typ := resp.GetDeltaRequest().GetTypeUrl()
if resp == deltaErrorResponse {
return status.Errorf(codes.Unavailable, typ+" watch failed")
}
nonce, err := send(resp)
if err != nil {
return err
}
watch := watches.deltaWatches[typ]
watch.nonce = nonce
watch.state.SetResourceVersions(resp.GetNextVersionMap())
watches.deltaWatches[typ] = watch
return nil
}
// processAll purges the deltaMuxedResponses channel
processAll := func() error {
for {
select {
// We watch the multiplexed channel for incoming responses.
case resp, more := <-watches.deltaMuxedResponses:
if !more {
break
}
if err := process(resp); err != nil {
return err
}
default:
return nil
}
}
}
if s.callbacks != nil {
if err := s.callbacks.OnDeltaStreamOpen(str.Context(), streamID, defaultTypeURL); err != nil {
return err
@ -113,35 +151,31 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
select {
case <-s.ctx.Done():
return nil
// We watch the multiplexed channel for incoming responses.
case resp, more := <-watches.deltaMuxedResponses:
// input stream ended or errored out
if !more {
break
}
typ := resp.GetDeltaRequest().GetTypeUrl()
if resp == deltaErrorResponse {
return status.Errorf(codes.Unavailable, typ+" watch failed")
}
nonce, err := send(resp)
if err != nil {
if err := process(resp); err != nil {
return err
}
watch := watches.deltaWatches[typ]
watch.nonce = nonce
watch.state.SetResourceVersions(resp.GetNextVersionMap())
watches.deltaWatches[typ] = watch
case req, more := <-reqCh:
// input stream ended or errored out
if !more {
return nil
}
if req == nil {
return status.Errorf(codes.Unavailable, "empty request")
}
// make sure all existing responses are processed prior to new requests to avoid deadlock
if err := processAll(); err != nil {
return err
}
if s.callbacks != nil {
if err := s.callbacks.OnStreamDeltaRequest(streamID, req); err != nil {
return err
@ -184,16 +218,8 @@ func (s *server) processDelta(str stream.DeltaStream, reqCh <-chan *discovery.De
s.subscribe(req.GetResourceNamesSubscribe(), &watch.state)
s.unsubscribe(req.GetResourceNamesUnsubscribe(), &watch.state)
watch.responses = make(chan cache.DeltaResponse, 1)
watch.cancel = s.cache.CreateDeltaWatch(req, watch.state, watch.responses)
watch.cancel = s.cache.CreateDeltaWatch(req, watch.state, watches.deltaMuxedResponses)
watches.deltaWatches[typeURL] = watch
go func() {
resp, more := <-watch.responses
if more {
watches.deltaMuxedResponses <- resp
}
}()
}
}
}

View File

@ -17,9 +17,13 @@ type watches struct {
// newWatches creates and initializes watches.
func newWatches() watches {
// deltaMuxedResponses needs a buffer to release go-routines populating it
//
// because deltaMuxedResponses can be populated by an update from the cache
// and a request from the client, we need to create the channel with a buffer
// size of 2x the number of types to avoid deadlocks.
return watches{
deltaWatches: make(map[string]watch, int(types.UnknownType)),
deltaMuxedResponses: make(chan cache.DeltaResponse, int(types.UnknownType)),
deltaMuxedResponses: make(chan cache.DeltaResponse, int(types.UnknownType)*2),
}
}
@ -28,13 +32,14 @@ func (w *watches) Cancel() {
for _, watch := range w.deltaWatches {
watch.Cancel()
}
close(w.deltaMuxedResponses)
}
// watch contains the necessary modifiables for receiving resource responses
type watch struct {
responses chan cache.DeltaResponse
cancel func()
nonce string
cancel func()
nonce string
state stream.StreamState
}
@ -44,9 +49,4 @@ func (w *watch) Cancel() {
if w.cancel != nil {
w.cancel()
}
if w.responses != nil {
// w.responses should never be used by a producer once cancel() has been closed, so we can safely close it here
// This is needed to release resources taken by goroutines watching this channel
close(w.responses)
}
}

View File

@ -5,8 +5,6 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/cache/v3"
)
func TestDeltaWatches(t *testing.T) {
@ -14,14 +12,11 @@ func TestDeltaWatches(t *testing.T) {
watches := newWatches()
cancelCount := 0
var channels []chan cache.DeltaResponse
// create a few watches, and ensure that the cancel function are called and the channels are closed
for i := 0; i < 5; i++ {
newWatch := watch{}
if i%2 == 0 {
newWatch.cancel = func() { cancelCount++ }
newWatch.responses = make(chan cache.DeltaResponse)
channels = append(channels, newWatch.responses)
}
watches.deltaWatches[strconv.Itoa(i)] = newWatch
@ -30,13 +25,5 @@ func TestDeltaWatches(t *testing.T) {
watches.Cancel()
assert.Equal(t, 3, cancelCount)
for _, channel := range channels {
select {
case _, ok := <-channel:
assert.False(t, ok, "a channel was not closed")
default:
assert.Fail(t, "a channel was not closed")
}
}
})
}

View File

@ -345,6 +345,8 @@ func TestDeltaAggregatedHandlers(t *testing.T) {
resp.recv <- r
}
// We create the server with the optional ordered ADS flag so we guarantee resource
// ordering over the stream.
s := server.NewServer(context.Background(), config, server.CallbackFuncs{})
go func() {
err := s.DeltaAggregatedResources(resp)

View File

@ -38,7 +38,6 @@ import (
"github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/cache/types"
"github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/cache/v3"
"github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/resource/v3"
"github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/wellknown"
)
const (
@ -268,11 +267,11 @@ func buildHTTPConnectionManager() *hcm.HttpConnectionManager {
CodecType: hcm.HttpConnectionManager_AUTO,
StatPrefix: "http",
HttpFilters: []*hcm.HttpFilter{{
Name: wellknown.Router,
Name: "http-router",
ConfigType: &hcm.HttpFilter_TypedConfig{TypedConfig: routerConfig},
}},
AccessLog: []*alf.AccessLog{{
Name: wellknown.HTTPGRPCAccessLog,
Name: "access-logger",
ConfigType: &alf.AccessLog_TypedConfig{
TypedConfig: alsConfigPbst,
},
@ -374,7 +373,7 @@ func MakeScopedRouteHTTPListener(mode string, listenerName string, port uint32)
{
Filters: []*listener.Filter{
{
Name: wellknown.HTTPConnectionManager,
Name: "http-connection-manager",
ConfigType: &listener.Filter_TypedConfig{
TypedConfig: pbst,
},
@ -431,7 +430,7 @@ func MakeScopedRouteHTTPListenerForRoute(mode string, listenerName string, port
{
Filters: []*listener.Filter{
{
Name: wellknown.HTTPConnectionManager,
Name: "http-connection-manager",
ConfigType: &listener.Filter_TypedConfig{
TypedConfig: pbst,
},
@ -461,7 +460,7 @@ func MakeTCPListener(listenerName string, port uint32, clusterName string) *list
{
Filters: []*listener.Filter{
{
Name: wellknown.TCPProxy,
Name: "tcp-proxy",
ConfigType: &listener.Filter_TypedConfig{
TypedConfig: pbst,
},
@ -506,7 +505,7 @@ func MakeExtensionConfig(mode string, extensionConfigName string, route string)
},
},
HttpFilters: []*hcm.HttpFilter{{
Name: wellknown.Router,
Name: "http-router",
ConfigType: &hcm.HttpFilter_TypedConfig{TypedConfig: routerConfig},
}},
}

View File

@ -13,6 +13,11 @@
// limitations under the License.
// Package wellknown contains common names for filters, listeners, etc.
//
// Deprecated. Envoy no longer requires specific names when configuring
// filters or other properties, since it inspects the Protobuf type URL
// to decide how to the decode a message. Because of this, no new names
// are being added to this package.
package wellknown
// HTTP filter names