mirror of https://github.com/linkerd/linkerd2.git
destination: Avoid sending spurious profile updates (#10517)
The GetProfile API endpoint does not behave as expected: when a profile watch is established, the API server starts two separate profile watches, a primary watch that uses the client's namespace and a fallback watch that ignores it. These watches race to send data back to the client. If the fallback watch updates first, its value may be sent to clients before being corrected by a subsequent update. If the primary watch updates with an empty value, the default profile may be served before being corrected by an update from the fallback watch. From the proxy's perspective, we would much prefer that the API provide a single authoritative response when possible; this avoids needless corrective work being distributed across the system on every watch initiation.

To fix this, we modify the fallbackProfileListener to behave predictably: it only emits updates once both its primary and fallback listeners have been updated, so it never publishes a value based on a partial view of the cluster state. Furthermore, the opaquePortsAdaptor no longer synthesizes a default ServiceProfile (surprising behavior); that defaulting logic moves into a dedicated defaultProfileListener helper, and a new dedupProfileListener squelches obviously redundant updates.

Finally, this newfound predictability allows us to simplify the API's tests. Many of the existing tests were unclear about what they exercised and sometimes asserted on the "incorrect" intermediate profile updates.
parent e03687d34a
commit 08f97cc26f
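Before the diff itself, here is an illustrative sketch (not part of the commit) of the update pipeline the change builds: a defaulting stage followed by a deduplicating stage in front of the stage that writes to the stream. The real implementation uses watcher.ProfileUpdateListener and *sp.ServiceProfile; the types below are toy stand-ins.

// Illustrative sketch only: toy model of the listener chain described above.
package main

import "fmt"

type profile struct{ name string }

type listener interface{ Update(*profile) }

// sink stands in for the profileTranslator that writes to the gRPC stream.
type sink struct{}

func (sink) Update(p *profile) { fmt.Printf("send %q\n", p.name) }

// dedup forwards an update only if it differs from the last forwarded value.
type dedup struct {
	parent      listener
	state       *profile
	initialized bool
}

func (d *dedup) Update(p *profile) {
	if d.initialized && p == d.state {
		return // redundant update; squelched
	}
	d.parent.Update(p)
	d.state, d.initialized = p, true
}

// defaulter replaces nil updates with a fixed default before forwarding.
type defaulter struct {
	parent listener
	def    *profile
}

func (d *defaulter) Update(p *profile) {
	if p == nil {
		p = d.def
	}
	d.parent.Update(p)
}

func main() {
	chain := &defaulter{parent: &dedup{parent: sink{}}, def: &profile{name: "default"}}
	chain.Update(nil)                       // "default" is sent
	chain.Update(nil)                       // squelched: same default pointer as before
	chain.Update(&profile{name: "primary"}) // "primary" is sent
}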
@@ -0,0 +1,31 @@
package destination

import (
	"github.com/linkerd/linkerd2/controller/api/destination/watcher"
	sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2"
	log "github.com/sirupsen/logrus"
)

type dedupProfileListener struct {
	parent      watcher.ProfileUpdateListener
	state       *sp.ServiceProfile
	initialized bool
	log         *log.Entry
}

func newDedupProfileListener(
	parent watcher.ProfileUpdateListener,
	log *log.Entry,
) watcher.ProfileUpdateListener {
	return &dedupProfileListener{parent, nil, false, log}
}

func (p *dedupProfileListener) Update(profile *sp.ServiceProfile) {
	if p.initialized && profile == p.state {
		log.Debug("Skipping redundant update")
		return
	}
	p.parent.Update(profile)
	p.initialized = true
	p.state = profile
}
@@ -0,0 +1,29 @@
package destination

import (
	"github.com/linkerd/linkerd2/controller/api/destination/watcher"
	sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2"
	log "github.com/sirupsen/logrus"
)

type defaultProfileListener struct {
	parent  watcher.ProfileUpdateListener
	profile *sp.ServiceProfile
	log     *log.Entry
}

func newDefaultProfileListener(
	profile *sp.ServiceProfile,
	parent watcher.ProfileUpdateListener,
	log *log.Entry,
) watcher.ProfileUpdateListener {
	return &defaultProfileListener{parent, profile, log}
}

func (p *defaultProfileListener) Update(profile *sp.ServiceProfile) {
	if profile == nil {
		log.Debug("Using default profile")
		profile = p.profile
	}
	p.parent.Update(profile)
}
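A detail worth noting in the two listeners above: the dedup check is pointer equality on *sp.ServiceProfile, and the default listener always forwards the same profile pointer it was constructed with, which is what lets a downstream dedup stage squelch repeated defaults. The following test-style sketch is illustrative only (not part of the commit; captureListener is a hypothetical helper defined just for it):

package destination

import (
	"testing"

	sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2"
	logging "github.com/sirupsen/logrus"
)

// captureListener records every profile it receives; it exists only for this sketch.
type captureListener struct{ got []*sp.ServiceProfile }

func (c *captureListener) Update(p *sp.ServiceProfile) { c.got = append(c.got, p) }

func TestDefaultAndDedupSketch(t *testing.T) {
	buf := &captureListener{}
	log := logging.WithField("test", t.Name())

	defaultProfile := sp.ServiceProfile{}
	listener := newDefaultProfileListener(&defaultProfile, newDedupProfileListener(buf, log), log)

	listener.Update(nil)                  // forwarded as &defaultProfile
	listener.Update(nil)                  // squelched: the dedup stage sees the same pointer again
	listener.Update(&sp.ServiceProfile{}) // forwarded: a new pointer, even though it is deeply equal

	if len(buf.got) != 2 {
		t.Fatalf("expected 2 forwarded updates, got %d", len(buf.got))
	}
}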
@@ -5,99 +5,90 @@ import (
	"github.com/linkerd/linkerd2/controller/api/destination/watcher"
	sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2"
	logging "github.com/sirupsen/logrus"
)

type fallbackProfileListener struct {
	underlying watcher.ProfileUpdateListener
	primary    *primaryProfileListener
	backup     *backupProfileListener
	mutex      sync.Mutex
	primary, backup *childListener
	parent          watcher.ProfileUpdateListener
	log             *logging.Entry
	mutex           sync.Mutex
}

type fallbackChildListener struct {
type childListener struct {
	// state is only referenced from the outer struct primaryProfileListener
	// or backupProfileListener (e.g. listener.state where listener's type is
	// _not_ this struct). structcheck issues a false positive for this field
	// as it does not think it's used.
	//nolint:structcheck
	state  *sp.ServiceProfile
	parent *fallbackProfileListener
	state       *sp.ServiceProfile
	initialized bool
	parent      *fallbackProfileListener
}

type primaryProfileListener struct {
	fallbackChildListener
}

type backupProfileListener struct {
	fallbackChildListener
}

// newFallbackProfileListener takes an underlying profileUpdateListener and
// returns two profileUpdateListeners: a primary and a backup. Updates to
// the primary and backup will propagate to the underlying with updates to
// the primary always taking priority. If the value in the primary is cleared,
// the value from the backup is used.
func newFallbackProfileListener(listener watcher.ProfileUpdateListener) (watcher.ProfileUpdateListener, watcher.ProfileUpdateListener) {
// newFallbackProfileListener takes a parent ProfileUpdateListener and returns
// two ProfileUpdateListeners: a primary and a backup.
//
// If the primary listener is updated with a non-nil value, it is published to
// the parent listener.
//
// Otherwise, if the backup listener has most recently been updated with a
// non-nil value, its value is published to the parent listener.
//
// A nil ServiceProfile is published only when both the primary and backup have
// been initialized and have nil values.
func newFallbackProfileListener(
	parent watcher.ProfileUpdateListener,
	log *logging.Entry,
) (watcher.ProfileUpdateListener, watcher.ProfileUpdateListener) {
	// Primary and backup share a lock to ensure updates are atomic.
	fallback := fallbackProfileListener{
		mutex:      sync.Mutex{},
		underlying: listener,
		mutex: sync.Mutex{},
		log:   log,
	}

	primary := primaryProfileListener{
		fallbackChildListener{
			parent: &fallback,
		},
	primary := childListener{
		initialized: false,
		parent:      &fallback,
	}
	backup := backupProfileListener{
		fallbackChildListener{
			parent: &fallback,
		},

	backup := childListener{
		initialized: false,
		parent:      &fallback,
	}

	fallback.parent = parent
	fallback.primary = &primary
	fallback.backup = &backup

	return &primary, &backup
}

// Primary
func (f *fallbackProfileListener) publish() {
	if !f.primary.initialized {
		f.log.Debug("Waiting for primary profile listener to be initialized")
		return
	}
	if !f.backup.initialized {
		f.log.Debug("Waiting for backup profile listener to be initialized")
		return
	}

func (p *primaryProfileListener) Update(profile *sp.ServiceProfile) {
	if f.primary.state == nil && f.backup.state != nil {
		f.log.Debug("Publishing backup profile")
		f.parent.Update(f.backup.state)
		return
	}

	f.log.Debug("Publishing primary profile")
	f.parent.Update(f.primary.state)
}

func (p *childListener) Update(profile *sp.ServiceProfile) {
	p.parent.mutex.Lock()
	defer p.parent.mutex.Unlock()

	p.state = profile

	if p.state != nil {
		// We got a value; apply the update.
		p.parent.underlying.Update(p.state)
		return
	}
	if p.parent.backup != nil {
		// Our value was cleared; fall back to backup.
		p.parent.underlying.Update(p.parent.backup.state)
		return
	}
	// Our value was cleared and there is no backup value.
	p.parent.underlying.Update(nil)
}

// Backup

func (b *backupProfileListener) Update(profile *sp.ServiceProfile) {
	b.parent.mutex.Lock()
	defer b.parent.mutex.Unlock()

	b.state = profile

	if b.parent.primary != nil && b.parent.primary.state != nil {
		// Primary has a value, so ignore this update.
		return
	}
	if b.state != nil {
		// We got a value; apply the update.
		b.parent.underlying.Update(b.state)
		return
	}
	// Our value was cleared and there is no primary value.
	b.parent.underlying.Update(nil)
	p.initialized = true
	p.parent.publish()
}
@@ -5,6 +5,7 @@ import (
	"github.com/linkerd/linkerd2/controller/api/destination/watcher"
	sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2"
	logging "github.com/sirupsen/logrus"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

@@ -41,65 +42,56 @@ func TestFallbackProfileListener(t *testing.T) {
	}

	t.Run("Primary updated", func(t *testing.T) {
		primary, _, listener := newListeners()

		primary, backup, listener := newListeners()
		primary.Update(&primaryProfile)

		assertEq(t, listener.received, []*sp.ServiceProfile{})
		backup.Update(nil)
		assertEq(t, listener.received, []*sp.ServiceProfile{&primaryProfile})
	})

	t.Run("Backup updated", func(t *testing.T) {
		_, backup, listener := newListeners()

		primary, backup, listener := newListeners()
		backup.Update(&backupProfile)

		primary.Update(nil)
		assertEq(t, listener.received, []*sp.ServiceProfile{&backupProfile})
	})

	t.Run("Primary cleared", func(t *testing.T) {
		primary, _, listener := newListeners()

		primary, backup, listener := newListeners()
		backup.Update(nil)
		primary.Update(&primaryProfile)
		primary.Update(nil)

		assertEq(t, listener.received, []*sp.ServiceProfile{&primaryProfile, nil})
	})

	t.Run("Backup cleared", func(t *testing.T) {
		_, backup, listener := newListeners()

		primary, backup, listener := newListeners()
		backup.Update(&backupProfile)
		primary.Update(nil)
		backup.Update(nil)

		assertEq(t, listener.received, []*sp.ServiceProfile{&backupProfile, nil})
	})

	t.Run("Primary overrides backup", func(t *testing.T) {
		primary, backup, listener := newListeners()

		backup.Update(&backupProfile)
		primary.Update(&primaryProfile)

		assertEq(t, listener.received, []*sp.ServiceProfile{&backupProfile, &primaryProfile})
		assertEq(t, listener.received, []*sp.ServiceProfile{&primaryProfile})
	})

	t.Run("Backup update ignored", func(t *testing.T) {
		primary, backup, listener := newListeners()

		primary.Update(&primaryProfile)
		backup.Update(&backupProfile)
		backup.Update(nil)

		assertEq(t, listener.received, []*sp.ServiceProfile{&primaryProfile})
		assertEq(t, listener.received, []*sp.ServiceProfile{&primaryProfile, &primaryProfile})
	})

	t.Run("Fallback to backup", func(t *testing.T) {
		primary, backup, listener := newListeners()

		primary.Update(&primaryProfile)
		backup.Update(&backupProfile)
		primary.Update(nil)

		assertEq(t, listener.received, []*sp.ServiceProfile{&primaryProfile, &backupProfile})
	})

@@ -110,11 +102,12 @@ func newListeners() (watcher.ProfileUpdateListener, watcher.ProfileUpdateListene
		received: []*sp.ServiceProfile{},
	}

	primary, backup := newFallbackProfileListener(listener)
	primary, backup := newFallbackProfileListener(listener, logging.NewEntry(logging.New()))
	return primary, backup, listener
}

func assertEq(t *testing.T, received []*sp.ServiceProfile, expected []*sp.ServiceProfile) {
	t.Helper()
	if len(received) != len(expected) {
		t.Fatalf("Expected %d profile updates, got %d", len(expected), len(received))
	}
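One scenario worth spelling out, since it is the crux of the change: nothing is published until both sides have reported at least once, and if both report nil the parent sees a single nil (which the defaultProfileListener upstream then turns into the default profile). A sketch in the same style as the tests above, reusing the newListeners and assertEq helpers from this file (illustrative, not part of the commit):

func TestBothSidesEmpty(t *testing.T) {
	primary, backup, listener := newListeners()

	backup.Update(nil)
	assertEq(t, listener.received, []*sp.ServiceProfile{}) // primary not initialized yet; nothing published

	primary.Update(nil)
	assertEq(t, listener.received, []*sp.ServiceProfile{nil}) // both initialized and empty: a single nil is published
}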
@@ -32,10 +32,9 @@ func (opa *opaquePortsAdaptor) UpdateService(ports map[uint32]struct{}) {
}

func (opa *opaquePortsAdaptor) publish() {
	merged := sp.ServiceProfile{}
	if opa.profile != nil {
		merged = *opa.profile
		p := *opa.profile
		p.Spec.OpaquePorts = opa.opaquePorts
		opa.listener.Update(&p)
	}
	merged.Spec.OpaquePorts = opa.opaquePorts
	opa.listener.Update(&merged)
}
@@ -35,11 +35,13 @@ func newProfileTranslator(stream pb.Destination_GetProfileServer, log *logging.E
func (pt *profileTranslator) Update(profile *sp.ServiceProfile) {
	if profile == nil {
		pt.log.Debugf("Sending default profile")
		if err := pt.stream.Send(pt.defaultServiceProfile()); err != nil {
			pt.log.Errorf("failed to send default service profile: %s", err)
		}
		return
	}

	destinationProfile, err := pt.createDestinationProfile(profile)
	if err != nil {
		pt.log.Error(err)
@@ -11,6 +11,7 @@ import (
	pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
	"github.com/linkerd/linkerd2/controller/api/destination/watcher"
	sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2"
	"github.com/linkerd/linkerd2/controller/k8s"
	labels "github.com/linkerd/linkerd2/pkg/k8s"
	"github.com/linkerd/linkerd2/pkg/prometheus"

@@ -229,11 +230,11 @@ func (s *server) getProfileByIP(
	if err != nil {
		return fmt.Errorf("failed to create address: %w", err)
	}
	return s.translateEndpointProfile(&address, port, stream)
	return s.subscribeToEndpointProfile(&address, port, stream)
}

fqn := fmt.Sprintf("%s.%s.svc.%s", svcID.Name, svcID.Namespace, s.clusterDomain)
return s.translateServiceProfile(*svcID, token, fqn, port, stream)
return s.subscribeToServiceProfile(*svcID, token, fqn, port, stream)
}

func (s *server) getProfileByName(

@@ -255,13 +256,16 @@ func (s *server) getProfileByName(
	if err != nil {
		return fmt.Errorf("failed to get pod for hostname %s: %w", hostname, err)
	}
	return s.translateEndpointProfile(address, port, stream)
	return s.subscribeToEndpointProfile(address, port, stream)
}

return s.translateServiceProfile(service, token, host, port, stream)
return s.subscribeToServiceProfile(service, token, host, port, stream)
}

func (s *server) translateServiceProfile(
// Resolves a profile for a service, sending updates to the provided stream.
//
// This function does not return until the stream is closed.
func (s *server) subscribeToServiceProfile(
	service watcher.ID,
	token, fqn string,
	port uint32,

@@ -272,6 +276,8 @@ func (s *server) translateServiceProfile(
	WithField("svc", service.Name).
	WithField("port", port)

canceled := stream.Context().Done()

// We build up the pipeline of profile updaters backwards, starting from
// the translator which takes profile updates, translates them to protobuf
// and pushes them onto the gRPC stream.

@@ -282,7 +288,8 @@ func (s *server) translateServiceProfile(
// split adaptor.
opaquePortsAdaptor := newOpaquePortsAdaptor(translator)

// Subscribe the adaptor to service updates.
// Create an adaptor that merges service-level opaque port configurations
// onto profile updates.
err := s.opaquePorts.Subscribe(service, opaquePortsAdaptor)
if err != nil {
	log.Warnf("Failed to subscribe to service updates for %s: %s", service, err)

@@ -290,51 +297,110 @@ func (s *server) translateServiceProfile(
}
defer s.opaquePorts.Unsubscribe(service, opaquePortsAdaptor)

// The fallback accepts updates from a primary and secondary source and
// passes the appropriate profile updates to the adaptor.
primary, secondary := newFallbackProfileListener(opaquePortsAdaptor)
// Ensure that (1) nil values are turned into a default policy and (2)
// subsequent updates that refer to the same service profile object are
// deduplicated to prevent sending redundant updates.
dup := newDedupProfileListener(opaquePortsAdaptor, log)
defaultProfile := sp.ServiceProfile{}
listener := newDefaultProfileListener(&defaultProfile, dup, log)

// If we have a context token, we create two subscriptions: one with the
// context token which sends updates to the primary listener and one without
// the context token which sends updates to the secondary listener. It is
// up to the fallbackProfileListener to merge updates from the primary and
// secondary listeners and send the appropriate updates to the stream.
if token != "" {
	ctxToken := s.parseContextToken(token)
	profile, err := profileID(fqn, ctxToken, s.clusterDomain)
	if err != nil {
		log.Debug("Invalid service")
		return status.Errorf(codes.InvalidArgument, "invalid profile ID: %s", err)
	}
	err = s.profiles.Subscribe(profile, primary)
	if err != nil {
		log.Warnf("Failed to subscribe to profile: %s", err)
		return err
	}
	defer s.profiles.Unsubscribe(profile, primary)
// The primary lookup uses the context token to determine the requester's
// namespace. If there's no namespace in the token, start a single
// subscription.
tok := s.parseContextToken(token)
if tok.Ns == "" {
	return s.subscribeToServiceWithoutContext(fqn, listener, canceled, log)
}
return s.subscribeToServicesWithContext(fqn, tok, listener, canceled, log)
}

profile, err := profileID(fqn, contextToken{}, s.clusterDomain)
// subscribeToServiceWithContext establishes two profile watches: a "backup"
// watch (ignoring the client namespace) and a preferred "primary" watch
// assuming the client's context. Once updates are received for both watches, we
// select over both watches to send profile updates to the stream. A nil update
// may be sent if both the primary and backup watches are initialized with a nil
// value.
func (s *server) subscribeToServicesWithContext(
	fqn string,
	token contextToken,
	listener watcher.ProfileUpdateListener,
	canceled <-chan struct{},
	log *logging.Entry,
) error {
	// We need to support two subscriptions:
	// - First, a backup subscription that assumes the context of the server
	//   namespace.
	// - And then, a primary subscription that assumes the context of the client
	//   namespace.
	primary, backup := newFallbackProfileListener(listener, log)

	// The backup lookup ignores the context token to lookup any
	// server-namespace-hosted profiles.
	backupID, err := profileID(fqn, contextToken{}, s.clusterDomain)
	if err != nil {
		log.Debug("Invalid service")
		return status.Errorf(codes.InvalidArgument, "invalid profile ID: %s", err)
	}
	err = s.profiles.Subscribe(profile, secondary)
	err = s.profiles.Subscribe(backupID, backup)
	if err != nil {
		log.Warnf("Failed to subscribe to profile: %s", err)
		return err
	}
	defer s.profiles.Unsubscribe(profile, secondary)
	defer s.profiles.Unsubscribe(backupID, backup)

	primaryID, err := profileID(fqn, token, s.clusterDomain)
	if err != nil {
		log.Debug("Invalid service")
		return status.Errorf(codes.InvalidArgument, "invalid profile ID: %s", err)
	}
	err = s.profiles.Subscribe(primaryID, primary)
	if err != nil {
		log.Warnf("Failed to subscribe to profile: %s", err)
		return err
	}
	defer s.profiles.Unsubscribe(primaryID, primary)

	select {
	case <-s.shutdown:
	case <-stream.Context().Done():
	case <-canceled:
		log.Debug("Cancelled")
	}
	return nil
}

func (s *server) translateEndpointProfile(
// subscribeToServiceWithoutContext establishes a single profile watch, assuming
// no client context. All updates are published to the provided listener.
func (s *server) subscribeToServiceWithoutContext(
	fqn string,
	listener watcher.ProfileUpdateListener,
	cancel <-chan struct{},
	log *logging.Entry,
) error {
	id, err := profileID(fqn, contextToken{}, s.clusterDomain)
	if err != nil {
		log.Debug("Invalid service")
		return status.Errorf(codes.InvalidArgument, "invalid profile ID: %s", err)
	}
	err = s.profiles.Subscribe(id, listener)
	if err != nil {
		log.Warnf("Failed to subscribe to profile: %s", err)
		return err
	}
	defer s.profiles.Unsubscribe(id, listener)

	select {
	case <-s.shutdown:
	case <-cancel:
		log.Debug("Cancelled")
	}
	return nil
}

// Resolves a profile for a single endpoint, sending updates to the provided
// stream.
//
// This function does not return until the stream is closed.
func (s *server) subscribeToEndpointProfile(
	address *watcher.Address,
	port uint32,
	stream pb.Destination_GetProfileServer,

@@ -355,7 +421,7 @@ func (s *server) translateEndpointProfile(
	if err != nil {
		return fmt.Errorf("failed to create endpoint: %w", err)
	}
	translator := newEndpointProfileTranslator(address.Pod, port, endpoint, stream, s.log)
	translator := newEndpointProfileTranslator(address.Pod, port, endpoint, stream, log)

	// If the endpoint's port is annotated as opaque, we don't need to
	// subscribe for updates because it will always be opaque

@@ -613,6 +679,9 @@ type contextToken struct {

func (s *server) parseContextToken(token string) contextToken {
	ctxToken := contextToken{}
	if token == "" {
		return ctxToken
	}
	if err := json.Unmarshal([]byte(token), &ctxToken); err != nil {
		// if json is invalid, means token can have ns:<namespace> form
		parts := strings.Split(token, ":")
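For reference, parseContextToken accepts either a JSON token or the legacy ns:<namespace> form; both formats are exercised by the tests that follow. A minimal sketch of the two request shapes (the Path value here is a made-up example, not a fixture from this repository):

package main

import (
	"fmt"

	pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
)

func main() {
	jsonForm := &pb.GetDestination{
		Scheme:       "k8s",
		Path:         "books.example-ns.svc.cluster.local:8080", // hypothetical service and port
		ContextToken: `{"ns":"client-ns"}`,                      // JSON form
	}
	legacyForm := &pb.GetDestination{
		Scheme:       "k8s",
		Path:         "books.example-ns.svc.cluster.local:8080",
		ContextToken: "ns:client-ns", // legacy colon-separated form
	}
	fmt.Println(jsonForm.ContextToken, legacyForm.ContextToken)
}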
@@ -84,7 +84,6 @@ func TestGet(t *testing.T) {
		updates:          []*pb.Update{},
		MockServerStream: util.NewMockServerStream(),
	}

	stream.Cancel()

	path := fmt.Sprintf("%s:%d", fullyQualifiedNameSkipped, skippedPort)

@@ -96,13 +95,8 @@
		t.Fatalf("Got error: %s", err)
	}

	if len(stream.updates) == 0 || len(stream.updates) > 3 {
		t.Fatalf("Expected 1 to 3 updates but got %d: %v", len(stream.updates), stream.updates)
	}

	last := stream.updates[len(stream.updates)-1]

	addrs := last.GetAdd().Addrs
	update := assertSingleUpdate(t, stream.updates)
	addrs := update.GetAdd().Addrs
	if len(addrs) == 0 {
		t.Fatalf("Expected len(addrs) to be > 0")
	}

@@ -120,12 +114,10 @@
func TestGetProfiles(t *testing.T) {
	t.Run("Returns error if not valid service name", func(t *testing.T) {
		server := makeServer(t)

		stream := &bufferingGetProfileStream{
			updates:          []*pb.DestinationProfile{},
			MockServerStream: util.NewMockServerStream(),
		}

		err := server.GetProfile(&pb.GetDestination{Scheme: "k8s", Path: "linkerd.io"}, stream)
		if err == nil {
			t.Fatalf("Expecting error, got nothing")
@ -133,169 +125,37 @@ func TestGetProfiles(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("Returns server profile", func(t *testing.T) {
|
||||
server := makeServer(t)
|
||||
|
||||
stream := &bufferingGetProfileStream{
|
||||
updates: []*pb.DestinationProfile{},
|
||||
MockServerStream: util.NewMockServerStream(),
|
||||
stream := profileStream(t, fullyQualifiedName, port, "ns:other")
|
||||
profile := assertSingleProfile(t, stream.updates)
|
||||
if profile.FullyQualifiedName != fullyQualifiedName {
|
||||
t.Fatalf("Expected fully qualified name '%s', but got '%s'",
|
||||
fullyQualifiedName, profile.FullyQualifiedName)
|
||||
}
|
||||
|
||||
stream.Cancel() // See note above on pre-emptive cancellation.
|
||||
err := server.GetProfile(&pb.GetDestination{
|
||||
Scheme: "k8s",
|
||||
Path: fmt.Sprintf("%s:%d", fullyQualifiedName, port),
|
||||
ContextToken: "ns:other",
|
||||
}, stream)
|
||||
if err != nil {
|
||||
t.Fatalf("Got error: %s", err)
|
||||
}
|
||||
|
||||
// The number of updates we get depends on the order that the watcher
|
||||
// gets updates about the server profile and the client profile. The
|
||||
// client profile takes priority so if we get that update first, it
|
||||
// will only trigger one update to the stream. However, if the watcher
|
||||
// gets the server profile first, it will send an update with that
|
||||
// profile to the stream and then a second update when it gets the
|
||||
// client profile.
|
||||
// Additionally, under normal conditions the creation of resources by
|
||||
// the fake API will generate notifications that are discarded after the
|
||||
// stream.Cancel() call, but very rarely those notifications might come
|
||||
// after, in which case we'll get a third update.
|
||||
if len(stream.updates) == 0 || len(stream.updates) > 3 {
|
||||
t.Fatalf("Expected 1 to 3 updates but got %d: %v", len(stream.updates), stream.updates)
|
||||
}
|
||||
|
||||
firstUpdate := stream.updates[0]
|
||||
if firstUpdate.FullyQualifiedName != fullyQualifiedName {
|
||||
t.Fatalf("Expected fully qualified name '%s', but got '%s'", fullyQualifiedName, firstUpdate.FullyQualifiedName)
|
||||
}
|
||||
|
||||
lastUpdate := stream.updates[len(stream.updates)-1]
|
||||
if lastUpdate.OpaqueProtocol {
|
||||
if profile.OpaqueProtocol {
|
||||
t.Fatalf("Expected port %d to not be an opaque protocol, but it was", port)
|
||||
}
|
||||
routes := lastUpdate.GetRoutes()
|
||||
routes := profile.GetRoutes()
|
||||
if len(routes) != 1 {
|
||||
t.Fatalf("Expected 1 route but got %d: %v", len(routes), routes)
|
||||
}
|
||||
if routes[0].GetIsRetryable() {
|
||||
t.Fatalf("Expected route to not be retryable, but it was")
|
||||
t.Fatalf("Expected 0 routes but got %d: %v", len(routes), routes)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("Return service profile when using json token", func(t *testing.T) {
|
||||
server := makeServer(t)
|
||||
|
||||
stream := &bufferingGetProfileStream{
|
||||
updates: []*pb.DestinationProfile{},
|
||||
MockServerStream: util.NewMockServerStream(),
|
||||
stream := profileStream(t, fullyQualifiedName, port, `{"ns":"other"}`)
|
||||
profile := assertSingleProfile(t, stream.updates)
|
||||
if profile.FullyQualifiedName != fullyQualifiedName {
|
||||
t.Fatalf("Expected fully qualified name '%s', but got '%s'", fullyQualifiedName, profile.FullyQualifiedName)
|
||||
}
|
||||
|
||||
stream.Cancel() // see note above on pre-emptive cancelling
|
||||
err := server.GetProfile(&pb.GetDestination{
|
||||
Scheme: "k8s",
|
||||
Path: fmt.Sprintf("%s:%d", fullyQualifiedName, port),
|
||||
ContextToken: "{\"ns\":\"other\"}",
|
||||
}, stream)
|
||||
if err != nil {
|
||||
t.Fatalf("Got error: %s", err)
|
||||
}
|
||||
|
||||
// The number of updates we get depends on the order that the watcher
|
||||
// gets updates about the server profile and the client profile. The
|
||||
// client profile takes priority so if we get that update first, it
|
||||
// will only trigger one update to the stream. However, if the watcher
|
||||
// gets the server profile first, it will send an update with that
|
||||
// profile to the stream and then a second update when it gets the
|
||||
// client profile.
|
||||
// Additionally, under normal conditions the creation of resources by
|
||||
// the fake API will generate notifications that are discarded after the
|
||||
// stream.Cancel() call, but very rarely those notifications might come
|
||||
// after, in which case we'll get a third update.
|
||||
if len(stream.updates) == 0 || len(stream.updates) > 3 {
|
||||
t.Fatalf("Expected 1 to 3 updates but got: %d: %v", len(stream.updates), stream.updates)
|
||||
}
|
||||
|
||||
firstUpdate := stream.updates[0]
|
||||
if firstUpdate.FullyQualifiedName != fullyQualifiedName {
|
||||
t.Fatalf("Expected fully qualified name '%s', but got '%s'", fullyQualifiedName, firstUpdate.FullyQualifiedName)
|
||||
}
|
||||
|
||||
lastUpdate := stream.updates[len(stream.updates)-1]
|
||||
routes := lastUpdate.GetRoutes()
|
||||
routes := profile.GetRoutes()
|
||||
if len(routes) != 1 {
|
||||
t.Fatalf("Expected 1 route got %d: %v", len(routes), routes)
|
||||
}
|
||||
if routes[0].GetIsRetryable() {
|
||||
t.Fatalf("Expected route to not be retryable, but it was")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("Returns client profile", func(t *testing.T) {
|
||||
server := makeServer(t)
|
||||
|
||||
stream := &bufferingGetProfileStream{
|
||||
updates: []*pb.DestinationProfile{},
|
||||
MockServerStream: util.NewMockServerStream(),
|
||||
}
|
||||
|
||||
// See note about pre-emptive cancellation
|
||||
stream.Cancel()
|
||||
err := server.GetProfile(&pb.GetDestination{
|
||||
Scheme: "k8s",
|
||||
Path: fmt.Sprintf("%s:%d", fullyQualifiedName, port),
|
||||
ContextToken: "{\"ns\":\"client-ns\"}",
|
||||
}, stream)
|
||||
if err != nil {
|
||||
t.Fatalf("Got error: %s", err)
|
||||
}
|
||||
|
||||
// The number of updates we get depends on if the watcher gets an update
|
||||
// about the profile before or after the subscription. If the subscription
|
||||
// happens first, then we get a profile update during the subscription and
|
||||
// then a second update when the watcher receives the update about that
|
||||
// profile. If the watcher event happens first, then we only get the
|
||||
// update during subscription.
|
||||
if len(stream.updates) != 1 && len(stream.updates) != 2 {
|
||||
t.Fatalf("Expected 1 or 2 updates but got %d: %v", len(stream.updates), stream.updates)
|
||||
}
|
||||
routes := stream.updates[len(stream.updates)-1].GetRoutes()
|
||||
if len(routes) != 1 {
|
||||
t.Fatalf("Expected 1 route but got %d: %v", len(routes), routes)
|
||||
}
|
||||
if !routes[0].GetIsRetryable() {
|
||||
t.Fatalf("Expected route to be retryable, but it was not")
|
||||
}
|
||||
})
|
||||
t.Run("Returns client profile", func(t *testing.T) {
|
||||
server := makeServer(t)
|
||||
|
||||
stream := &bufferingGetProfileStream{
|
||||
updates: []*pb.DestinationProfile{},
|
||||
MockServerStream: util.NewMockServerStream(),
|
||||
}
|
||||
|
||||
// See note above on pre-emptive cancellation.
|
||||
stream.Cancel()
|
||||
err := server.GetProfile(&pb.GetDestination{
|
||||
Scheme: "k8s",
|
||||
Path: fmt.Sprintf("%s:%d", fullyQualifiedName, port),
|
||||
ContextToken: "ns:client-ns",
|
||||
}, stream)
|
||||
if err != nil {
|
||||
t.Fatalf("Got error: %s", err)
|
||||
}
|
||||
|
||||
// The number of updates we get depends on if the watcher gets an update
|
||||
// about the profile before or after the subscription. If the subscription
|
||||
// happens first, then we get a profile update during the subscription and
|
||||
// then a second update when the watcher receives the update about that
|
||||
// profile. If the watcher event happens first, then we only get the
|
||||
// update during subscription.
|
||||
if len(stream.updates) != 1 && len(stream.updates) != 2 {
|
||||
t.Fatalf("Expected 1 or 2 updates but got %d: %v", len(stream.updates), stream.updates)
|
||||
}
|
||||
routes := stream.updates[len(stream.updates)-1].GetRoutes()
|
||||
stream := profileStream(t, fullyQualifiedName, port, `{"ns":"client-ns"}`)
|
||||
profile := assertSingleProfile(t, stream.updates)
|
||||
routes := profile.GetRoutes()
|
||||
if len(routes) != 1 {
|
||||
t.Fatalf("Expected 1 route but got %d: %v", len(routes), routes)
|
||||
}
|
||||
|
|
@ -305,61 +165,28 @@ func TestGetProfiles(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("Return profile when using cluster IP", func(t *testing.T) {
|
||||
server := makeServer(t)
|
||||
stream := &bufferingGetProfileStream{
|
||||
updates: []*pb.DestinationProfile{},
|
||||
MockServerStream: util.NewMockServerStream(),
|
||||
stream := profileStream(t, clusterIP, port, "")
|
||||
profile := assertSingleProfile(t, stream.updates)
|
||||
if profile.FullyQualifiedName != fullyQualifiedName {
|
||||
t.Fatalf("Expected fully qualified name '%s', but got '%s'", fullyQualifiedName, profile.FullyQualifiedName)
|
||||
}
|
||||
stream.Cancel()
|
||||
err := server.GetProfile(&pb.GetDestination{
|
||||
Scheme: "k8s",
|
||||
Path: fmt.Sprintf("%s:%d", clusterIP, port),
|
||||
}, stream)
|
||||
if err != nil {
|
||||
t.Fatalf("Got error: %s", err)
|
||||
}
|
||||
|
||||
// An explanation for why we expect 1 to 3 updates is in test cases
|
||||
// above
|
||||
if len(stream.updates) == 0 || len(stream.updates) > 3 {
|
||||
t.Fatalf("Expected 1 to 3 updates but got %d: %v", len(stream.updates), stream.updates)
|
||||
}
|
||||
|
||||
last := stream.updates[len(stream.updates)-1]
|
||||
if last.FullyQualifiedName != fullyQualifiedName {
|
||||
t.Fatalf("Expected fully qualified name '%s', but got '%s'", fullyQualifiedName, last.FullyQualifiedName)
|
||||
}
|
||||
if last.OpaqueProtocol {
|
||||
if profile.OpaqueProtocol {
|
||||
t.Fatalf("Expected port %d to not be an opaque protocol, but it was", port)
|
||||
}
|
||||
routes := last.GetRoutes()
|
||||
routes := profile.GetRoutes()
|
||||
if len(routes) != 1 {
|
||||
t.Fatalf("Expected 1 route but got %d: %v", len(routes), routes)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("Return profile with endpoint when using pod DNS", func(t *testing.T) {
|
||||
server := makeServer(t)
|
||||
stream := &bufferingGetProfileStream{
|
||||
updates: []*pb.DestinationProfile{},
|
||||
MockServerStream: util.NewMockServerStream(),
|
||||
}
|
||||
stream.Cancel()
|
||||
stream := profileStream(t, fullyQualifiedPodDNS, port, "ns:ns")
|
||||
|
||||
epAddr, err := toAddress(podIPStatefulSet, port)
|
||||
if err != nil {
|
||||
t.Fatalf("Got error: %s", err)
|
||||
}
|
||||
|
||||
err = server.GetProfile(&pb.GetDestination{
|
||||
Scheme: "k8s",
|
||||
Path: fmt.Sprintf("%s:%d", fullyQualifiedPodDNS, port),
|
||||
ContextToken: "ns:ns",
|
||||
}, stream)
|
||||
if err != nil {
|
||||
t.Fatalf("Got error: %s", err)
|
||||
}
|
||||
|
||||
// An explanation for why we expect 1 to 3 updates is in test cases
|
||||
// above
|
||||
if len(stream.updates) == 0 || len(stream.updates) > 3 {
|
||||
|
|
@ -389,26 +216,13 @@ func TestGetProfiles(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("Return profile with endpoint when using pod IP", func(t *testing.T) {
|
||||
server := makeServer(t)
|
||||
stream := &bufferingGetProfileStream{
|
||||
updates: []*pb.DestinationProfile{},
|
||||
MockServerStream: util.NewMockServerStream(),
|
||||
}
|
||||
stream.Cancel()
|
||||
stream := profileStream(t, podIP1, port, "ns:ns")
|
||||
|
||||
epAddr, err := toAddress(podIP1, port)
|
||||
if err != nil {
|
||||
t.Fatalf("Got error: %s", err)
|
||||
}
|
||||
|
||||
err = server.GetProfile(&pb.GetDestination{
|
||||
Scheme: "k8s",
|
||||
Path: fmt.Sprintf("%s:%d", podIP1, port),
|
||||
}, stream)
|
||||
if err != nil {
|
||||
t.Fatalf("Got error: %s", err)
|
||||
}
|
||||
|
||||
// An explanation for why we expect 1 to 3 updates is in test cases
|
||||
// above
|
||||
if len(stream.updates) == 0 || len(stream.updates) > 3 {
|
||||
|
|
@ -438,303 +252,126 @@ func TestGetProfiles(t *testing.T) {
|
|||
})
|
||||
|
||||
t.Run("Return default profile when IP does not map to service or pod", func(t *testing.T) {
|
||||
server := makeServer(t)
|
||||
stream := &bufferingGetProfileStream{
|
||||
updates: []*pb.DestinationProfile{},
|
||||
MockServerStream: util.NewMockServerStream(),
|
||||
}
|
||||
stream.Cancel()
|
||||
err := server.GetProfile(&pb.GetDestination{
|
||||
Scheme: "k8s",
|
||||
Path: "172.0.0.0:1234",
|
||||
}, stream)
|
||||
if err != nil {
|
||||
t.Fatalf("Got error: %s", err)
|
||||
}
|
||||
|
||||
// An explanation for why we expect 1 to 3 updates is in test cases
|
||||
// above
|
||||
if len(stream.updates) == 0 || len(stream.updates) > 3 {
|
||||
t.Fatalf("Expected 1 to 3 updates but got %d: %v", len(stream.updates), stream.updates)
|
||||
}
|
||||
|
||||
first := stream.updates[0]
|
||||
if first.RetryBudget == nil {
|
||||
stream := profileStream(t, "172.0.0.0", 1234, "")
|
||||
profile := assertSingleProfile(t, stream.updates)
|
||||
if profile.RetryBudget == nil {
|
||||
t.Fatalf("Expected default profile to have a retry budget")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("Return profile with no protocol hint when pod does not have label", func(t *testing.T) {
|
||||
server := makeServer(t)
|
||||
stream := &bufferingGetProfileStream{
|
||||
updates: []*pb.DestinationProfile{},
|
||||
MockServerStream: util.NewMockServerStream(),
|
||||
}
|
||||
stream.Cancel()
|
||||
err := server.GetProfile(&pb.GetDestination{
|
||||
Scheme: "k8s",
|
||||
Path: podIP2,
|
||||
}, stream)
|
||||
if err != nil {
|
||||
t.Fatalf("Got error: %s", err)
|
||||
}
|
||||
|
||||
// An explanation for why we expect 1 to 3 updates is in test cases
|
||||
// above
|
||||
if len(stream.updates) == 0 || len(stream.updates) > 3 {
|
||||
t.Fatalf("Expected 1 to 3 updates but got %d: %v", len(stream.updates), stream.updates)
|
||||
}
|
||||
|
||||
first := stream.updates[0]
|
||||
if first.Endpoint == nil {
|
||||
stream := profileStream(t, podIP2, port, "")
|
||||
profile := assertSingleProfile(t, stream.updates)
|
||||
if profile.Endpoint == nil {
|
||||
t.Fatalf("Expected response to have endpoint field")
|
||||
}
|
||||
if first.Endpoint.GetProtocolHint().GetProtocol() != nil || first.Endpoint.GetProtocolHint().GetOpaqueTransport() != nil {
|
||||
if profile.Endpoint.GetProtocolHint().GetProtocol() != nil || profile.Endpoint.GetProtocolHint().GetOpaqueTransport() != nil {
|
||||
t.Fatalf("Expected no protocol hint but found one")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("Return non-opaque protocol profile when using cluster IP and opaque protocol port", func(t *testing.T) {
|
||||
server := makeServer(t)
|
||||
stream := &bufferingGetProfileStream{
|
||||
updates: []*pb.DestinationProfile{},
|
||||
MockServerStream: util.NewMockServerStream(),
|
||||
stream := profileStream(t, clusterIPOpaque, opaquePort, "")
|
||||
profile := assertSingleProfile(t, stream.updates)
|
||||
if profile.FullyQualifiedName != fullyQualifiedNameOpaque {
|
||||
t.Fatalf("Expected fully qualified name '%s', but got '%s'", fullyQualifiedNameOpaque, profile.FullyQualifiedName)
|
||||
}
|
||||
stream.Cancel()
|
||||
err := server.GetProfile(&pb.GetDestination{
|
||||
Scheme: "k8s",
|
||||
Path: fmt.Sprintf("%s:%d", clusterIPOpaque, opaquePort),
|
||||
}, stream)
|
||||
if err != nil {
|
||||
t.Fatalf("Got error: %s", err)
|
||||
}
|
||||
|
||||
// An explanation for why we expect 1 to 3 updates is in test cases
|
||||
// above
|
||||
if len(stream.updates) == 0 || len(stream.updates) > 3 {
|
||||
t.Fatalf("Expected 1 to 3 updates but got %d: %v", len(stream.updates), stream.updates)
|
||||
}
|
||||
|
||||
last := stream.updates[len(stream.updates)-1]
|
||||
if last.FullyQualifiedName != fullyQualifiedNameOpaque {
|
||||
t.Fatalf("Expected fully qualified name '%s', but got '%s'", fullyQualifiedNameOpaque, last.FullyQualifiedName)
|
||||
}
|
||||
if last.OpaqueProtocol {
|
||||
if profile.OpaqueProtocol {
|
||||
t.Fatalf("Expected port %d to not be an opaque protocol, but it was", opaquePort)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("Return opaque protocol profile with endpoint when using pod IP and opaque protocol port", func(t *testing.T) {
|
||||
server := makeServer(t)
|
||||
stream := &bufferingGetProfileStream{
|
||||
updates: []*pb.DestinationProfile{},
|
||||
MockServerStream: util.NewMockServerStream(),
|
||||
}
|
||||
stream.Cancel()
|
||||
stream := profileStream(t, podIPOpaque, opaquePort, "")
|
||||
|
||||
epAddr, err := toAddress(podIPOpaque, opaquePort)
|
||||
if err != nil {
|
||||
t.Fatalf("Got error: %s", err)
|
||||
}
|
||||
|
||||
err = server.GetProfile(&pb.GetDestination{
|
||||
Scheme: "k8s",
|
||||
Path: fmt.Sprintf("%s:%d", podIPOpaque, opaquePort),
|
||||
}, stream)
|
||||
if err != nil {
|
||||
t.Fatalf("Got error: %s", err)
|
||||
}
|
||||
|
||||
// An explanation for why we expect 1 to 3 updates is in test cases
|
||||
// above
|
||||
if len(stream.updates) == 0 || len(stream.updates) > 3 {
|
||||
t.Fatalf("Expected 1 to 3 updates but got %d: %v", len(stream.updates), stream.updates)
|
||||
}
|
||||
|
||||
first := stream.updates[0]
|
||||
if first.Endpoint == nil {
|
||||
profile := assertSingleProfile(t, stream.updates)
|
||||
if profile.Endpoint == nil {
|
||||
t.Fatalf("Expected response to have endpoint field")
|
||||
}
|
||||
if !first.OpaqueProtocol {
|
||||
if !profile.OpaqueProtocol {
|
||||
t.Fatalf("Expected port %d to be an opaque protocol, but it was not", opaquePort)
|
||||
}
|
||||
_, exists := first.Endpoint.MetricLabels["namespace"]
|
||||
_, exists := profile.Endpoint.MetricLabels["namespace"]
|
||||
if !exists {
|
||||
t.Fatalf("Expected 'namespace' metric label to exist but it did not")
|
||||
}
|
||||
if first.Endpoint.ProtocolHint == nil {
|
||||
if profile.Endpoint.ProtocolHint == nil {
|
||||
t.Fatalf("Expected protocol hint but found none")
|
||||
}
|
||||
if first.Endpoint.ProtocolHint.GetOpaqueTransport().GetInboundPort() != 4143 {
|
||||
if profile.Endpoint.ProtocolHint.GetOpaqueTransport().GetInboundPort() != 4143 {
|
||||
t.Fatalf("Expected pod to support opaque traffic on port 4143")
|
||||
}
|
||||
if first.Endpoint.Addr.String() != epAddr.String() {
|
||||
t.Fatalf("Expected endpoint IP port to be %d, but it was %d", epAddr.Port, first.Endpoint.Addr.Port)
|
||||
if profile.Endpoint.Addr.String() != epAddr.String() {
|
||||
t.Fatalf("Expected endpoint IP port to be %d, but it was %d", epAddr.Port, profile.Endpoint.Addr.Port)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("Return opaque protocol profile when using service name with opaque port annotation", func(t *testing.T) {
|
||||
server := makeServer(t)
|
||||
stream := &bufferingGetProfileStream{
|
||||
updates: []*pb.DestinationProfile{},
|
||||
MockServerStream: util.NewMockServerStream(),
|
||||
stream := profileStream(t, fullyQualifiedNameOpaqueService, opaquePort, "")
|
||||
profile := assertSingleProfile(t, stream.updates)
|
||||
if profile.FullyQualifiedName != fullyQualifiedNameOpaqueService {
|
||||
t.Fatalf("Expected fully qualified name '%s', but got '%s'", fullyQualifiedNameOpaqueService, profile.FullyQualifiedName)
|
||||
}
|
||||
stream.Cancel()
|
||||
err := server.GetProfile(&pb.GetDestination{
|
||||
Scheme: "k8s",
|
||||
Path: fmt.Sprintf("%s:%d", fullyQualifiedNameOpaqueService, opaquePort),
|
||||
}, stream)
|
||||
if err != nil {
|
||||
t.Fatalf("Got error: %s", err)
|
||||
}
|
||||
if len(stream.updates) == 0 || len(stream.updates) > 3 {
|
||||
t.Fatalf("Expected 1 to 3 updates but got %d: %v", len(stream.updates), stream.updates)
|
||||
}
|
||||
last := stream.updates[len(stream.updates)-1]
|
||||
if last.FullyQualifiedName != fullyQualifiedNameOpaqueService {
|
||||
t.Fatalf("Expected fully qualified name '%s', but got '%s'", fullyQualifiedNameOpaqueService, last.FullyQualifiedName)
|
||||
}
|
||||
if !last.OpaqueProtocol {
|
||||
if !profile.OpaqueProtocol {
|
||||
t.Fatalf("Expected port %d to be an opaque protocol, but it was not", opaquePort)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("Return profile with unknown protocol hint and identity when pod contains skipped inbound port", func(t *testing.T) {
|
||||
server := makeServer(t)
|
||||
stream := &bufferingGetProfileStream{
|
||||
updates: []*pb.DestinationProfile{},
|
||||
MockServerStream: util.NewMockServerStream(),
|
||||
}
|
||||
|
||||
stream.Cancel()
|
||||
|
||||
path := fmt.Sprintf("%s:%d", podIPSkipped, skippedPort)
|
||||
err := server.GetProfile(&pb.GetDestination{
|
||||
Scheme: "k8s",
|
||||
Path: path,
|
||||
}, stream)
|
||||
if err != nil {
|
||||
t.Fatalf("Got error: %s", err)
|
||||
}
|
||||
|
||||
if len(stream.updates) == 0 || len(stream.updates) > 3 {
|
||||
t.Fatalf("Expected 1 to 3 updates but got %d: %v", len(stream.updates), stream.updates)
|
||||
}
|
||||
|
||||
last := stream.updates[len(stream.updates)-1]
|
||||
|
||||
addr := last.GetEndpoint()
|
||||
stream := profileStream(t, podIPSkipped, skippedPort, "")
|
||||
profile := assertSingleProfile(t, stream.updates)
|
||||
addr := profile.GetEndpoint()
|
||||
if addr == nil {
|
||||
t.Fatalf("Expected to not be nil")
|
||||
}
|
||||
|
||||
if addr.GetProtocolHint().GetProtocol() != nil || addr.GetProtocolHint().GetOpaqueTransport() != nil {
|
||||
t.Fatalf("Expected protocol hint for %s to be nil but got %+v", path, addr.ProtocolHint)
|
||||
t.Fatalf("Expected protocol hint for %s to be nil but got %+v", podIPSkipped, addr.ProtocolHint)
|
||||
}
|
||||
|
||||
if addr.TlsIdentity != nil {
|
||||
t.Fatalf("Expected TLS identity for %s to be nil but got %+v", path, addr.TlsIdentity)
|
||||
t.Fatalf("Expected TLS identity for %s to be nil but got %+v", podIPSkipped, addr.TlsIdentity)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("Return profile with opaque protocol when using Pod IP selected by a Server", func(t *testing.T) {
|
||||
server := makeServer(t)
|
||||
stream := &bufferingGetProfileStream{
|
||||
updates: []*pb.DestinationProfile{},
|
||||
MockServerStream: util.NewMockServerStream(),
|
||||
}
|
||||
stream.Cancel()
|
||||
|
||||
_, err := toAddress(podIPPolicy, 80)
|
||||
if err != nil {
|
||||
t.Fatalf("Got error: %s", err)
|
||||
}
|
||||
err = server.GetProfile(&pb.GetDestination{
|
||||
Scheme: "k8s",
|
||||
Path: fmt.Sprintf("%s:%d", podIPPolicy, 80),
|
||||
}, stream)
|
||||
if err != nil {
|
||||
t.Fatalf("Got error: %s", err)
|
||||
}
|
||||
|
||||
// Test that the first update has a destination profile with an
|
||||
// opaque protocol and opaque transport.
|
||||
if len(stream.updates) == 0 {
|
||||
t.Fatalf("Expected at least 1 update but got 0")
|
||||
}
|
||||
update := stream.updates[0]
|
||||
if update.Endpoint == nil {
|
||||
stream := profileStream(t, podIPPolicy, 80, "")
|
||||
profile := assertSingleProfile(t, stream.updates)
|
||||
if profile.Endpoint == nil {
|
||||
t.Fatalf("Expected response to have endpoint field")
|
||||
}
|
||||
if !update.OpaqueProtocol {
|
||||
if !profile.OpaqueProtocol {
|
||||
t.Fatalf("Expected port %d to be an opaque protocol, but it was not", 80)
|
||||
}
|
||||
if update.Endpoint.ProtocolHint == nil {
|
||||
if profile.Endpoint.ProtocolHint == nil {
|
||||
t.Fatalf("Expected protocol hint but found none")
|
||||
}
|
||||
if update.Endpoint.ProtocolHint.GetOpaqueTransport().GetInboundPort() != 4143 {
|
||||
if profile.Endpoint.ProtocolHint.GetOpaqueTransport().GetInboundPort() != 4143 {
|
||||
t.Fatalf("Expected pod to support opaque traffic on port 4143")
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("Return profile with opaque protocol when using an opaque port with an external IP", func(t *testing.T) {
|
||||
server := makeServer(t)
|
||||
stream := &bufferingGetProfileStream{
|
||||
updates: []*pb.DestinationProfile{},
|
||||
MockServerStream: util.NewMockServerStream(),
|
||||
}
|
||||
stream.Cancel()
|
||||
|
||||
_, err := toAddress(externalIP, 3306)
|
||||
if err != nil {
|
||||
t.Fatalf("Got error: %s", err)
|
||||
}
|
||||
err = server.GetProfile(&pb.GetDestination{
|
||||
Scheme: "k8s",
|
||||
Path: fmt.Sprintf("%s:%d", externalIP, 3306),
|
||||
}, stream)
|
||||
if err != nil {
|
||||
t.Fatalf("Got error: %s", err)
|
||||
}
|
||||
|
||||
// Test that the first update has a destination profile with an
|
||||
// opaque protocol and opaque transport.
|
||||
if len(stream.updates) == 0 {
|
||||
t.Fatalf("Expected at least 1 update but got 0")
|
||||
}
|
||||
update := stream.updates[0]
|
||||
if !update.OpaqueProtocol {
|
||||
stream := profileStream(t, externalIP, 3306, "")
|
||||
profile := assertSingleProfile(t, stream.updates)
|
||||
if !profile.OpaqueProtocol {
|
||||
t.Fatalf("Expected port %d to be an opaque protocol, but it was not", 3306)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("Return profile with non-opaque protocol when using an arbitrary port with an external IP", func(t *testing.T) {
|
||||
server := makeServer(t)
|
||||
stream := &bufferingGetProfileStream{
|
||||
updates: []*pb.DestinationProfile{},
|
||||
MockServerStream: util.NewMockServerStream(),
|
||||
}
|
||||
stream.Cancel()
|
||||
|
||||
_, err := toAddress(externalIP, 80)
|
||||
if err != nil {
|
||||
t.Fatalf("Got error: %s", err)
|
||||
}
|
||||
err = server.GetProfile(&pb.GetDestination{
|
||||
Scheme: "k8s",
|
||||
Path: fmt.Sprintf("%s:%d", externalIP, 80),
|
||||
}, stream)
|
||||
if err != nil {
|
||||
t.Fatalf("Got error: %s", err)
|
||||
}
|
||||
|
||||
// Test that the first update has a destination profile with an
|
||||
// opaque protocol and opaque transport.
|
||||
if len(stream.updates) == 0 {
|
||||
t.Fatalf("Expected at least 1 update but got 0")
|
||||
}
|
||||
update := stream.updates[0]
|
||||
if update.OpaqueProtocol {
|
||||
stream := profileStream(t, externalIP, 80, "")
|
||||
profile := assertSingleProfile(t, stream.updates)
|
||||
if profile.OpaqueProtocol {
|
||||
t.Fatalf("Expected port %d to be a non-opaque protocol, but it was opaque", 80)
|
||||
}
|
||||
})
|
||||
|
|
@@ -970,3 +607,55 @@ status:
	}
	})
}

func assertSingleProfile(t *testing.T, updates []*pb.DestinationProfile) *pb.DestinationProfile {
	t.Helper()
	// Under normal conditions the creation of resources by the fake API will
	// generate notifications that are discarded after the stream.Cancel() call,
	// but very rarely those notifications might come after, in which case we'll
	// get a second update.
	if len(updates) != 1 {
		t.Fatalf("Expected 1 profile update but got %d: %v", len(updates), updates)
	}
	return updates[0]
}

func assertSingleUpdate(t *testing.T, updates []*pb.Update) *pb.Update {
	t.Helper()
	// Under normal conditions the creation of resources by the fake API will
	// generate notifications that are discarded after the stream.Cancel() call,
	// but very rarely those notifications might come after, in which case we'll
	// get a second update.
	if len(updates) == 0 || len(updates) > 2 {
		t.Fatalf("Expected 1 or 2 updates but got %d: %v", len(updates), updates)
	}
	return updates[0]
}

func profileStream(t *testing.T, host string, port uint32, token string) *bufferingGetProfileStream {
	t.Helper()

	server := makeServer(t)
	stream := &bufferingGetProfileStream{
		updates:          []*pb.DestinationProfile{},
		MockServerStream: util.NewMockServerStream(),
	}

	// We cancel the stream before even sending the request so that we don't
	// need to call server.Get in a separate goroutine. By preemptively
	// cancelling, the behavior of Get becomes effectively synchronous and
	// we will get only the initial update, which is what we want for this
	// test.
	stream.Cancel()

	err := server.GetProfile(&pb.GetDestination{
		Scheme:       "k8s",
		Path:         fmt.Sprintf("%s:%d", host, port),
		ContextToken: token,
	}, stream)
	if err != nil {
		t.Fatalf("Got error: %s", err)
	}

	return stream
}
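Taken together, these helpers reduce each GetProfile test to a few lines. As a sketch of the pattern (it mirrors the updated "Returns server profile" case earlier in this diff; fullyQualifiedName and port are fixtures already defined in this test file):

func TestReturnsServerProfileSketch(t *testing.T) {
	stream := profileStream(t, fullyQualifiedName, port, "ns:other")
	profile := assertSingleProfile(t, stream.updates)
	if profile.FullyQualifiedName != fullyQualifiedName {
		t.Fatalf("Expected fully qualified name '%s', but got '%s'", fullyQualifiedName, profile.FullyQualifiedName)
	}
}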
@@ -369,6 +369,7 @@ spec:
	t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
}
log := logging.WithField("test", t.Name())
logging.SetLevel(logging.DebugLevel)
defaultOpaquePorts := map[uint32]struct{}{
	25:  {},
	443: {},
@@ -50,6 +50,7 @@ func (bpl *BufferingProfileListener) Update(profile *sp.ServiceProfile) {
}

func testCompare(t *testing.T, expected interface{}, actual interface{}) {
	t.Helper()
	if diff := deep.Equal(expected, actual); diff != nil {
		t.Fatalf("%v", diff)
	}