Add queuing to profile translator (#11546)

https://github.com/linkerd/linkerd2/pull/11491 changed the EndpointTranslator to use a queue to avoid calling `Send` on a gRPC stream directly from an informer callback goroutine.  This change updates the ProfileTranslator in the same way, adding a queue to ensure we do not block the informer thread.

Signed-off-by: Alex Leong <alex@buoyant.io>
This commit is contained in:
Alex Leong 2023-11-09 04:32:30 -08:00 committed by GitHub
parent 1e605ddf63
commit 71635cbf3d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 161 additions and 90 deletions

View File

@ -91,12 +91,11 @@ func FuzzProfileTranslatorUpdate(data []byte) int {
return 0
}
t := &testing.T{}
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}}
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}
translator := &profileTranslator{
stream: mockGetProfileServer,
log: logging.WithField("test", t.Name()),
}
translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "foo.bar.svc.cluster.local", 80, nil)
translator.Start()
defer translator.Stop()
translator.Update(profile)
return 1
}

View File

@ -11,6 +11,8 @@ import (
sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2"
"github.com/linkerd/linkerd2/pkg/profiles"
"github.com/linkerd/linkerd2/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
logging "github.com/sirupsen/logrus"
)
@ -18,22 +20,90 @@ const millisPerDecimilli = 10
// implements the ProfileUpdateListener interface
type profileTranslator struct {
stream pb.Destination_GetProfileServer
log *logging.Entry
fullyQualifiedName string
port uint32
stream pb.Destination_GetProfileServer
endStream chan struct{}
log *logging.Entry
overflowCounter prometheus.Counter
updates chan *sp.ServiceProfile
stop chan struct{}
}
func newProfileTranslator(stream pb.Destination_GetProfileServer, log *logging.Entry, fqn string, port uint32) *profileTranslator {
var profileUpdatesQueueOverflowCounter = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "profile_updates_queue_overflow",
Help: "A counter incremented whenever the profile updates queue overflows",
},
[]string{
"fqn",
"port",
},
)
func newProfileTranslator(stream pb.Destination_GetProfileServer, log *logging.Entry, fqn string, port uint32, endStream chan struct{}) *profileTranslator {
return &profileTranslator{
stream: stream,
log: log.WithField("component", "profile-translator"),
fullyQualifiedName: fqn,
port: port,
stream: stream,
endStream: endStream,
log: log.WithField("component", "profile-translator"),
overflowCounter: profileUpdatesQueueOverflowCounter.With(prometheus.Labels{"fqn": fqn, "port": fmt.Sprintf("%d", port)}),
updates: make(chan *sp.ServiceProfile, updateQueueCapacity),
stop: make(chan struct{}),
}
}
// Update is called from a client-go informer callback and therefore must not
// We enqueue an update in a channel so that it can be processed asyncronously.
// To ensure that enqueuing does not block, we first check to see if there is
// capacity in the buffered channel. If there is not, we drop the update and
// signal to the stream that it has fallen too far behind and should be closed.
func (pt *profileTranslator) Update(profile *sp.ServiceProfile) {
select {
case pt.updates <- profile:
// Update has been successfully enqueued.
default:
// We are unable to enqueue because the channel does not have capacity.
// The stream has fallen too far behind and should be closed.
pt.overflowCounter.Inc()
select {
case <-pt.endStream:
// The endStream channel has already been closed so no action is
// necessary.
default:
pt.log.Error("profile update queue full; aborting stream")
close(pt.endStream)
}
}
}
// Start initiates a goroutine which processes update events off of the
// profileTranslator's internal queue and sends to the grpc stream as
// appropriate. The goroutine calls non-thread-safe Send, therefore Start must
// not be called more than once.
func (pt *profileTranslator) Start() {
go func() {
for {
select {
case update := <-pt.updates:
pt.update(update)
case <-pt.stop:
return
}
}
}()
}
// Stop terminates the goroutine started by Start.
func (pt *profileTranslator) Stop() {
close(pt.stop)
}
func (pt *profileTranslator) update(profile *sp.ServiceProfile) {
if profile == nil {
pt.log.Debugf("Sending default profile")
if err := pt.stream.Send(pt.defaultServiceProfile()); err != nil {

View File

@ -421,52 +421,49 @@ var (
func TestProfileTranslator(t *testing.T) {
t.Run("Sends update", func(t *testing.T) {
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}}
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}
translator := &profileTranslator{
stream: mockGetProfileServer,
log: logging.WithField("test", t.Name()),
}
translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil)
translator.Start()
defer translator.Stop()
translator.Update(profile)
numProfiles := len(mockGetProfileServer.profilesReceived)
if numProfiles != 1 {
t.Fatalf("Expecting [1] profile, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived)
}
actualPbProfile := mockGetProfileServer.profilesReceived[0]
actualPbProfile := <-mockGetProfileServer.profilesReceived
if !proto.Equal(actualPbProfile, pbProfile) {
t.Fatalf("Expected profile sent to be [%v] but was [%v]", pbProfile, actualPbProfile)
}
numProfiles := len(mockGetProfileServer.profilesReceived) + 1
if numProfiles != 1 {
t.Fatalf("Expecting [1] profile, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived)
}
})
t.Run("Request match with more than one field becomes ALL", func(t *testing.T) {
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}}
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}
translator := &profileTranslator{
stream: mockGetProfileServer,
log: logging.WithField("test", t.Name()),
}
translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil)
translator.Start()
defer translator.Stop()
translator.Update(multipleRequestMatches)
numProfiles := len(mockGetProfileServer.profilesReceived)
if numProfiles != 1 {
t.Fatalf("Expecting [1] profiles, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived)
}
actualPbProfile := mockGetProfileServer.profilesReceived[0]
actualPbProfile := <-mockGetProfileServer.profilesReceived
if !proto.Equal(actualPbProfile, pbRequestMatchAll) {
t.Fatalf("Expected profile sent to be [%v] but was [%v]", pbRequestMatchAll, actualPbProfile)
}
numProfiles := len(mockGetProfileServer.profilesReceived) + 1
if numProfiles != 1 {
t.Fatalf("Expecting [1] profiles, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived)
}
})
t.Run("Ignores request match without any fields", func(t *testing.T) {
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}}
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}
translator := &profileTranslator{
stream: mockGetProfileServer,
log: logging.WithField("test", t.Name()),
}
translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil)
translator.Start()
defer translator.Stop()
translator.Update(notEnoughRequestMatches)
@ -477,32 +474,30 @@ func TestProfileTranslator(t *testing.T) {
})
t.Run("Response match with more than one field becomes ALL", func(t *testing.T) {
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}}
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}
translator := &profileTranslator{
stream: mockGetProfileServer,
log: logging.WithField("test", t.Name()),
}
translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil)
translator.Start()
defer translator.Stop()
translator.Update(multipleResponseMatches)
numProfiles := len(mockGetProfileServer.profilesReceived)
if numProfiles != 1 {
t.Fatalf("Expecting [1] profiles, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived)
}
actualPbProfile := mockGetProfileServer.profilesReceived[0]
actualPbProfile := <-mockGetProfileServer.profilesReceived
if !proto.Equal(actualPbProfile, pbResponseMatchAll) {
t.Fatalf("Expected profile sent to be [%v] but was [%v]", pbResponseMatchAll, actualPbProfile)
}
numProfiles := len(mockGetProfileServer.profilesReceived) + 1
if numProfiles != 1 {
t.Fatalf("Expecting [1] profiles, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived)
}
})
t.Run("Ignores response match without any fields", func(t *testing.T) {
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}}
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}
translator := &profileTranslator{
stream: mockGetProfileServer,
log: logging.WithField("test", t.Name()),
}
translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil)
translator.Start()
defer translator.Stop()
translator.Update(notEnoughResponseMatches)
@ -513,12 +508,11 @@ func TestProfileTranslator(t *testing.T) {
})
t.Run("Ignores response match with invalid status range", func(t *testing.T) {
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}}
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}
translator := &profileTranslator{
stream: mockGetProfileServer,
log: logging.WithField("test", t.Name()),
}
translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil)
translator.Start()
defer translator.Stop()
translator.Update(invalidStatusRange)
@ -529,58 +523,57 @@ func TestProfileTranslator(t *testing.T) {
})
t.Run("Sends update for one sided status range", func(t *testing.T) {
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}}
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}
translator := &profileTranslator{
stream: mockGetProfileServer,
log: logging.WithField("test", t.Name()),
}
translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil)
translator.Start()
defer translator.Stop()
translator.Update(oneSidedStatusRange)
numProfiles := len(mockGetProfileServer.profilesReceived)
<-mockGetProfileServer.profilesReceived
numProfiles := len(mockGetProfileServer.profilesReceived) + 1
if numProfiles != 1 {
t.Fatalf("Expecting [1] profile, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived)
}
})
t.Run("Sends empty update", func(t *testing.T) {
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}}
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}
translator := &profileTranslator{
stream: mockGetProfileServer,
log: logging.WithField("test", t.Name()),
}
translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil)
translator.Start()
defer translator.Stop()
translator.Update(nil)
numProfiles := len(mockGetProfileServer.profilesReceived)
if numProfiles != 1 {
t.Fatalf("Expecting [1] profile, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived)
}
actualPbProfile := mockGetProfileServer.profilesReceived[0]
actualPbProfile := <-mockGetProfileServer.profilesReceived
if !proto.Equal(actualPbProfile, defaultPbProfile) {
t.Fatalf("Expected profile sent to be [%v] but was [%v]", defaultPbProfile, actualPbProfile)
}
numProfiles := len(mockGetProfileServer.profilesReceived) + 1
if numProfiles != 1 {
t.Fatalf("Expecting [1] profile, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived)
}
})
t.Run("Sends update with custom timeout", func(t *testing.T) {
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}}
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}
translator := &profileTranslator{
stream: mockGetProfileServer,
log: logging.WithField("test", t.Name()),
}
translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil)
translator.Start()
defer translator.Stop()
translator.Update(profileWithTimeout)
numProfiles := len(mockGetProfileServer.profilesReceived)
if numProfiles != 1 {
t.Fatalf("Expecting [1] profile, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived)
}
actualPbProfile := mockGetProfileServer.profilesReceived[0]
actualPbProfile := <-mockGetProfileServer.profilesReceived
if !proto.Equal(actualPbProfile, pbProfileWithTimeout) {
t.Fatalf("Expected profile sent to be [%v] but was [%v]", pbProfileWithTimeout, actualPbProfile)
}
numProfiles := len(mockGetProfileServer.profilesReceived) + 1
if numProfiles != 1 {
t.Fatalf("Expecting [1] profile, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived)
}
})
}

