mirror of https://github.com/linkerd/linkerd2.git
Add queuing to profile translator (#11546)
https://github.com/linkerd/linkerd2/pull/11491 changed the EndpointTranslator to use a queue to avoid calling `Send` on a gRPC stream directly from an informer callback goroutine. This change updates the ProfileTranslator in the same way, adding a queue to ensure we do not block the informer thread. Signed-off-by: Alex Leong <alex@buoyant.io>
This commit is contained in:
parent
1e605ddf63
commit
71635cbf3d
|
@ -91,12 +91,11 @@ func FuzzProfileTranslatorUpdate(data []byte) int {
|
|||
return 0
|
||||
}
|
||||
t := &testing.T{}
|
||||
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}}
|
||||
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}
|
||||
|
||||
translator := &profileTranslator{
|
||||
stream: mockGetProfileServer,
|
||||
log: logging.WithField("test", t.Name()),
|
||||
}
|
||||
translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "foo.bar.svc.cluster.local", 80, nil)
|
||||
translator.Start()
|
||||
defer translator.Stop()
|
||||
translator.Update(profile)
|
||||
return 1
|
||||
}
|
||||
|
|
|
@ -11,6 +11,8 @@ import (
|
|||
sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2"
|
||||
"github.com/linkerd/linkerd2/pkg/profiles"
|
||||
"github.com/linkerd/linkerd2/pkg/util"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/promauto"
|
||||
logging "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
|
@ -18,22 +20,90 @@ const millisPerDecimilli = 10
|
|||
|
||||
// implements the ProfileUpdateListener interface
|
||||
type profileTranslator struct {
|
||||
stream pb.Destination_GetProfileServer
|
||||
log *logging.Entry
|
||||
fullyQualifiedName string
|
||||
port uint32
|
||||
|
||||
stream pb.Destination_GetProfileServer
|
||||
endStream chan struct{}
|
||||
log *logging.Entry
|
||||
overflowCounter prometheus.Counter
|
||||
|
||||
updates chan *sp.ServiceProfile
|
||||
stop chan struct{}
|
||||
}
|
||||
|
||||
func newProfileTranslator(stream pb.Destination_GetProfileServer, log *logging.Entry, fqn string, port uint32) *profileTranslator {
|
||||
var profileUpdatesQueueOverflowCounter = promauto.NewCounterVec(
|
||||
prometheus.CounterOpts{
|
||||
Name: "profile_updates_queue_overflow",
|
||||
Help: "A counter incremented whenever the profile updates queue overflows",
|
||||
},
|
||||
[]string{
|
||||
"fqn",
|
||||
"port",
|
||||
},
|
||||
)
|
||||
|
||||
func newProfileTranslator(stream pb.Destination_GetProfileServer, log *logging.Entry, fqn string, port uint32, endStream chan struct{}) *profileTranslator {
|
||||
return &profileTranslator{
|
||||
stream: stream,
|
||||
log: log.WithField("component", "profile-translator"),
|
||||
fullyQualifiedName: fqn,
|
||||
port: port,
|
||||
|
||||
stream: stream,
|
||||
endStream: endStream,
|
||||
log: log.WithField("component", "profile-translator"),
|
||||
overflowCounter: profileUpdatesQueueOverflowCounter.With(prometheus.Labels{"fqn": fqn, "port": fmt.Sprintf("%d", port)}),
|
||||
updates: make(chan *sp.ServiceProfile, updateQueueCapacity),
|
||||
stop: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// Update is called from a client-go informer callback and therefore must not
|
||||
// We enqueue an update in a channel so that it can be processed asyncronously.
|
||||
// To ensure that enqueuing does not block, we first check to see if there is
|
||||
// capacity in the buffered channel. If there is not, we drop the update and
|
||||
// signal to the stream that it has fallen too far behind and should be closed.
|
||||
func (pt *profileTranslator) Update(profile *sp.ServiceProfile) {
|
||||
select {
|
||||
case pt.updates <- profile:
|
||||
// Update has been successfully enqueued.
|
||||
default:
|
||||
// We are unable to enqueue because the channel does not have capacity.
|
||||
// The stream has fallen too far behind and should be closed.
|
||||
pt.overflowCounter.Inc()
|
||||
select {
|
||||
case <-pt.endStream:
|
||||
// The endStream channel has already been closed so no action is
|
||||
// necessary.
|
||||
default:
|
||||
pt.log.Error("profile update queue full; aborting stream")
|
||||
close(pt.endStream)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Start initiates a goroutine which processes update events off of the
|
||||
// profileTranslator's internal queue and sends to the grpc stream as
|
||||
// appropriate. The goroutine calls non-thread-safe Send, therefore Start must
|
||||
// not be called more than once.
|
||||
func (pt *profileTranslator) Start() {
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case update := <-pt.updates:
|
||||
pt.update(update)
|
||||
case <-pt.stop:
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Stop terminates the goroutine started by Start.
|
||||
func (pt *profileTranslator) Stop() {
|
||||
close(pt.stop)
|
||||
}
|
||||
|
||||
func (pt *profileTranslator) update(profile *sp.ServiceProfile) {
|
||||
if profile == nil {
|
||||
pt.log.Debugf("Sending default profile")
|
||||
if err := pt.stream.Send(pt.defaultServiceProfile()); err != nil {
|
||||
|
|
|
@ -421,52 +421,49 @@ var (
|
|||
|
||||
func TestProfileTranslator(t *testing.T) {
|
||||
t.Run("Sends update", func(t *testing.T) {
|
||||
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}}
|
||||
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}
|
||||
|
||||
translator := &profileTranslator{
|
||||
stream: mockGetProfileServer,
|
||||
log: logging.WithField("test", t.Name()),
|
||||
}
|
||||
translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil)
|
||||
translator.Start()
|
||||
defer translator.Stop()
|
||||
|
||||
translator.Update(profile)
|
||||
|
||||
numProfiles := len(mockGetProfileServer.profilesReceived)
|
||||
if numProfiles != 1 {
|
||||
t.Fatalf("Expecting [1] profile, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived)
|
||||
}
|
||||
actualPbProfile := mockGetProfileServer.profilesReceived[0]
|
||||
actualPbProfile := <-mockGetProfileServer.profilesReceived
|
||||
if !proto.Equal(actualPbProfile, pbProfile) {
|
||||
t.Fatalf("Expected profile sent to be [%v] but was [%v]", pbProfile, actualPbProfile)
|
||||
}
|
||||
numProfiles := len(mockGetProfileServer.profilesReceived) + 1
|
||||
if numProfiles != 1 {
|
||||
t.Fatalf("Expecting [1] profile, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("Request match with more than one field becomes ALL", func(t *testing.T) {
|
||||
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}}
|
||||
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}
|
||||
|
||||
translator := &profileTranslator{
|
||||
stream: mockGetProfileServer,
|
||||
log: logging.WithField("test", t.Name()),
|
||||
}
|
||||
translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil)
|
||||
translator.Start()
|
||||
defer translator.Stop()
|
||||
|
||||
translator.Update(multipleRequestMatches)
|
||||
|
||||
numProfiles := len(mockGetProfileServer.profilesReceived)
|
||||
if numProfiles != 1 {
|
||||
t.Fatalf("Expecting [1] profiles, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived)
|
||||
}
|
||||
actualPbProfile := mockGetProfileServer.profilesReceived[0]
|
||||
actualPbProfile := <-mockGetProfileServer.profilesReceived
|
||||
if !proto.Equal(actualPbProfile, pbRequestMatchAll) {
|
||||
t.Fatalf("Expected profile sent to be [%v] but was [%v]", pbRequestMatchAll, actualPbProfile)
|
||||
}
|
||||
numProfiles := len(mockGetProfileServer.profilesReceived) + 1
|
||||
if numProfiles != 1 {
|
||||
t.Fatalf("Expecting [1] profiles, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("Ignores request match without any fields", func(t *testing.T) {
|
||||
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}}
|
||||
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}
|
||||
|
||||
translator := &profileTranslator{
|
||||
stream: mockGetProfileServer,
|
||||
log: logging.WithField("test", t.Name()),
|
||||
}
|
||||
translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil)
|
||||
translator.Start()
|
||||
defer translator.Stop()
|
||||
|
||||
translator.Update(notEnoughRequestMatches)
|
||||
|
||||
|
@ -477,32 +474,30 @@ func TestProfileTranslator(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("Response match with more than one field becomes ALL", func(t *testing.T) {
|
||||
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}}
|
||||
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}
|
||||
|
||||
translator := &profileTranslator{
|
||||
stream: mockGetProfileServer,
|
||||
log: logging.WithField("test", t.Name()),
|
||||
}
|
||||
translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil)
|
||||
translator.Start()
|
||||
defer translator.Stop()
|
||||
|
||||
translator.Update(multipleResponseMatches)
|
||||
|
||||
numProfiles := len(mockGetProfileServer.profilesReceived)
|
||||
if numProfiles != 1 {
|
||||
t.Fatalf("Expecting [1] profiles, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived)
|
||||
}
|
||||
actualPbProfile := mockGetProfileServer.profilesReceived[0]
|
||||
actualPbProfile := <-mockGetProfileServer.profilesReceived
|
||||
if !proto.Equal(actualPbProfile, pbResponseMatchAll) {
|
||||
t.Fatalf("Expected profile sent to be [%v] but was [%v]", pbResponseMatchAll, actualPbProfile)
|
||||
}
|
||||
numProfiles := len(mockGetProfileServer.profilesReceived) + 1
|
||||
if numProfiles != 1 {
|
||||
t.Fatalf("Expecting [1] profiles, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("Ignores response match without any fields", func(t *testing.T) {
|
||||
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}}
|
||||
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}
|
||||
|
||||
translator := &profileTranslator{
|
||||
stream: mockGetProfileServer,
|
||||
log: logging.WithField("test", t.Name()),
|
||||
}
|
||||
translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil)
|
||||
translator.Start()
|
||||
defer translator.Stop()
|
||||
|
||||
translator.Update(notEnoughResponseMatches)
|
||||
|
||||
|
@ -513,12 +508,11 @@ func TestProfileTranslator(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("Ignores response match with invalid status range", func(t *testing.T) {
|
||||
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}}
|
||||
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}
|
||||
|
||||
translator := &profileTranslator{
|
||||
stream: mockGetProfileServer,
|
||||
log: logging.WithField("test", t.Name()),
|
||||
}
|
||||
translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil)
|
||||
translator.Start()
|
||||
defer translator.Stop()
|
||||
|
||||
translator.Update(invalidStatusRange)
|
||||
|
||||
|
@ -529,58 +523,57 @@ func TestProfileTranslator(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("Sends update for one sided status range", func(t *testing.T) {
|
||||
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}}
|
||||
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}
|
||||
|
||||
translator := &profileTranslator{
|
||||
stream: mockGetProfileServer,
|
||||
log: logging.WithField("test", t.Name()),
|
||||
}
|
||||
translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil)
|
||||
translator.Start()
|
||||
defer translator.Stop()
|
||||
|
||||
translator.Update(oneSidedStatusRange)
|
||||
|
||||
numProfiles := len(mockGetProfileServer.profilesReceived)
|
||||
<-mockGetProfileServer.profilesReceived
|
||||
|
||||
numProfiles := len(mockGetProfileServer.profilesReceived) + 1
|
||||
if numProfiles != 1 {
|
||||
t.Fatalf("Expecting [1] profile, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("Sends empty update", func(t *testing.T) {
|
||||
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}}
|
||||
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}
|
||||
|
||||
translator := &profileTranslator{
|
||||
stream: mockGetProfileServer,
|
||||
log: logging.WithField("test", t.Name()),
|
||||
}
|
||||
translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil)
|
||||
translator.Start()
|
||||
defer translator.Stop()
|
||||
|
||||
translator.Update(nil)
|
||||
|
||||
numProfiles := len(mockGetProfileServer.profilesReceived)
|
||||
if numProfiles != 1 {
|
||||
t.Fatalf("Expecting [1] profile, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived)
|
||||
}
|
||||
actualPbProfile := mockGetProfileServer.profilesReceived[0]
|
||||
actualPbProfile := <-mockGetProfileServer.profilesReceived
|
||||
if !proto.Equal(actualPbProfile, defaultPbProfile) {
|
||||
t.Fatalf("Expected profile sent to be [%v] but was [%v]", defaultPbProfile, actualPbProfile)
|
||||
}
|
||||
numProfiles := len(mockGetProfileServer.profilesReceived) + 1
|
||||
if numProfiles != 1 {
|
||||
t.Fatalf("Expecting [1] profile, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("Sends update with custom timeout", func(t *testing.T) {
|
||||
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: []*pb.DestinationProfile{}}
|
||||
mockGetProfileServer := &mockDestinationGetProfileServer{profilesReceived: make(chan *pb.DestinationProfile, 50)}
|
||||
|
||||
translator := &profileTranslator{
|
||||
stream: mockGetProfileServer,
|
||||
log: logging.WithField("test", t.Name()),
|
||||
}
|
||||
translator := newProfileTranslator(mockGetProfileServer, logging.WithField("test", t.Name()), "", 80, nil)
|
||||
translator.Start()
|
||||
defer translator.Stop()
|
||||
|
||||
translator.Update(profileWithTimeout)
|
||||
|
||||
numProfiles := len(mockGetProfileServer.profilesReceived)
|
||||
if numProfiles != 1 {
|
||||
t.Fatalf("Expecting [1] profile, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived)
|
||||
}
|
||||
actualPbProfile := mockGetProfileServer.profilesReceived[0]
|
||||
actualPbProfile := <-mockGetProfileServer.profilesReceived
|
||||
if !proto.Equal(actualPbProfile, pbProfileWithTimeout) {
|
||||
t.Fatalf("Expected profile sent to be [%v] but was [%v]", pbProfileWithTimeout, actualPbProfile)
|
||||
}
|
||||
numProfiles := len(mockGetProfileServer.profilesReceived) + 1
|
||||
if numProfiles != 1 {
|
||||
t.Fatalf("Expecting [1] profile, got [%d]. Updates: %v", numProfiles, mockGetProfileServer.profilesReceived)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
|
|
@ -345,11 +345,14 @@ func (s *server) subscribeToServiceProfile(
|
|||
WithField("port", port)
|
||||
|
||||
canceled := stream.Context().Done()
|
||||
streamEnd := make(chan struct{})
|
||||
|
||||
// We build up the pipeline of profile updaters backwards, starting from
|
||||
// the translator which takes profile updates, translates them to protobuf
|
||||
// and pushes them onto the gRPC stream.
|
||||
translator := newProfileTranslator(stream, log, fqn, port)
|
||||
translator := newProfileTranslator(stream, log, fqn, port, streamEnd)
|
||||
translator.Start()
|
||||
defer translator.Stop()
|
||||
|
||||
// The opaque ports adaptor merges profile updates with service opaque
|
||||
// port annotation updates; it then publishes the result to the traffic
|
||||
|
@ -376,9 +379,9 @@ func (s *server) subscribeToServiceProfile(
|
|||
// namespace. If there's no namespace in the token, start a single
|
||||
// subscription.
|
||||
if token.Ns == "" {
|
||||
return s.subscribeToServiceWithoutContext(fqn, listener, canceled, log)
|
||||
return s.subscribeToServiceWithoutContext(fqn, listener, canceled, log, streamEnd)
|
||||
}
|
||||
return s.subscribeToServicesWithContext(fqn, token, listener, canceled, log)
|
||||
return s.subscribeToServicesWithContext(fqn, token, listener, canceled, log, streamEnd)
|
||||
}
|
||||
|
||||
// subscribeToServiceWithContext establishes two profile watches: a "backup"
|
||||
|
@ -393,6 +396,7 @@ func (s *server) subscribeToServicesWithContext(
|
|||
listener watcher.ProfileUpdateListener,
|
||||
canceled <-chan struct{},
|
||||
log *logging.Entry,
|
||||
streamEnd <-chan struct{},
|
||||
) error {
|
||||
// We ned to support two subscriptions:
|
||||
// - First, a backup subscription that assumes the context of the server
|
||||
|
@ -430,7 +434,9 @@ func (s *server) subscribeToServicesWithContext(
|
|||
select {
|
||||
case <-s.shutdown:
|
||||
case <-canceled:
|
||||
log.Debug("Cancelled")
|
||||
log.Debugf("GetProfile %s cancelled", fqn)
|
||||
case <-streamEnd:
|
||||
log.Errorf("GetProfile %s stream aborted", fqn)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -440,8 +446,9 @@ func (s *server) subscribeToServicesWithContext(
|
|||
func (s *server) subscribeToServiceWithoutContext(
|
||||
fqn string,
|
||||
listener watcher.ProfileUpdateListener,
|
||||
cancel <-chan struct{},
|
||||
canceled <-chan struct{},
|
||||
log *logging.Entry,
|
||||
streamEnd <-chan struct{},
|
||||
) error {
|
||||
id, err := profileID(fqn, contextToken{}, s.clusterDomain)
|
||||
if err != nil {
|
||||
|
@ -457,8 +464,10 @@ func (s *server) subscribeToServiceWithoutContext(
|
|||
|
||||
select {
|
||||
case <-s.shutdown:
|
||||
case <-cancel:
|
||||
log.Debug("Cancelled")
|
||||
case <-canceled:
|
||||
log.Debugf("GetProfile %s cancelled", fqn)
|
||||
case <-streamEnd:
|
||||
log.Errorf("GetProfile %s stream aborted", fqn)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -567,11 +567,11 @@ func (m *mockDestinationGetServer) Send(update *pb.Update) error {
|
|||
|
||||
type mockDestinationGetProfileServer struct {
|
||||
util.MockServerStream
|
||||
profilesReceived []*pb.DestinationProfile
|
||||
profilesReceived chan *pb.DestinationProfile
|
||||
}
|
||||
|
||||
func (m *mockDestinationGetProfileServer) Send(profile *pb.DestinationProfile) error {
|
||||
m.profilesReceived = append(m.profilesReceived, profile)
|
||||
m.profilesReceived <- profile
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue