Add queuing to profile translator (#11546)

https://github.com/linkerd/linkerd2/pull/11491 changed the EndpointTranslator to use a queue to avoid calling `Send` on a gRPC stream directly from an informer callback goroutine.  This change updates the ProfileTranslator in the same way, adding a queue to ensure we do not block the informer thread.

Signed-off-by: Alex Leong <alex@buoyant.io>
This commit is contained in:
Alex Leong 2023-11-09 04:32:30 -08:00 committed by GitHub
parent 1e605ddf63
commit 71635cbf3d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 161 additions and 90 deletions

View File

@ -91,12 +91,11 @@ func FuzzProfileTranslatorUpdate(data []byte) int {
return 0 return 0
} }
t := &testing.T{} t := &testing.T{}
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}} mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}
translator := &profileTranslator{ translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "foo.bar.svc.cluster.local", 80, nil)
stream: mockGetProfileServer, translator.Start()
log: logging.WithField("test", t.Name()), defer translator.Stop()
}
translator.Update(profile) translator.Update(profile)
return 1 return 1
} }

View File

@ -11,6 +11,8 @@ import (
sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2" sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2"
"github.com/linkerd/linkerd2/pkg/profiles" "github.com/linkerd/linkerd2/pkg/profiles"
"github.com/linkerd/linkerd2/pkg/util" "github.com/linkerd/linkerd2/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
logging "github.com/sirupsen/logrus" logging "github.com/sirupsen/logrus"
) )
@ -18,22 +20,90 @@ const millisPerDecimilli = 10
// implements the ProfileUpdateListener interface // implements the ProfileUpdateListener interface
type profileTranslator struct { type profileTranslator struct {
stream pb.Destination_GetProfileServer
log *logging.Entry
fullyQualifiedName string fullyQualifiedName string
port uint32 port uint32
stream pb.Destination_GetProfileServer
endStream chan struct{}
log *logging.Entry
overflowCounter prometheus.Counter
updates chan *sp.ServiceProfile
stop chan struct{}
} }
func newProfileTranslator(stream pb.Destination_GetProfileServer, log *logging.Entry, fqn string, port uint32) *profileTranslator { var profileUpdatesQueueOverflowCounter = promauto.NewCounterVec(
prometheus.CounterOpts{
Name: "profile_updates_queue_overflow",
Help: "A counter incremented whenever the profile updates queue overflows",
},
[]string{
"fqn",
"port",
},
)
func newProfileTranslator(stream pb.Destination_GetProfileServer, log *logging.Entry, fqn string, port uint32, endStream chan struct{}) *profileTranslator {
return &profileTranslator{ return &profileTranslator{
stream: stream,
log: log.WithField("component", "profile-translator"),
fullyQualifiedName: fqn, fullyQualifiedName: fqn,
port: port, port: port,
stream: stream,
endStream: endStream,
log: log.WithField("component", "profile-translator"),
overflowCounter: profileUpdatesQueueOverflowCounter.With(prometheus.Labels{"fqn": fqn, "port": fmt.Sprintf("%d", port)}),
updates: make(chan *sp.ServiceProfile, updateQueueCapacity),
stop: make(chan struct{}),
} }
} }
// Update is called from a client-go informer callback and therefore must not
// We enqueue an update in a channel so that it can be processed asyncronously.
// To ensure that enqueuing does not block, we first check to see if there is
// capacity in the buffered channel. If there is not, we drop the update and
// signal to the stream that it has fallen too far behind and should be closed.
func (pt *profileTranslator) Update(profile *sp.ServiceProfile) { func (pt *profileTranslator) Update(profile *sp.ServiceProfile) {
select {
case pt.updates <- profile:
// Update has been successfully enqueued.
default:
// We are unable to enqueue because the channel does not have capacity.
// The stream has fallen too far behind and should be closed.
pt.overflowCounter.Inc()
select {
case <-pt.endStream:
// The endStream channel has already been closed so no action is
// necessary.
default:
pt.log.Error("profile update queue full; aborting stream")
close(pt.endStream)
}
}
}
// Start launches the background worker that drains the profileTranslator's
// internal update queue, handing each queued profile to pt.update (which
// writes to the gRPC stream). The worker exits once the stop channel is
// closed. Because the worker calls Send, which is not thread-safe, Start must
// be invoked at most once per translator.
func (pt *profileTranslator) Start() {
	worker := func() {
		for {
			select {
			case <-pt.stop:
				// Stop was requested; end the worker.
				return
			case next := <-pt.updates:
				pt.update(next)
			}
		}
	}
	go worker()
}
// Stop closes the translator's stop channel, which tells the goroutine
// launched by Start to return. It must be called at most once: closing an
// already-closed channel panics.
func (pt *profileTranslator) Stop() {
	done := pt.stop
	close(done)
}
func (pt *profileTranslator) update(profile *sp.ServiceProfile) {
if profile == nil { if profile == nil {
pt.log.Debugf("Sending default profile") pt.log.Debugf("Sending default profile")
if err := pt.stream.Send(pt.defaultServiceProfile()); err != nil { if err := pt.stream.Send(pt.defaultServiceProfile()); err != nil {

View File

@ -421,52 +421,49 @@ var (
func TestProfileTranslator(t *testing.T) { func TestProfileTranslator(t *testing.T) {
t.Run("Sends update", func(t *testing.T) { t.Run("Sends update", func(t *testing.T) {
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}} mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}
translator := &profileTranslator{ translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil)
stream: mockGetProfileServer, translator.Start()
log: logging.WithField("test", t.Name()), defer translator.Stop()
}
translator.Update(profile) translator.Update(profile)
numProfiles := len(mockGetProfileServer.profilesReceived) actualPbProfile := <-mockGetProfileServer.profilesReceived
if numProfiles != 1 {
t.Fatalf("Expecting [1] profile, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived)
}
actualPbProfile := mockGetProfileServer.profilesReceived[0]
if !proto.Equal(actualPbProfile, pbProfile) { if !proto.Equal(actualPbProfile, pbProfile) {
t.Fatalf("Expected profile sent to be [%v] but was [%v]", pbProfile, actualPbProfile) t.Fatalf("Expected profile sent to be [%v] but was [%v]", pbProfile, actualPbProfile)
} }
numProfiles := len(mockGetProfileServer.profilesReceived) + 1
if numProfiles != 1 {
t.Fatalf("Expecting [1] profile, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived)
}
}) })
t.Run("Request match with more than one field becomes ALL", func(t *testing.T) { t.Run("Request match with more than one field becomes ALL", func(t *testing.T) {
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}} mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}
translator := &profileTranslator{ translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil)
stream: mockGetProfileServer, translator.Start()
log: logging.WithField("test", t.Name()), defer translator.Stop()
}
translator.Update(multipleRequestMatches) translator.Update(multipleRequestMatches)
numProfiles := len(mockGetProfileServer.profilesReceived) actualPbProfile := <-mockGetProfileServer.profilesReceived
if numProfiles != 1 {
t.Fatalf("Expecting [1] profiles, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived)
}
actualPbProfile := mockGetProfileServer.profilesReceived[0]
if !proto.Equal(actualPbProfile, pbRequestMatchAll) { if !proto.Equal(actualPbProfile, pbRequestMatchAll) {
t.Fatalf("Expected profile sent to be [%v] but was [%v]", pbRequestMatchAll, actualPbProfile) t.Fatalf("Expected profile sent to be [%v] but was [%v]", pbRequestMatchAll, actualPbProfile)
} }
numProfiles := len(mockGetProfileServer.profilesReceived) + 1
if numProfiles != 1 {
t.Fatalf("Expecting [1] profiles, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived)
}
}) })
t.Run("Ignores request match without any fields", func(t *testing.T) { t.Run("Ignores request match without any fields", func(t *testing.T) {
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}} mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}
translator := &profileTranslator{ translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil)
stream: mockGetProfileServer, translator.Start()
log: logging.WithField("test", t.Name()), defer translator.Stop()
}
translator.Update(notEnoughRequestMatches) translator.Update(notEnoughRequestMatches)
@ -477,32 +474,30 @@ func TestProfileTranslator(t *testing.T) {
}) })
t.Run("Response match with more than one field becomes ALL", func(t *testing.T) { t.Run("Response match with more than one field becomes ALL", func(t *testing.T) {
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}} mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}
translator := &profileTranslator{ translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil)
stream: mockGetProfileServer, translator.Start()
log: logging.WithField("test", t.Name()), defer translator.Stop()
}
translator.Update(multipleResponseMatches) translator.Update(multipleResponseMatches)
numProfiles := len(mockGetProfileServer.profilesReceived) actualPbProfile := <-mockGetProfileServer.profilesReceived
if numProfiles != 1 {
t.Fatalf("Expecting [1] profiles, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived)
}
actualPbProfile := mockGetProfileServer.profilesReceived[0]
if !proto.Equal(actualPbProfile, pbResponseMatchAll) { if !proto.Equal(actualPbProfile, pbResponseMatchAll) {
t.Fatalf("Expected profile sent to be [%v] but was [%v]", pbResponseMatchAll, actualPbProfile) t.Fatalf("Expected profile sent to be [%v] but was [%v]", pbResponseMatchAll, actualPbProfile)
} }
numProfiles := len(mockGetProfileServer.profilesReceived) + 1
if numProfiles != 1 {
t.Fatalf("Expecting [1] profiles, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived)
}
}) })
t.Run("Ignores response match without any fields", func(t *testing.T) { t.Run("Ignores response match without any fields", func(t *testing.T) {
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}} mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}
translator := &profileTranslator{ translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil)
stream: mockGetProfileServer, translator.Start()
log: logging.WithField("test", t.Name()), defer translator.Stop()
}
translator.Update(notEnoughResponseMatches) translator.Update(notEnoughResponseMatches)
@ -513,12 +508,11 @@ func TestProfileTranslator(t *testing.T) {
}) })
t.Run("Ignores response match with invalid status range", func(t *testing.T) { t.Run("Ignores response match with invalid status range", func(t *testing.T) {
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}} mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}
translator := &profileTranslator{ translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil)
stream: mockGetProfileServer, translator.Start()
log: logging.WithField("test", t.Name()), defer translator.Stop()
}
translator.Update(invalidStatusRange) translator.Update(invalidStatusRange)
@ -529,58 +523,57 @@ func TestProfileTranslator(t *testing.T) {
}) })
t.Run("Sends update for one sided status range", func(t *testing.T) { t.Run("Sends update for one sided status range", func(t *testing.T) {
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}} mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}
translator := &profileTranslator{ translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil)
stream: mockGetProfileServer, translator.Start()
log: logging.WithField("test", t.Name()), defer translator.Stop()
}
translator.Update(oneSidedStatusRange) translator.Update(oneSidedStatusRange)
numProfiles := len(mockGetProfileServer.profilesReceived) <-mockGetProfileServer.profilesReceived
numProfiles := len(mockGetProfileServer.profilesReceived) + 1
if numProfiles != 1 { if numProfiles != 1 {
t.Fatalf("Expecting [1] profile, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived) t.Fatalf("Expecting [1] profile, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived)
} }
}) })
t.Run("Sends empty update", func(t *testing.T) { t.Run("Sends empty update", func(t *testing.T) {
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}} mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}
translator := &profileTranslator{ translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil)
stream: mockGetProfileServer, translator.Start()
log: logging.WithField("test", t.Name()), defer translator.Stop()
}
translator.Update(nil) translator.Update(nil)
numProfiles := len(mockGetProfileServer.profilesReceived) actualPbProfile := <-mockGetProfileServer.profilesReceived
if numProfiles != 1 {
t.Fatalf("Expecting [1] profile, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived)
}
actualPbProfile := mockGetProfileServer.profilesReceived[0]
if !proto.Equal(actualPbProfile, defaultPbProfile) { if !proto.Equal(actualPbProfile, defaultPbProfile) {
t.Fatalf("Expected profile sent to be [%v] but was [%v]", defaultPbProfile, actualPbProfile) t.Fatalf("Expected profile sent to be [%v] but was [%v]", defaultPbProfile, actualPbProfile)
} }
numProfiles := len(mockGetProfileServer.profilesReceived) + 1
if numProfiles != 1 {
t.Fatalf("Expecting [1] profile, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived)
}
}) })
t.Run("Sends update with custom timeout", func(t *testing.T) { t.Run("Sends update with custom timeout", func(t *testing.T) {
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}} mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}
translator := &profileTranslator{ translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil)
stream: mockGetProfileServer, translator.Start()
log: logging.WithField("test", t.Name()), defer translator.Stop()
}
translator.Update(profileWithTimeout) translator.Update(profileWithTimeout)
numProfiles := len(mockGetProfileServer.profilesReceived) actualPbProfile := <-mockGetProfileServer.profilesReceived
if numProfiles != 1 {
t.Fatalf("Expecting [1] profile, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived)
}
actualPbProfile := mockGetProfileServer.profilesReceived[0]
if !proto.Equal(actualPbProfile, pbProfileWithTimeout) { if !proto.Equal(actualPbProfile, pbProfileWithTimeout) {
t.Fatalf("Expected profile sent to be [%v] but was [%v]", pbProfileWithTimeout, actualPbProfile) t.Fatalf("Expected profile sent to be [%v] but was [%v]", pbProfileWithTimeout, actualPbProfile)
} }
numProfiles := len(mockGetProfileServer.profilesReceived) + 1
if numProfiles != 1 {
t.Fatalf("Expecting [1] profile, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived)
}
}) })
} }