View File

@ -345,11 +345,14 @@ func (s *server) subscribeToServiceProfile(
WithField("port", port)
canceled := stream.Context().Done()
streamEnd := make(chan struct{})
// We build up the pipeline of profile updaters backwards, starting from
// the translator which takes profile updates, translates them to protobuf
// and pushes them onto the gRPC stream.
translator := newProfileTranslator(stream, log, fqn, port)
translator := newProfileTranslator(stream, log, fqn, port, streamEnd)
translator.Start()
defer translator.Stop()
// The opaque ports adaptor merges profile updates with service opaque
// port annotation updates; it then publishes the result to the traffic
@ -376,9 +379,9 @@ func (s *server) subscribeToServiceProfile(
// namespace. If there's no namespace in the token, start a single
// subscription.
if token.Ns == "" {
return s.subscribeToServiceWithoutContext(fqn, listener, canceled, log)
return s.subscribeToServiceWithoutContext(fqn, listener, canceled, log, streamEnd)
}
return s.subscribeToServicesWithContext(fqn, token, listener, canceled, log)
return s.subscribeToServicesWithContext(fqn, token, listener, canceled, log, streamEnd)
}
// subscribeToServiceWithContext establishes two profile watches: a "backup"
@ -393,6 +396,7 @@ func (s *server) subscribeToServicesWithContext(
listener watcher.ProfileUpdateListener,
canceled <-chan struct{},
log *logging.Entry,
streamEnd <-chan struct{},
) error {
// We ned to support two subscriptions:
// - First, a backup subscription that assumes the context of the server
@ -430,7 +434,9 @@ func (s *server) subscribeToServicesWithContext(
select {
case <-s.shutdown:
case <-canceled:
log.Debug("Cancelled")
log.Debugf("GetProfile %s cancelled", fqn)
case <-streamEnd:
log.Errorf("GetProfile %s stream aborted", fqn)
}
return nil
}
@ -440,8 +446,9 @@ func (s *server) subscribeToServicesWithContext(
func (s *server) subscribeToServiceWithoutContext(
fqn string,
listener watcher.ProfileUpdateListener,
cancel <-chan struct{},
canceled <-chan struct{},
log *logging.Entry,
streamEnd <-chan struct{},
) error {
id, err := profileID(fqn, contextToken{}, s.clusterDomain)
if err != nil {
@ -457,8 +464,10 @@ func (s *server) subscribeToServiceWithoutContext(
select {
case <-s.shutdown:
case <-cancel:
log.Debug("Cancelled")
case <-canceled:
log.Debugf("GetProfile %s cancelled", fqn)
case <-streamEnd:
log.Errorf("GetProfile %s stream aborted", fqn)
}
return nil
}

View File

@ -567,11 +567,11 @@ func (m *mockDestinationGetServer) Send(update *pb.Update) error {
type mockDestinationGetProfileServer struct {
util.MockServerStream
profilesReceived []*pb.DestinationProfile
profilesReceived chan *pb.DestinationProfile
}
func (m *mockDestinationGetProfileServer) Send(profile *pb.DestinationProfile) error {
m.profilesReceived = append(m.profilesReceived, profile)
m.profilesReceived <- profile
return nil
}