diff --git a/controller/api/destination/dedup_profile_listener.go b/controller/api/destination/dedup_profile_listener.go new file mode 100644 index 000000000..0676b0580 --- /dev/null +++ b/controller/api/destination/dedup_profile_listener.go @@ -0,0 +1,31 @@ +package destination + +import ( + "github.com/linkerd/linkerd2/controller/api/destination/watcher" + sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2" + log "github.com/sirupsen/logrus" +) + +type dedupProfileListener struct { + parent watcher.ProfileUpdateListener + state *sp.ServiceProfile + initialized bool + log *log.Entry +} + +func newDedupProfileListener( + parent watcher.ProfileUpdateListener, + log *log.Entry, +) watcher.ProfileUpdateListener { + return &dedupProfileListener{parent, nil, false, log} +} + +func (p *dedupProfileListener) Update(profile *sp.ServiceProfile) { + if p.initialized && profile == p.state { + log.Debug("Skipping redundant update") + return + } + p.parent.Update(profile) + p.initialized = true + p.state = profile +} diff --git a/controller/api/destination/default_profile_listener.go b/controller/api/destination/default_profile_listener.go new file mode 100644 index 000000000..e6fb0481c --- /dev/null +++ b/controller/api/destination/default_profile_listener.go @@ -0,0 +1,29 @@ +package destination + +import ( + "github.com/linkerd/linkerd2/controller/api/destination/watcher" + sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2" + log "github.com/sirupsen/logrus" +) + +type defaultProfileListener struct { + parent watcher.ProfileUpdateListener + profile *sp.ServiceProfile + log *log.Entry +} + +func newDefaultProfileListener( + profile *sp.ServiceProfile, + parent watcher.ProfileUpdateListener, + log *log.Entry, +) watcher.ProfileUpdateListener { + return &defaultProfileListener{parent, profile, log} +} + +func (p *defaultProfileListener) Update(profile *sp.ServiceProfile) { + if profile == nil { + log.Debug("Using default profile") + profile = p.profile + } + p.parent.Update(profile) +} diff --git a/controller/api/destination/fallback_profile_listener.go b/controller/api/destination/fallback_profile_listener.go index a6b17c9fa..6956edc99 100644 --- a/controller/api/destination/fallback_profile_listener.go +++ b/controller/api/destination/fallback_profile_listener.go @@ -5,99 +5,90 @@ import ( "github.com/linkerd/linkerd2/controller/api/destination/watcher" sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2" + logging "github.com/sirupsen/logrus" ) type fallbackProfileListener struct { - underlying watcher.ProfileUpdateListener - primary *primaryProfileListener - backup *backupProfileListener - mutex sync.Mutex + primary, backup *childListener + parent watcher.ProfileUpdateListener + log *logging.Entry + mutex sync.Mutex } -type fallbackChildListener struct { +type childListener struct { // state is only referenced from the outer struct primaryProfileListener // or backupProfileListener (e.g. listener.state where listener's type is // _not_ this struct). structcheck issues a false positive for this field // as it does not think it's used. //nolint:structcheck - state *sp.ServiceProfile - parent *fallbackProfileListener + state *sp.ServiceProfile + initialized bool + parent *fallbackProfileListener } -type primaryProfileListener struct { - fallbackChildListener -} - -type backupProfileListener struct { - fallbackChildListener -} - -// newFallbackProfileListener takes an underlying profileUpdateListener and -// returns two profileUpdateListeners: a primary and a backup. Updates to -// the primary and backup will propagate to the underlying with updates to -// the primary always taking priority. If the value in the primary is cleared, -// the value from the backup is used. -func newFallbackProfileListener(listener watcher.ProfileUpdateListener) (watcher.ProfileUpdateListener, watcher.ProfileUpdateListener) { +// newFallbackProfileListener takes a parent ProfileUpdateListener and returns +// two ProfileUpdateListeners: a primary and a backup. +// +// If the primary listener is updated with a non-nil value, it is published to +// the parent listener. +// +// Otherwise, if the backup listener has most recently been updated with a +// non-nil value, its valeu is published to the parent listener. +// +// A nil ServiceProfile is published only when both the primary and backup have +// been initialized and have nil values. +func newFallbackProfileListener( + parent watcher.ProfileUpdateListener, + log *logging.Entry, +) (watcher.ProfileUpdateListener, watcher.ProfileUpdateListener) { // Primary and backup share a lock to ensure updates are atomic. fallback := fallbackProfileListener{ - mutex: sync.Mutex{}, - underlying: listener, + mutex: sync.Mutex{}, + log: log, } - primary := primaryProfileListener{ - fallbackChildListener{ - parent: &fallback, - }, + primary := childListener{ + initialized: false, + parent: &fallback, } - backup := backupProfileListener{ - fallbackChildListener{ - parent: &fallback, - }, + + backup := childListener{ + initialized: false, + parent: &fallback, } + + fallback.parent = parent fallback.primary = &primary fallback.backup = &backup + return &primary, &backup } -// Primary +func (f *fallbackProfileListener) publish() { + if !f.primary.initialized { + f.log.Debug("Waiting for primary profile listener to be initialized") + return + } + if !f.backup.initialized { + f.log.Debug("Waiting for backup profile listener to be initialized") + return + } -func (p *primaryProfileListener) Update(profile *sp.ServiceProfile) { + if f.primary.state == nil && f.backup.state != nil { + f.log.Debug("Publishing backup profile") + f.parent.Update(f.backup.state) + return + } + + f.log.Debug("Publishing primary profile") + f.parent.Update(f.primary.state) +} + +func (p *childListener) Update(profile *sp.ServiceProfile) { p.parent.mutex.Lock() defer p.parent.mutex.Unlock() p.state = profile - - if p.state != nil { - // We got a value; apply the update. - p.parent.underlying.Update(p.state) - return - } - if p.parent.backup != nil { - // Our value was cleared; fall back to backup. - p.parent.underlying.Update(p.parent.backup.state) - return - } - // Our value was cleared and there is no backup value. - p.parent.underlying.Update(nil) -} - -// Backup - -func (b *backupProfileListener) Update(profile *sp.ServiceProfile) { - b.parent.mutex.Lock() - defer b.parent.mutex.Unlock() - - b.state = profile - - if b.parent.primary != nil && b.parent.primary.state != nil { - // Primary has a value, so ignore this update. - return - } - if b.state != nil { - // We got a value; apply the update. - b.parent.underlying.Update(b.state) - return - } - // Our value was cleared and there is no primary value. - b.parent.underlying.Update(nil) + p.initialized = true + p.parent.publish() } diff --git a/controller/api/destination/fallback_profile_listener_test.go b/controller/api/destination/fallback_profile_listener_test.go index 99db684bf..88ffadce0 100644 --- a/controller/api/destination/fallback_profile_listener_test.go +++ b/controller/api/destination/fallback_profile_listener_test.go @@ -5,6 +5,7 @@ import ( "github.com/linkerd/linkerd2/controller/api/destination/watcher" sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2" + logging "github.com/sirupsen/logrus" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" ) @@ -41,65 +42,56 @@ func TestFallbackProfileListener(t *testing.T) { } t.Run("Primary updated", func(t *testing.T) { - primary, _, listener := newListeners() - + primary, backup, listener := newListeners() primary.Update(&primaryProfile) - + assertEq(t, listener.received, []*sp.ServiceProfile{}) + backup.Update(nil) assertEq(t, listener.received, []*sp.ServiceProfile{&primaryProfile}) }) t.Run("Backup updated", func(t *testing.T) { - _, backup, listener := newListeners() - + primary, backup, listener := newListeners() backup.Update(&backupProfile) - + primary.Update(nil) assertEq(t, listener.received, []*sp.ServiceProfile{&backupProfile}) }) t.Run("Primary cleared", func(t *testing.T) { - primary, _, listener := newListeners() - + primary, backup, listener := newListeners() + backup.Update(nil) primary.Update(&primaryProfile) primary.Update(nil) - assertEq(t, listener.received, []*sp.ServiceProfile{&primaryProfile, nil}) }) t.Run("Backup cleared", func(t *testing.T) { - _, backup, listener := newListeners() - + primary, backup, listener := newListeners() backup.Update(&backupProfile) + primary.Update(nil) backup.Update(nil) - assertEq(t, listener.received, []*sp.ServiceProfile{&backupProfile, nil}) }) t.Run("Primary overrides backup", func(t *testing.T) { primary, backup, listener := newListeners() - backup.Update(&backupProfile) primary.Update(&primaryProfile) - - assertEq(t, listener.received, []*sp.ServiceProfile{&backupProfile, &primaryProfile}) + assertEq(t, listener.received, []*sp.ServiceProfile{&primaryProfile}) }) t.Run("Backup update ignored", func(t *testing.T) { primary, backup, listener := newListeners() - primary.Update(&primaryProfile) backup.Update(&backupProfile) backup.Update(nil) - - assertEq(t, listener.received, []*sp.ServiceProfile{&primaryProfile}) + assertEq(t, listener.received, []*sp.ServiceProfile{&primaryProfile, &primaryProfile}) }) t.Run("Fallback to backup", func(t *testing.T) { primary, backup, listener := newListeners() - primary.Update(&primaryProfile) backup.Update(&backupProfile) primary.Update(nil) - assertEq(t, listener.received, []*sp.ServiceProfile{&primaryProfile, &backupProfile}) }) @@ -110,11 +102,12 @@ func newListeners() (watcher.ProfileUpdateListener, watcher.ProfileUpdateListene received: []*sp.ServiceProfile{}, } - primary, backup := newFallbackProfileListener(listener) + primary, backup := newFallbackProfileListener(listener, logging.NewEntry(logging.New())) return primary, backup, listener } func assertEq(t *testing.T, received []*sp.ServiceProfile, expected []*sp.ServiceProfile) { + t.Helper() if len(received) != len(expected) { t.Fatalf("Expected %d profile updates, got %d", len(expected), len(received)) } diff --git a/controller/api/destination/opaque_ports_adaptor.go b/controller/api/destination/opaque_ports_adaptor.go index 8acd94244..c3cfa0cce 100644 --- a/controller/api/destination/opaque_ports_adaptor.go +++ b/controller/api/destination/opaque_ports_adaptor.go @@ -32,10 +32,9 @@ func (opa *opaquePortsAdaptor) UpdateService(ports map[uint32]struct{}) { } func (opa *opaquePortsAdaptor) publish() { - merged := sp.ServiceProfile{} if opa.profile != nil { - merged = *opa.profile + p := *opa.profile + p.Spec.OpaquePorts = opa.opaquePorts + opa.listener.Update(&p) } - merged.Spec.OpaquePorts = opa.opaquePorts - opa.listener.Update(&merged) } diff --git a/controller/api/destination/profile_translator.go b/controller/api/destination/profile_translator.go index f2c7fcccd..1342dc433 100644 --- a/controller/api/destination/profile_translator.go +++ b/controller/api/destination/profile_translator.go @@ -35,11 +35,13 @@ func newProfileTranslator(stream pb.Destination_GetProfileServer, log *logging.E func (pt *profileTranslator) Update(profile *sp.ServiceProfile) { if profile == nil { + pt.log.Debugf("Sending default profile") if err := pt.stream.Send(pt.defaultServiceProfile()); err != nil { pt.log.Errorf("failed to send default service profile: %s", err) } return } + destinationProfile, err := pt.createDestinationProfile(profile) if err != nil { pt.log.Error(err) diff --git a/controller/api/destination/server.go b/controller/api/destination/server.go index 392fda8e5..5a979a119 100644 --- a/controller/api/destination/server.go +++ b/controller/api/destination/server.go @@ -11,6 +11,7 @@ import ( pb "github.com/linkerd/linkerd2-proxy-api/go/destination" "github.com/linkerd/linkerd2/controller/api/destination/watcher" + sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2" "github.com/linkerd/linkerd2/controller/k8s" labels "github.com/linkerd/linkerd2/pkg/k8s" "github.com/linkerd/linkerd2/pkg/prometheus" @@ -229,11 +230,11 @@ func (s *server) getProfileByIP( if err != nil { return fmt.Errorf("failed to create address: %w", err) } - return s.translateEndpointProfile(&address, port, stream) + return s.subscribeToEndpointProfile(&address, port, stream) } fqn := fmt.Sprintf("%s.%s.svc.%s", svcID.Name, svcID.Namespace, s.clusterDomain) - return s.translateServiceProfile(*svcID, token, fqn, port, stream) + return s.subscribeToServiceProfile(*svcID, token, fqn, port, stream) } func (s *server) getProfileByName( @@ -255,13 +256,16 @@ func (s *server) getProfileByName( if err != nil { return fmt.Errorf("failed to get pod for hostname %s: %w", hostname, err) } - return s.translateEndpointProfile(address, port, stream) + return s.subscribeToEndpointProfile(address, port, stream) } - return s.translateServiceProfile(service, token, host, port, stream) + return s.subscribeToServiceProfile(service, token, host, port, stream) } -func (s *server) translateServiceProfile( +// Resolves a profile for a service, sending updates to the provided stream. +// +// This function does not return until the stream is closed. +func (s *server) subscribeToServiceProfile( service watcher.ID, token, fqn string, port uint32, @@ -272,6 +276,8 @@ func (s *server) translateServiceProfile( WithField("svc", service.Name). WithField("port", port) + canceled := stream.Context().Done() + // We build up the pipeline of profile updaters backwards, starting from // the translator which takes profile updates, translates them to protobuf // and pushes them onto the gRPC stream. @@ -282,7 +288,8 @@ func (s *server) translateServiceProfile( // split adaptor. opaquePortsAdaptor := newOpaquePortsAdaptor(translator) - // Subscribe the adaptor to service updates. + // Create an adaptor that merges service-level opaque port configurations + // onto profile updates. err := s.opaquePorts.Subscribe(service, opaquePortsAdaptor) if err != nil { log.Warnf("Failed to subscribe to service updates for %s: %s", service, err) @@ -290,51 +297,110 @@ func (s *server) translateServiceProfile( } defer s.opaquePorts.Unsubscribe(service, opaquePortsAdaptor) - // The fallback accepts updates from a primary and secondary source and - // passes the appropriate profile updates to the adaptor. - primary, secondary := newFallbackProfileListener(opaquePortsAdaptor) + // Ensure that (1) nil values are turned into a default policy and (2) + // subsequent updates that refer to same service profile object are + // deduplicated to prevent sending redundant updates. + dup := newDedupProfileListener(opaquePortsAdaptor, log) + defaultProfile := sp.ServiceProfile{} + listener := newDefaultProfileListener(&defaultProfile, dup, log) - // If we have a context token, we create two subscriptions: one with the - // context token which sends updates to the primary listener and one without - // the context token which sends updates to the secondary listener. It is - // up to the fallbackProfileListener to merge updates from the primary and - // secondary listeners and send the appropriate updates to the stream. - if token != "" { - ctxToken := s.parseContextToken(token) - profile, err := profileID(fqn, ctxToken, s.clusterDomain) - if err != nil { - log.Debug("Invalid service") - return status.Errorf(codes.InvalidArgument, "invalid profile ID: %s", err) - } - err = s.profiles.Subscribe(profile, primary) - if err != nil { - log.Warnf("Failed to subscribe to profile: %s", err) - return err - } - defer s.profiles.Unsubscribe(profile, primary) + // The primary lookup uses the context token to determine the requester's + // namespace. If there's no namespace in the token, start a single + // subscription. + tok := s.parseContextToken(token) + if tok.Ns == "" { + return s.subscribeToServiceWithoutContext(fqn, listener, canceled, log) } + return s.subscribeToServicesWithContext(fqn, tok, listener, canceled, log) +} - profile, err := profileID(fqn, contextToken{}, s.clusterDomain) +// subscribeToServiceWithContext establishes two profile watches: a "backup" +// watch (ignoring the client namespace) and a preferred "primary" watch +// assuming the client's context. Once updates are received for both watches, we +// select over both watches to send profile updates to the stream. A nil update +// may be sent if both the primary and backup watches are initialized with a nil +// value. +func (s *server) subscribeToServicesWithContext( + fqn string, + token contextToken, + listener watcher.ProfileUpdateListener, + canceled <-chan struct{}, + log *logging.Entry, +) error { + // We ned to support two subscriptions: + // - First, a backup subscription that assumes the context of the server + // namespace. + // - And then, a primary subscription that assumes the context of the client + // namespace. + primary, backup := newFallbackProfileListener(listener, log) + + // The backup lookup ignores the context token to lookup any + // server-namespace-hosted profiles. + backupID, err := profileID(fqn, contextToken{}, s.clusterDomain) if err != nil { log.Debug("Invalid service") return status.Errorf(codes.InvalidArgument, "invalid profile ID: %s", err) } - err = s.profiles.Subscribe(profile, secondary) + err = s.profiles.Subscribe(backupID, backup) if err != nil { log.Warnf("Failed to subscribe to profile: %s", err) return err } - defer s.profiles.Unsubscribe(profile, secondary) + defer s.profiles.Unsubscribe(backupID, backup) + + primaryID, err := profileID(fqn, token, s.clusterDomain) + if err != nil { + log.Debug("Invalid service") + return status.Errorf(codes.InvalidArgument, "invalid profile ID: %s", err) + } + err = s.profiles.Subscribe(primaryID, primary) + if err != nil { + log.Warnf("Failed to subscribe to profile: %s", err) + return err + } + defer s.profiles.Unsubscribe(primaryID, primary) select { case <-s.shutdown: - case <-stream.Context().Done(): + case <-canceled: log.Debug("Cancelled") } return nil } -func (s *server) translateEndpointProfile( +// subscribeToServiceWithoutContext establishes a single profile watch, assuming +// no client context. All udpates are published to the provided listener. +func (s *server) subscribeToServiceWithoutContext( + fqn string, + listener watcher.ProfileUpdateListener, + cancel <-chan struct{}, + log *logging.Entry, +) error { + id, err := profileID(fqn, contextToken{}, s.clusterDomain) + if err != nil { + log.Debug("Invalid service") + return status.Errorf(codes.InvalidArgument, "invalid profile ID: %s", err) + } + err = s.profiles.Subscribe(id, listener) + if err != nil { + log.Warnf("Failed to subscribe to profile: %s", err) + return err + } + defer s.profiles.Unsubscribe(id, listener) + + select { + case <-s.shutdown: + case <-cancel: + log.Debug("Cancelled") + } + return nil +} + +// Resolves a profile for a single endpoitn, sending updates to the provided +// stream. +// +// This function does not return until the stream is closed. +func (s *server) subscribeToEndpointProfile( address *watcher.Address, port uint32, stream pb.Destination_GetProfileServer, @@ -355,7 +421,7 @@ func (s *server) translateEndpointProfile( if err != nil { return fmt.Errorf("failed to create endpoint: %w", err) } - translator := newEndpointProfileTranslator(address.Pod, port, endpoint, stream, s.log) + translator := newEndpointProfileTranslator(address.Pod, port, endpoint, stream, log) // If the endpoint's port is annotated as opaque, we don't need to // subscribe for updates because it will always be opaque @@ -613,6 +679,9 @@ type contextToken struct { func (s *server) parseContextToken(token string) contextToken { ctxToken := contextToken{} + if token == "" { + return ctxToken + } if err := json.Unmarshal([]byte(token), &ctxToken); err != nil { // if json is invalid, means token can have ns: form parts := strings.Split(token, ":") diff --git a/controller/api/destination/server_test.go b/controller/api/destination/server_test.go index ebc8cb03c..b9e2edb8e 100644 --- a/controller/api/destination/server_test.go +++ b/controller/api/destination/server_test.go @@ -84,7 +84,6 @@ func TestGet(t *testing.T) { updates: []*pb.Update{}, MockServerStream: util.NewMockServerStream(), } - stream.Cancel() path := fmt.Sprintf("%s:%d", fullyQualifiedNameSkipped, skippedPort) @@ -96,13 +95,8 @@ func TestGet(t *testing.T) { t.Fatalf("Got error: %s", err) } - if len(stream.updates) == 0 || len(stream.updates) > 3 { - t.Fatalf("Expected 1 to 3 updates but got %d: %v", len(stream.updates), stream.updates) - } - - last := stream.updates[len(stream.updates)-1] - - addrs := last.GetAdd().Addrs + update := assertSingleUpdate(t, stream.updates) + addrs := update.GetAdd().Addrs if len(addrs) == 0 { t.Fatalf("Expected len(addrs) to be > 0") } @@ -120,12 +114,10 @@ func TestGet(t *testing.T) { func TestGetProfiles(t *testing.T) { t.Run("Returns error if not valid service name", func(t *testing.T) { server := makeServer(t) - stream := &bufferingGetProfileStream{ updates: []*pb.DestinationProfile{}, MockServerStream: util.NewMockServerStream(), } - err := server.GetProfile(&pb.GetDestination{Scheme: "k8s", Path: "linkerd.io"}, stream) if err == nil { t.Fatalf("Expecting error, got nothing") @@ -133,169 +125,37 @@ func TestGetProfiles(t *testing.T) { }) t.Run("Returns server profile", func(t *testing.T) { - server := makeServer(t) - - stream := &bufferingGetProfileStream{ - updates: []*pb.DestinationProfile{}, - MockServerStream: util.NewMockServerStream(), + stream := profileStream(t, fullyQualifiedName, port, "ns:other") + profile := assertSingleProfile(t, stream.updates) + if profile.FullyQualifiedName != fullyQualifiedName { + t.Fatalf("Expected fully qualified name '%s', but got '%s'", + fullyQualifiedName, profile.FullyQualifiedName) } - - stream.Cancel() // See note above on pre-emptive cancellation. - err := server.GetProfile(&pb.GetDestination{ - Scheme: "k8s", - Path: fmt.Sprintf("%s:%d", fullyQualifiedName, port), - ContextToken: "ns:other", - }, stream) - if err != nil { - t.Fatalf("Got error: %s", err) - } - - // The number of updates we get depends on the order that the watcher - // gets updates about the server profile and the client profile. The - // client profile takes priority so if we get that update first, it - // will only trigger one update to the stream. However, if the watcher - // gets the server profile first, it will send an update with that - // profile to the stream and then a second update when it gets the - // client profile. - // Additionally, under normal conditions the creation of resources by - // the fake API will generate notifications that are discarded after the - // stream.Cancel() call, but very rarely those notifications might come - // after, in which case we'll get a third update. - if len(stream.updates) == 0 || len(stream.updates) > 3 { - t.Fatalf("Expected 1 to 3 updates but got %d: %v", len(stream.updates), stream.updates) - } - - firstUpdate := stream.updates[0] - if firstUpdate.FullyQualifiedName != fullyQualifiedName { - t.Fatalf("Expected fully qualified name '%s', but got '%s'", fullyQualifiedName, firstUpdate.FullyQualifiedName) - } - - lastUpdate := stream.updates[len(stream.updates)-1] - if lastUpdate.OpaqueProtocol { + if profile.OpaqueProtocol { t.Fatalf("Expected port %d to not be an opaque protocol, but it was", port) } - routes := lastUpdate.GetRoutes() + routes := profile.GetRoutes() if len(routes) != 1 { - t.Fatalf("Expected 1 route but got %d: %v", len(routes), routes) - } - if routes[0].GetIsRetryable() { - t.Fatalf("Expected route to not be retryable, but it was") + t.Fatalf("Expected 0 routes but got %d: %v", len(routes), routes) } }) t.Run("Return service profile when using json token", func(t *testing.T) { - server := makeServer(t) - - stream := &bufferingGetProfileStream{ - updates: []*pb.DestinationProfile{}, - MockServerStream: util.NewMockServerStream(), + stream := profileStream(t, fullyQualifiedName, port, `{"ns":"other"}`) + profile := assertSingleProfile(t, stream.updates) + if profile.FullyQualifiedName != fullyQualifiedName { + t.Fatalf("Expected fully qualified name '%s', but got '%s'", fullyQualifiedName, profile.FullyQualifiedName) } - - stream.Cancel() // see note above on pre-emptive cancelling - err := server.GetProfile(&pb.GetDestination{ - Scheme: "k8s", - Path: fmt.Sprintf("%s:%d", fullyQualifiedName, port), - ContextToken: "{\"ns\":\"other\"}", - }, stream) - if err != nil { - t.Fatalf("Got error: %s", err) - } - - // The number of updates we get depends on the order that the watcher - // gets updates about the server profile and the client profile. The - // client profile takes priority so if we get that update first, it - // will only trigger one update to the stream. However, if the watcher - // gets the server profile first, it will send an update with that - // profile to the stream and then a second update when it gets the - // client profile. - // Additionally, under normal conditions the creation of resources by - // the fake API will generate notifications that are discarded after the - // stream.Cancel() call, but very rarely those notifications might come - // after, in which case we'll get a third update. - if len(stream.updates) == 0 || len(stream.updates) > 3 { - t.Fatalf("Expected 1 to 3 updates but got: %d: %v", len(stream.updates), stream.updates) - } - - firstUpdate := stream.updates[0] - if firstUpdate.FullyQualifiedName != fullyQualifiedName { - t.Fatalf("Expected fully qualified name '%s', but got '%s'", fullyQualifiedName, firstUpdate.FullyQualifiedName) - } - - lastUpdate := stream.updates[len(stream.updates)-1] - routes := lastUpdate.GetRoutes() + routes := profile.GetRoutes() if len(routes) != 1 { t.Fatalf("Expected 1 route got %d: %v", len(routes), routes) } - if routes[0].GetIsRetryable() { - t.Fatalf("Expected route to not be retryable, but it was") - } }) t.Run("Returns client profile", func(t *testing.T) { - server := makeServer(t) - - stream := &bufferingGetProfileStream{ - updates: []*pb.DestinationProfile{}, - MockServerStream: util.NewMockServerStream(), - } - - // See note about pre-emptive cancellation - stream.Cancel() - err := server.GetProfile(&pb.GetDestination{ - Scheme: "k8s", - Path: fmt.Sprintf("%s:%d", fullyQualifiedName, port), - ContextToken: "{\"ns\":\"client-ns\"}", - }, stream) - if err != nil { - t.Fatalf("Got error: %s", err) - } - - // The number of updates we get depends on if the watcher gets an update - // about the profile before or after the subscription. If the subscription - // happens first, then we get a profile update during the subscription and - // then a second update when the watcher receives the update about that - // profile. If the watcher event happens first, then we only get the - // update during subscription. - if len(stream.updates) != 1 && len(stream.updates) != 2 { - t.Fatalf("Expected 1 or 2 updates but got %d: %v", len(stream.updates), stream.updates) - } - routes := stream.updates[len(stream.updates)-1].GetRoutes() - if len(routes) != 1 { - t.Fatalf("Expected 1 route but got %d: %v", len(routes), routes) - } - if !routes[0].GetIsRetryable() { - t.Fatalf("Expected route to be retryable, but it was not") - } - }) - t.Run("Returns client profile", func(t *testing.T) { - server := makeServer(t) - - stream := &bufferingGetProfileStream{ - updates: []*pb.DestinationProfile{}, - MockServerStream: util.NewMockServerStream(), - } - - // See note above on pre-emptive cancellation. - stream.Cancel() - err := server.GetProfile(&pb.GetDestination{ - Scheme: "k8s", - Path: fmt.Sprintf("%s:%d", fullyQualifiedName, port), - ContextToken: "ns:client-ns", - }, stream) - if err != nil { - t.Fatalf("Got error: %s", err) - } - - // The number of updates we get depends on if the watcher gets an update - // about the profile before or after the subscription. If the subscription - // happens first, then we get a profile update during the subscription and - // then a second update when the watcher receives the update about that - // profile. If the watcher event happens first, then we only get the - // update during subscription. - if len(stream.updates) != 1 && len(stream.updates) != 2 { - t.Fatalf("Expected 1 or 2 updates but got %d: %v", len(stream.updates), stream.updates) - } - routes := stream.updates[len(stream.updates)-1].GetRoutes() + stream := profileStream(t, fullyQualifiedName, port, `{"ns":"client-ns"}`) + profile := assertSingleProfile(t, stream.updates) + routes := profile.GetRoutes() if len(routes) != 1 { t.Fatalf("Expected 1 route but got %d: %v", len(routes), routes) } @@ -305,61 +165,28 @@ func TestGetProfiles(t *testing.T) { }) t.Run("Return profile when using cluster IP", func(t *testing.T) { - server := makeServer(t) - stream := &bufferingGetProfileStream{ - updates: []*pb.DestinationProfile{}, - MockServerStream: util.NewMockServerStream(), + stream := profileStream(t, clusterIP, port, "") + profile := assertSingleProfile(t, stream.updates) + if profile.FullyQualifiedName != fullyQualifiedName { + t.Fatalf("Expected fully qualified name '%s', but got '%s'", fullyQualifiedName, profile.FullyQualifiedName) } - stream.Cancel() - err := server.GetProfile(&pb.GetDestination{ - Scheme: "k8s", - Path: fmt.Sprintf("%s:%d", clusterIP, port), - }, stream) - if err != nil { - t.Fatalf("Got error: %s", err) - } - - // An explanation for why we expect 1 to 3 updates is in test cases - // above - if len(stream.updates) == 0 || len(stream.updates) > 3 { - t.Fatalf("Expected 1 to 3 updates but got %d: %v", len(stream.updates), stream.updates) - } - - last := stream.updates[len(stream.updates)-1] - if last.FullyQualifiedName != fullyQualifiedName { - t.Fatalf("Expected fully qualified name '%s', but got '%s'", fullyQualifiedName, last.FullyQualifiedName) - } - if last.OpaqueProtocol { + if profile.OpaqueProtocol { t.Fatalf("Expected port %d to not be an opaque protocol, but it was", port) } - routes := last.GetRoutes() + routes := profile.GetRoutes() if len(routes) != 1 { t.Fatalf("Expected 1 route but got %d: %v", len(routes), routes) } }) t.Run("Return profile with endpoint when using pod DNS", func(t *testing.T) { - server := makeServer(t) - stream := &bufferingGetProfileStream{ - updates: []*pb.DestinationProfile{}, - MockServerStream: util.NewMockServerStream(), - } - stream.Cancel() + stream := profileStream(t, fullyQualifiedPodDNS, port, "ns:ns") epAddr, err := toAddress(podIPStatefulSet, port) if err != nil { t.Fatalf("Got error: %s", err) } - err = server.GetProfile(&pb.GetDestination{ - Scheme: "k8s", - Path: fmt.Sprintf("%s:%d", fullyQualifiedPodDNS, port), - ContextToken: "ns:ns", - }, stream) - if err != nil { - t.Fatalf("Got error: %s", err) - } - // An explanation for why we expect 1 to 3 updates is in test cases // above if len(stream.updates) == 0 || len(stream.updates) > 3 { @@ -389,26 +216,13 @@ func TestGetProfiles(t *testing.T) { }) t.Run("Return profile with endpoint when using pod IP", func(t *testing.T) { - server := makeServer(t) - stream := &bufferingGetProfileStream{ - updates: []*pb.DestinationProfile{}, - MockServerStream: util.NewMockServerStream(), - } - stream.Cancel() + stream := profileStream(t, podIP1, port, "ns:ns") epAddr, err := toAddress(podIP1, port) if err != nil { t.Fatalf("Got error: %s", err) } - err = server.GetProfile(&pb.GetDestination{ - Scheme: "k8s", - Path: fmt.Sprintf("%s:%d", podIP1, port), - }, stream) - if err != nil { - t.Fatalf("Got error: %s", err) - } - // An explanation for why we expect 1 to 3 updates is in test cases // above if len(stream.updates) == 0 || len(stream.updates) > 3 { @@ -438,303 +252,126 @@ func TestGetProfiles(t *testing.T) { }) t.Run("Return default profile when IP does not map to service or pod", func(t *testing.T) { - server := makeServer(t) - stream := &bufferingGetProfileStream{ - updates: []*pb.DestinationProfile{}, - MockServerStream: util.NewMockServerStream(), - } - stream.Cancel() - err := server.GetProfile(&pb.GetDestination{ - Scheme: "k8s", - Path: "172.0.0.0:1234", - }, stream) - if err != nil { - t.Fatalf("Got error: %s", err) - } - - // An explanation for why we expect 1 to 3 updates is in test cases - // above - if len(stream.updates) == 0 || len(stream.updates) > 3 { - t.Fatalf("Expected 1 to 3 updates but got %d: %v", len(stream.updates), stream.updates) - } - - first := stream.updates[0] - if first.RetryBudget == nil { + stream := profileStream(t, "172.0.0.0", 1234, "") + profile := assertSingleProfile(t, stream.updates) + if profile.RetryBudget == nil { t.Fatalf("Expected default profile to have a retry budget") } }) t.Run("Return profile with no protocol hint when pod does not have label", func(t *testing.T) { - server := makeServer(t) - stream := &bufferingGetProfileStream{ - updates: []*pb.DestinationProfile{}, - MockServerStream: util.NewMockServerStream(), - } - stream.Cancel() - err := server.GetProfile(&pb.GetDestination{ - Scheme: "k8s", - Path: podIP2, - }, stream) - if err != nil { - t.Fatalf("Got error: %s", err) - } - - // An explanation for why we expect 1 to 3 updates is in test cases - // above - if len(stream.updates) == 0 || len(stream.updates) > 3 { - t.Fatalf("Expected 1 to 3 updates but got %d: %v", len(stream.updates), stream.updates) - } - - first := stream.updates[0] - if first.Endpoint == nil { + stream := profileStream(t, podIP2, port, "") + profile := assertSingleProfile(t, stream.updates) + if profile.Endpoint == nil { t.Fatalf("Expected response to have endpoint field") } - if first.Endpoint.GetProtocolHint().GetProtocol() != nil || first.Endpoint.GetProtocolHint().GetOpaqueTransport() != nil { + if profile.Endpoint.GetProtocolHint().GetProtocol() != nil || profile.Endpoint.GetProtocolHint().GetOpaqueTransport() != nil { t.Fatalf("Expected no protocol hint but found one") } }) t.Run("Return non-opaque protocol profile when using cluster IP and opaque protocol port", func(t *testing.T) { - server := makeServer(t) - stream := &bufferingGetProfileStream{ - updates: []*pb.DestinationProfile{}, - MockServerStream: util.NewMockServerStream(), + stream := profileStream(t, clusterIPOpaque, opaquePort, "") + profile := assertSingleProfile(t, stream.updates) + if profile.FullyQualifiedName != fullyQualifiedNameOpaque { + t.Fatalf("Expected fully qualified name '%s', but got '%s'", fullyQualifiedNameOpaque, profile.FullyQualifiedName) } - stream.Cancel() - err := server.GetProfile(&pb.GetDestination{ - Scheme: "k8s", - Path: fmt.Sprintf("%s:%d", clusterIPOpaque, opaquePort), - }, stream) - if err != nil { - t.Fatalf("Got error: %s", err) - } - - // An explanation for why we expect 1 to 3 updates is in test cases - // above - if len(stream.updates) == 0 || len(stream.updates) > 3 { - t.Fatalf("Expected 1 to 3 updates but got %d: %v", len(stream.updates), stream.updates) - } - - last := stream.updates[len(stream.updates)-1] - if last.FullyQualifiedName != fullyQualifiedNameOpaque { - t.Fatalf("Expected fully qualified name '%s', but got '%s'", fullyQualifiedNameOpaque, last.FullyQualifiedName) - } - if last.OpaqueProtocol { + if profile.OpaqueProtocol { t.Fatalf("Expected port %d to not be an opaque protocol, but it was", opaquePort) } }) t.Run("Return opaque protocol profile with endpoint when using pod IP and opaque protocol port", func(t *testing.T) { - server := makeServer(t) - stream := &bufferingGetProfileStream{ - updates: []*pb.DestinationProfile{}, - MockServerStream: util.NewMockServerStream(), - } - stream.Cancel() + stream := profileStream(t, podIPOpaque, opaquePort, "") epAddr, err := toAddress(podIPOpaque, opaquePort) if err != nil { t.Fatalf("Got error: %s", err) } - err = server.GetProfile(&pb.GetDestination{ - Scheme: "k8s", - Path: fmt.Sprintf("%s:%d", podIPOpaque, opaquePort), - }, stream) - if err != nil { - t.Fatalf("Got error: %s", err) - } - // An explanation for why we expect 1 to 3 updates is in test cases // above if len(stream.updates) == 0 || len(stream.updates) > 3 { t.Fatalf("Expected 1 to 3 updates but got %d: %v", len(stream.updates), stream.updates) } - first := stream.updates[0] - if first.Endpoint == nil { + profile := assertSingleProfile(t, stream.updates) + if profile.Endpoint == nil { t.Fatalf("Expected response to have endpoint field") } - if !first.OpaqueProtocol { + if !profile.OpaqueProtocol { t.Fatalf("Expected port %d to be an opaque protocol, but it was not", opaquePort) } - _, exists := first.Endpoint.MetricLabels["namespace"] + _, exists := profile.Endpoint.MetricLabels["namespace"] if !exists { t.Fatalf("Expected 'namespace' metric label to exist but it did not") } - if first.Endpoint.ProtocolHint == nil { + if profile.Endpoint.ProtocolHint == nil { t.Fatalf("Expected protocol hint but found none") } - if first.Endpoint.ProtocolHint.GetOpaqueTransport().GetInboundPort() != 4143 { + if profile.Endpoint.ProtocolHint.GetOpaqueTransport().GetInboundPort() != 4143 { t.Fatalf("Expected pod to support opaque traffic on port 4143") } - if first.Endpoint.Addr.String() != epAddr.String() { - t.Fatalf("Expected endpoint IP port to be %d, but it was %d", epAddr.Port, first.Endpoint.Addr.Port) + if profile.Endpoint.Addr.String() != epAddr.String() { + t.Fatalf("Expected endpoint IP port to be %d, but it was %d", epAddr.Port, profile.Endpoint.Addr.Port) } }) t.Run("Return opaque protocol profile when using service name with opaque port annotation", func(t *testing.T) { - server := makeServer(t) - stream := &bufferingGetProfileStream{ - updates: []*pb.DestinationProfile{}, - MockServerStream: util.NewMockServerStream(), + stream := profileStream(t, fullyQualifiedNameOpaqueService, opaquePort, "") + profile := assertSingleProfile(t, stream.updates) + if profile.FullyQualifiedName != fullyQualifiedNameOpaqueService { + t.Fatalf("Expected fully qualified name '%s', but got '%s'", fullyQualifiedNameOpaqueService, profile.FullyQualifiedName) } - stream.Cancel() - err := server.GetProfile(&pb.GetDestination{ - Scheme: "k8s", - Path: fmt.Sprintf("%s:%d", fullyQualifiedNameOpaqueService, opaquePort), - }, stream) - if err != nil { - t.Fatalf("Got error: %s", err) - } - if len(stream.updates) == 0 || len(stream.updates) > 3 { - t.Fatalf("Expected 1 to 3 updates but got %d: %v", len(stream.updates), stream.updates) - } - last := stream.updates[len(stream.updates)-1] - if last.FullyQualifiedName != fullyQualifiedNameOpaqueService { - t.Fatalf("Expected fully qualified name '%s', but got '%s'", fullyQualifiedNameOpaqueService, last.FullyQualifiedName) - } - if !last.OpaqueProtocol { + if !profile.OpaqueProtocol { t.Fatalf("Expected port %d to be an opaque protocol, but it was not", opaquePort) } }) t.Run("Return profile with unknown protocol hint and identity when pod contains skipped inbound port", func(t *testing.T) { - server := makeServer(t) - stream := &bufferingGetProfileStream{ - updates: []*pb.DestinationProfile{}, - MockServerStream: util.NewMockServerStream(), - } - - stream.Cancel() - - path := fmt.Sprintf("%s:%d", podIPSkipped, skippedPort) - err := server.GetProfile(&pb.GetDestination{ - Scheme: "k8s", - Path: path, - }, stream) - if err != nil { - t.Fatalf("Got error: %s", err) - } - - if len(stream.updates) == 0 || len(stream.updates) > 3 { - t.Fatalf("Expected 1 to 3 updates but got %d: %v", len(stream.updates), stream.updates) - } - - last := stream.updates[len(stream.updates)-1] - - addr := last.GetEndpoint() + stream := profileStream(t, podIPSkipped, skippedPort, "") + profile := assertSingleProfile(t, stream.updates) + addr := profile.GetEndpoint() if addr == nil { t.Fatalf("Expected to not be nil") } - if addr.GetProtocolHint().GetProtocol() != nil || addr.GetProtocolHint().GetOpaqueTransport() != nil { - t.Fatalf("Expected protocol hint for %s to be nil but got %+v", path, addr.ProtocolHint) + t.Fatalf("Expected protocol hint for %s to be nil but got %+v", podIPSkipped, addr.ProtocolHint) } - if addr.TlsIdentity != nil { - t.Fatalf("Expected TLS identity for %s to be nil but got %+v", path, addr.TlsIdentity) + t.Fatalf("Expected TLS identity for %s to be nil but got %+v", podIPSkipped, addr.TlsIdentity) } }) t.Run("Return profile with opaque protocol when using Pod IP selected by a Server", func(t *testing.T) { - server := makeServer(t) - stream := &bufferingGetProfileStream{ - updates: []*pb.DestinationProfile{}, - MockServerStream: util.NewMockServerStream(), - } - stream.Cancel() - - _, err := toAddress(podIPPolicy, 80) - if err != nil { - t.Fatalf("Got error: %s", err) - } - err = server.GetProfile(&pb.GetDestination{ - Scheme: "k8s", - Path: fmt.Sprintf("%s:%d", podIPPolicy, 80), - }, stream) - if err != nil { - t.Fatalf("Got error: %s", err) - } - - // Test that the first update has a destination profile with an - // opaque protocol and opaque transport. - if len(stream.updates) == 0 { - t.Fatalf("Expected at least 1 update but got 0") - } - update := stream.updates[0] - if update.Endpoint == nil { + stream := profileStream(t, podIPPolicy, 80, "") + profile := assertSingleProfile(t, stream.updates) + if profile.Endpoint == nil { t.Fatalf("Expected response to have endpoint field") } - if !update.OpaqueProtocol { + if !profile.OpaqueProtocol { t.Fatalf("Expected port %d to be an opaque protocol, but it was not", 80) } - if update.Endpoint.ProtocolHint == nil { + if profile.Endpoint.ProtocolHint == nil { t.Fatalf("Expected protocol hint but found none") } - if update.Endpoint.ProtocolHint.GetOpaqueTransport().GetInboundPort() != 4143 { + if profile.Endpoint.ProtocolHint.GetOpaqueTransport().GetInboundPort() != 4143 { t.Fatalf("Expected pod to support opaque traffic on port 4143") } }) t.Run("Return profile with opaque protocol when using an opaque port with an external IP", func(t *testing.T) { - server := makeServer(t) - stream := &bufferingGetProfileStream{ - updates: []*pb.DestinationProfile{}, - MockServerStream: util.NewMockServerStream(), - } - stream.Cancel() - - _, err := toAddress(externalIP, 3306) - if err != nil { - t.Fatalf("Got error: %s", err) - } - err = server.GetProfile(&pb.GetDestination{ - Scheme: "k8s", - Path: fmt.Sprintf("%s:%d", externalIP, 3306), - }, stream) - if err != nil { - t.Fatalf("Got error: %s", err) - } - - // Test that the first update has a destination profile with an - // opaque protocol and opaque transport. - if len(stream.updates) == 0 { - t.Fatalf("Expected at least 1 update but got 0") - } - update := stream.updates[0] - if !update.OpaqueProtocol { + stream := profileStream(t, externalIP, 3306, "") + profile := assertSingleProfile(t, stream.updates) + if !profile.OpaqueProtocol { t.Fatalf("Expected port %d to be an opaque protocol, but it was not", 3306) } }) t.Run("Return profile with non-opaque protocol when using an arbitrary port with an external IP", func(t *testing.T) { - server := makeServer(t) - stream := &bufferingGetProfileStream{ - updates: []*pb.DestinationProfile{}, - MockServerStream: util.NewMockServerStream(), - } - stream.Cancel() - - _, err := toAddress(externalIP, 80) - if err != nil { - t.Fatalf("Got error: %s", err) - } - err = server.GetProfile(&pb.GetDestination{ - Scheme: "k8s", - Path: fmt.Sprintf("%s:%d", externalIP, 80), - }, stream) - if err != nil { - t.Fatalf("Got error: %s", err) - } - - // Test that the first update has a destination profile with an - // opaque protocol and opaque transport. - if len(stream.updates) == 0 { - t.Fatalf("Expected at least 1 update but got 0") - } - update := stream.updates[0] - if update.OpaqueProtocol { + stream := profileStream(t, externalIP, 80, "") + profile := assertSingleProfile(t, stream.updates) + if profile.OpaqueProtocol { t.Fatalf("Expected port %d to be a non-opaque protocol, but it was opaque", 80) } }) @@ -970,3 +607,55 @@ status: } }) } + +func assertSingleProfile(t *testing.T, updates []*pb.DestinationProfile) *pb.DestinationProfile { + t.Helper() + // Under normal conditions the creation of resources by the fake API will + // generate notifications that are discarded after the stream.Cancel() call, + // but very rarely those notifications might come after, in which case we'll + // get a second update. + if len(updates) != 1 { + t.Fatalf("Expected 1 profile update but got %d: %v", len(updates), updates) + } + return updates[0] +} + +func assertSingleUpdate(t *testing.T, updates []*pb.Update) *pb.Update { + t.Helper() + // Under normal conditions the creation of resources by the fake API will + // generate notifications that are discarded after the stream.Cancel() call, + // but very rarely those notifications might come after, in which case we'll + // get a second update. + if len(updates) == 0 || len(updates) > 2 { + t.Fatalf("Expected 1 or 2 updates but got %d: %v", len(updates), updates) + } + return updates[0] +} + +func profileStream(t *testing.T, host string, port uint32, token string) *bufferingGetProfileStream { + t.Helper() + + server := makeServer(t) + stream := &bufferingGetProfileStream{ + updates: []*pb.DestinationProfile{}, + MockServerStream: util.NewMockServerStream(), + } + + // We cancel the stream before even sending the request so that we don't + // need to call server.Get in a separate goroutine. By preemptively + // cancelling, the behavior of Get becomes effectively synchronous and + // we will get only the initial update, which is what we want for this + // test. + stream.Cancel() + + err := server.GetProfile(&pb.GetDestination{ + Scheme: "k8s", + Path: fmt.Sprintf("%s:%d", host, port), + ContextToken: token, + }, stream) + if err != nil { + t.Fatalf("Got error: %s", err) + } + + return stream +} diff --git a/controller/api/destination/test_util.go b/controller/api/destination/test_util.go index 19da908f0..d96975497 100644 --- a/controller/api/destination/test_util.go +++ b/controller/api/destination/test_util.go @@ -369,6 +369,7 @@ spec: t.Fatalf("NewFakeMetadataAPI returned an error: %s", err) } log := logging.WithField("test", t.Name()) + logging.SetLevel(logging.DebugLevel) defaultOpaquePorts := map[uint32]struct{}{ 25: {}, 443: {}, diff --git a/controller/api/destination/watcher/test_util.go b/controller/api/destination/watcher/test_util.go index 7c760b446..a23c3b009 100644 --- a/controller/api/destination/watcher/test_util.go +++ b/controller/api/destination/watcher/test_util.go @@ -50,6 +50,7 @@ func (bpl *BufferingProfileListener) Update(profile *sp.ServiceProfile) { } func testCompare(t *testing.T, expected interface{}, actual interface{}) { + t.Helper() if diff := deep.Equal(expected, actual); diff != nil { t.Fatalf("%v", diff) }