View File

@ -345,11 +345,14 @@ func (s *server) subscribeToServiceProfile(
WithField("port", port) WithField("port", port)
canceled := stream.Context().Done() canceled := stream.Context().Done()
streamEnd := make(chan struct{})
// We build up the pipeline of profile updaters backwards, starting from // We build up the pipeline of profile updaters backwards, starting from
// the translator which takes profile updates, translates them to protobuf // the translator which takes profile updates, translates them to protobuf
// and pushes them onto the gRPC stream. // and pushes them onto the gRPC stream.
translator := newProfileTranslator(stream, log, fqn, port) translator := newProfileTranslator(stream, log, fqn, port, streamEnd)
translator.Start()
defer translator.Stop()
// The opaque ports adaptor merges profile updates with service opaque // The opaque ports adaptor merges profile updates with service opaque
// port annotation updates; it then publishes the result to the traffic // port annotation updates; it then publishes the result to the traffic
@ -376,9 +379,9 @@ func (s *server) subscribeToServiceProfile(
// namespace. If there's no namespace in the token, start a single // namespace. If there's no namespace in the token, start a single
// subscription. // subscription.
if token.Ns == "" { if token.Ns == "" {
return s.subscribeToServiceWithoutContext(fqn, listener, canceled, log) return s.subscribeToServiceWithoutContext(fqn, listener, canceled, log, streamEnd)
} }
return s.subscribeToServicesWithContext(fqn, token, listener, canceled, log) return s.subscribeToServicesWithContext(fqn, token, listener, canceled, log, streamEnd)
} }
// subscribeToServicesWithContext establishes two profile watches: a "backup" // subscribeToServicesWithContext establishes two profile watches: a "backup"
@ -393,6 +396,7 @@ func (s *server) subscribeToServicesWithContext(
listener watcher.ProfileUpdateListener, listener watcher.ProfileUpdateListener,
canceled <-chan struct{}, canceled <-chan struct{},
log *logging.Entry, log *logging.Entry,
streamEnd <-chan struct{},
) error { ) error {
// We need to support two subscriptions: // We need to support two subscriptions:
// - First, a backup subscription that assumes the context of the server // - First, a backup subscription that assumes the context of the server
@ -430,7 +434,9 @@ func (s *server) subscribeToServicesWithContext(
select { select {
case <-s.shutdown: case <-s.shutdown:
case <-canceled: case <-canceled:
log.Debug("Cancelled") log.Debugf("GetProfile %s cancelled", fqn)
case <-streamEnd:
log.Errorf("GetProfile %s stream aborted", fqn)
} }
return nil return nil
} }
@ -440,8 +446,9 @@ func (s *server) subscribeToServicesWithContext(
func (s *server) subscribeToServiceWithoutContext( func (s *server) subscribeToServiceWithoutContext(
fqn string, fqn string,
listener watcher.ProfileUpdateListener, listener watcher.ProfileUpdateListener,
cancel <-chan struct{}, canceled <-chan struct{},
log *logging.Entry, log *logging.Entry,
streamEnd <-chan struct{},
) error { ) error {
id, err := profileID(fqn, contextToken{}, s.clusterDomain) id, err := profileID(fqn, contextToken{}, s.clusterDomain)
if err != nil { if err != nil {
@ -457,8 +464,10 @@ func (s *server) subscribeToServiceWithoutContext(
select { select {
case <-s.shutdown: case <-s.shutdown:
case <-cancel: case <-canceled:
log.Debug("Cancelled") log.Debugf("GetProfile %s cancelled", fqn)
case <-streamEnd:
log.Errorf("GetProfile %s stream aborted", fqn)
} }
return nil return nil
} }

View File

@ -567,11 +567,11 @@ func (m *mockDestinationGetServer) Send(update *pb.Update) error {
type mockDestinationGetProfileServer struct { type mockDestinationGetProfileServer struct {
util.MockServerStream util.MockServerStream
profilesReceived []*pb.DestinationProfile profilesReceived chan *pb.DestinationProfile
} }
func (m *mockDestinationGetProfileServer) Send(profile *pb.DestinationProfile) error { func (m *mockDestinationGetProfileServer) Send(profile *pb.DestinationProfile) error {
m.profilesReceived = append(m.profilesReceived, profile) m.profilesReceived <- profile
return nil return nil
} }