Fix invalid `l5d-require-id` for some tap requests (#3210)

PR #3154 introduced an `l5d-require-id` header to Tap requests. That
header string was constructed based on the TapByResourceRequest, which
includes 3 notable fields (type, name, namespace). For namespace-level
requests (via commands like `linkerd tap ns linkerd`), type ==
`namespace`, name == `linkerd`, and namespace == "". This special casing
for namespace-level requests yielded invalid `l5d-require-id` headers,
for example: `pd-sa..serviceaccount.identity.linkerd.cluster.local`.

Fix `l5d-require-id` string generation to account for namespace-level
requests. The bulk of this change is tap unit test updates to validate
the fix.

Signed-off-by: Andrew Seigner <siggy@buoyant.io>
This commit is contained in:
Andrew Seigner 2019-08-08 09:42:11 -07:00 committed by GitHub
parent 54b2103bba
commit f98bc27a38
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 349 additions and 185 deletions

View File

@ -5,13 +5,14 @@ import (
pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
"github.com/linkerd/linkerd2/controller/api/destination/watcher"
"github.com/linkerd/linkerd2/controller/api/util"
"github.com/linkerd/linkerd2/controller/k8s"
"github.com/linkerd/linkerd2/pkg/addr"
logging "github.com/sirupsen/logrus"
)
type mockDestinationGetServer struct {
mockServerStream
util.MockServerStream
updatesReceived []*pb.Update
}
@ -21,7 +22,7 @@ func (m *mockDestinationGetServer) Send(update *pb.Update) error {
}
type mockDestinationGetProfileServer struct {
mockServerStream
util.MockServerStream
profilesReceived []*pb.DestinationProfile
}
@ -119,7 +120,7 @@ spec:
type bufferingGetStream struct {
updates []*pb.Update
mockServerStream
util.MockServerStream
}
func (bgs *bufferingGetStream) Send(update *pb.Update) error {
@ -129,7 +130,7 @@ func (bgs *bufferingGetStream) Send(update *pb.Update) error {
type bufferingGetProfileStream struct {
updates []*pb.DestinationProfile
mockServerStream
util.MockServerStream
}
func (bgps *bufferingGetProfileStream) Send(profile *pb.DestinationProfile) error {
@ -143,7 +144,7 @@ func TestGet(t *testing.T) {
stream := &bufferingGetStream{
updates: []*pb.Update{},
mockServerStream: newMockServerStream(),
MockServerStream: util.NewMockServerStream(),
}
err := server.Get(&pb.GetDestination{Scheme: "k8s", Path: "linkerd.io"}, stream)
@ -157,7 +158,7 @@ func TestGet(t *testing.T) {
stream := &bufferingGetStream{
updates: []*pb.Update{},
mockServerStream: newMockServerStream(),
MockServerStream: util.NewMockServerStream(),
}
// We cancel the stream before even sending the request so that we don't
@ -165,7 +166,7 @@ func TestGet(t *testing.T) {
// cancelling, the behavior of Get becomes effectively synchronous and
// we will get only the initial update, which is what we want for this
// test.
stream.cancel()
stream.Cancel()
err := server.Get(&pb.GetDestination{Scheme: "k8s", Path: "name1.ns.svc.mycluster.local:8989"}, stream)
if err != nil {
@ -189,7 +190,7 @@ func TestGetProfiles(t *testing.T) {
stream := &bufferingGetProfileStream{
updates: []*pb.DestinationProfile{},
mockServerStream: newMockServerStream(),
MockServerStream: util.NewMockServerStream(),
}
err := server.GetProfile(&pb.GetDestination{Scheme: "k8s", Path: "linkerd.io"}, stream)
@ -203,10 +204,10 @@ func TestGetProfiles(t *testing.T) {
stream := &bufferingGetProfileStream{
updates: []*pb.DestinationProfile{},
mockServerStream: newMockServerStream(),
MockServerStream: util.NewMockServerStream(),
}
stream.cancel() // See note above on pre-emptive cancellation.
stream.Cancel() // See note above on pre-emptive cancellation.
err := server.GetProfile(&pb.GetDestination{
Scheme: "k8s",
Path: "name1.ns.svc.mycluster.local:8989",
@ -241,11 +242,11 @@ func TestGetProfiles(t *testing.T) {
stream := &bufferingGetProfileStream{
updates: []*pb.DestinationProfile{},
mockServerStream: newMockServerStream(),
MockServerStream: util.NewMockServerStream(),
}
// See note above on pre-emptive cancellation.
stream.cancel()
stream.Cancel()
err := server.GetProfile(&pb.GetDestination{
Scheme: "k8s",
Path: "name1.ns.svc.mycluster.local:8989",

View File

@ -1,30 +0,0 @@
package destination
import (
"golang.org/x/net/context"
"google.golang.org/grpc/metadata"
)
type mockStream struct {
ctx context.Context
cancel context.CancelFunc
}
func newMockStream() mockStream {
ctx, cancel := context.WithCancel(context.Background())
return mockStream{ctx, cancel}
}
func (ms mockStream) Context() context.Context { return ms.ctx }
func (ms mockStream) SendMsg(m interface{}) error { return nil }
func (ms mockStream) RecvMsg(m interface{}) error { return nil }
type mockServerStream struct{ mockStream }
func (mss mockServerStream) SetHeader(metadata.MD) error { return nil }
func (mss mockServerStream) SendHeader(metadata.MD) error { return nil }
func (mss mockServerStream) SetTrailer(metadata.MD) {}
func newMockServerStream() mockServerStream {
return mockServerStream{newMockStream()}
}

View File

@ -0,0 +1,37 @@
package util
import (
"golang.org/x/net/context"
"google.golang.org/grpc/metadata"
)
type mockStream struct {
ctx context.Context
Cancel context.CancelFunc
}
func newMockStream() mockStream {
ctx, cancel := context.WithCancel(context.Background())
return mockStream{ctx, cancel}
}
func (ms mockStream) Context() context.Context { return ms.ctx }
func (ms mockStream) SendMsg(m interface{}) error { return nil }
func (ms mockStream) RecvMsg(m interface{}) error { return nil }
// MockServerStream satisfies the grpc.ServerStream interface
type MockServerStream struct{ mockStream }
// SetHeader satisfies the grpc.ServerStream interface
func (mss MockServerStream) SetHeader(metadata.MD) error { return nil }
// SendHeader satisfies the grpc.ServerStream interface
func (mss MockServerStream) SendHeader(metadata.MD) error { return nil }
// SetTrailer satisfies the grpc.ServerStream interface
func (mss MockServerStream) SetTrailer(metadata.MD) {}
// NewMockServerStream instantiates a MockServerStream
func NewMockServerStream() MockServerStream {
return MockServerStream{newMockStream()}
}

View File

@ -50,14 +50,18 @@ func (s *server) TapByResource(req *public.TapByResourceRequest, stream pb.Tap_T
if req == nil {
return status.Error(codes.InvalidArgument, "TapByResource received nil TapByResourceRequest")
}
if req.Target == nil {
if req.GetTarget() == nil {
return status.Error(codes.InvalidArgument, "TapByResource received nil target ResourceSelection")
}
if req.MaxRps == 0.0 {
res := req.GetTarget().GetResource()
if res == nil {
return status.Error(codes.InvalidArgument, "TapByResource received nil target Resource")
}
if req.GetMaxRps() == 0.0 {
req.MaxRps = defaultMaxRps
}
objects, err := s.k8sAPI.GetObjects(req.Target.Resource.Namespace, req.Target.Resource.Type, req.Target.Resource.Name)
objects, err := s.k8sAPI.GetObjects(res.GetNamespace(), res.GetType(), res.GetName())
if err != nil {
return apiUtil.GRPCError(err)
}
@ -82,8 +86,8 @@ func (s *server) TapByResource(req *public.TapByResourceRequest, stream pb.Tap_T
}
if len(pods) == 0 {
resType := req.GetTarget().GetResource().GetType()
resName := req.GetTarget().GetResource().GetName()
resType := res.GetType()
resName := res.GetName()
if foundDisabledPods {
return status.Errorf(codes.NotFound,
"all pods found for %s/%s have tapping disabled", resType, resName)
@ -91,24 +95,27 @@ func (s *server) TapByResource(req *public.TapByResourceRequest, stream pb.Tap_T
return status.Errorf(codes.NotFound, "no pods found for %s/%s", resType, resName)
}
log.Infof("Tapping %d pods for target: %+v", len(pods), *req.Target.Resource)
log.Infof("Tapping %d pods for target: %+v", len(pods), *res)
events := make(chan *public.TapEvent)
// divide the rps evenly between all pods to tap
rpsPerPod := req.MaxRps / float32(len(pods))
rpsPerPod := req.GetMaxRps() / float32(len(pods))
if rpsPerPod < 1 {
rpsPerPod = 1
}
match, err := makeByResourceMatch(req.Match)
match, err := makeByResourceMatch(req.GetMatch())
if err != nil {
return apiUtil.GRPCError(err)
}
for _, pod := range pods {
// create the expected pod identity from the pod spec
ns := req.GetTarget().GetResource().GetNamespace()
ns := res.GetNamespace()
if res.GetType() == pkgK8s.Namespace {
ns = res.GetName()
}
name := fmt.Sprintf("%s.%s.serviceaccount.identity.%s.cluster.local", pod.Spec.ServiceAccountName, ns, s.controllerNamespace)
log.Debugf("initiating tap request to %s with required name %s", pod.Spec.ServiceAccountName, name)
@ -461,15 +468,26 @@ func NewServer(
return nil, nil, err
}
s := prometheus.NewGrpcServer()
srv := server{
s, _ := newGRPCTapServer(tapPort, controllerNamespace, k8sAPI)
return s, lis, nil
}
func newGRPCTapServer(
tapPort uint,
controllerNamespace string,
k8sAPI *k8s.API,
) (*grpc.Server, *server) {
srv := &server{
tapPort: tapPort,
k8sAPI: k8sAPI,
controllerNamespace: controllerNamespace,
}
pb.RegisterTapServer(s, &srv)
return s, lis, nil
s := prometheus.NewGrpcServer()
pb.RegisterTapServer(s, srv)
return s, srv
}
func indexPodByIP(obj interface{}) ([]string, error) {

View File

@ -2,32 +2,61 @@ package tap
import (
"context"
"fmt"
"net"
"reflect"
"strconv"
"testing"
"time"
proxy "github.com/linkerd/linkerd2-proxy-api/go/tap"
"github.com/linkerd/linkerd2/controller/api/util"
"github.com/linkerd/linkerd2/controller/gen/public"
"github.com/linkerd/linkerd2/controller/k8s"
pkgK8s "github.com/linkerd/linkerd2/pkg/k8s"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
type tapExpected struct {
msg string
k8sRes []string
req public.TapByResourceRequest
eofOk bool
err error
k8sRes []string
req public.TapByResourceRequest
requireID string
}
// mockTapByResourceServer satisfies controller.tap.Tap_TapByResourceServer
type mockTapByResourceServer struct {
util.MockServerStream
}
func (m *mockTapByResourceServer) Send(event *public.TapEvent) error {
return nil
}
// mockProxyTapServer satisfies proxy.tap.TapServer
type mockProxyTapServer struct {
mockControllerServer mockTapByResourceServer // for cancellation
ctx context.Context
}
func (m *mockProxyTapServer) Observe(req *proxy.ObserveRequest, obsSrv proxy.Tap_ObserveServer) error {
m.ctx = obsSrv.Context()
m.mockControllerServer.Cancel()
return nil
}
func TestTapByResource(t *testing.T) {
t.Run("Returns expected response", func(t *testing.T) {
expectations := []tapExpected{
{
msg: "rpc error: code = InvalidArgument desc = TapByResource received nil target ResourceSelection",
k8sRes: []string{},
req: public.TapByResourceRequest{},
},
{
msg: "rpc error: code = Unimplemented desc = unexpected match specified: any:<> ",
k8sRes: []string{`
expectations := []tapExpected{
{
err: status.Error(codes.InvalidArgument, "TapByResource received nil target ResourceSelection"),
k8sRes: []string{},
req: public.TapByResourceRequest{},
},
{
err: status.Errorf(codes.Unimplemented, "unexpected match specified: any:<> "),
k8sRes: []string{`
apiVersion: v1
kind: Pod
metadata:
@ -40,26 +69,27 @@ metadata:
linkerd.io/proxy-version: testinjectversion
status:
phase: Running
podIP: 127.0.0.1
`,
},
req: public.TapByResourceRequest{
Target: &public.ResourceSelection{
Resource: &public.Resource{
Namespace: "emojivoto",
Type: pkgK8s.Pod,
Name: "emojivoto-meshed",
},
},
req: public.TapByResourceRequest{
Target: &public.ResourceSelection{
Resource: &public.Resource{
Namespace: "emojivoto",
Type: pkgK8s.Pod,
Name: "emojivoto-meshed",
},
Match: &public.TapByResourceRequest_Match{
Match: &public.TapByResourceRequest_Match_Any{
Any: &public.TapByResourceRequest_Match_Seq{},
},
},
Match: &public.TapByResourceRequest_Match{
Match: &public.TapByResourceRequest_Match_Any{
Any: &public.TapByResourceRequest_Match_Seq{},
},
},
},
{
msg: "rpc error: code = NotFound desc = no pods found for pod/emojivoto-not-meshed",
k8sRes: []string{`
},
{
err: status.Errorf(codes.NotFound, "no pods found for pod/emojivoto-not-meshed"),
k8sRes: []string{`
apiVersion: v1
kind: Pod
metadata:
@ -69,34 +99,35 @@ metadata:
app: emoji-svc
status:
phase: Running
podIP: 127.0.0.1
`,
},
req: public.TapByResourceRequest{
Target: &public.ResourceSelection{
Resource: &public.Resource{
Namespace: "emojivoto",
Type: pkgK8s.Pod,
Name: "emojivoto-not-meshed",
},
},
req: public.TapByResourceRequest{
Target: &public.ResourceSelection{
Resource: &public.Resource{
Namespace: "emojivoto",
Type: pkgK8s.Pod,
Name: "emojivoto-not-meshed",
},
},
},
{
msg: "rpc error: code = Unimplemented desc = unimplemented resource type: bad-type",
k8sRes: []string{},
req: public.TapByResourceRequest{
Target: &public.ResourceSelection{
Resource: &public.Resource{
Namespace: "emojivoto",
Type: "bad-type",
Name: "emojivoto-meshed-not-found",
},
},
{
err: status.Errorf(codes.Unimplemented, "unimplemented resource type: bad-type"),
k8sRes: []string{},
req: public.TapByResourceRequest{
Target: &public.ResourceSelection{
Resource: &public.Resource{
Namespace: "emojivoto",
Type: "bad-type",
Name: "emojivoto-meshed-not-found",
},
},
},
{
msg: "rpc error: code = NotFound desc = pod \"emojivoto-meshed-not-found\" not found",
k8sRes: []string{`
},
{
err: status.Errorf(codes.NotFound, "pod \"emojivoto-meshed-not-found\" not found"),
k8sRes: []string{`
apiVersion: v1
kind: Pod
metadata:
@ -108,21 +139,22 @@ metadata:
linkerd.io/proxy-version: testinjectversion
status:
phase: Running
podIP: 127.0.0.1
`,
},
req: public.TapByResourceRequest{
Target: &public.ResourceSelection{
Resource: &public.Resource{
Namespace: "emojivoto",
Type: pkgK8s.Pod,
Name: "emojivoto-meshed-not-found",
},
},
req: public.TapByResourceRequest{
Target: &public.ResourceSelection{
Resource: &public.Resource{
Namespace: "emojivoto",
Type: pkgK8s.Pod,
Name: "emojivoto-meshed-not-found",
},
},
},
{
msg: "rpc error: code = NotFound desc = no pods found for pod/emojivoto-meshed",
k8sRes: []string{`
},
{
err: status.Errorf(codes.NotFound, "no pods found for pod/emojivoto-meshed"),
k8sRes: []string{`
apiVersion: v1
kind: Pod
metadata:
@ -134,21 +166,22 @@ metadata:
linkerd.io/proxy-version: testinjectversion
status:
phase: Finished
podIP: 127.0.0.1
`,
},
req: public.TapByResourceRequest{
Target: &public.ResourceSelection{
Resource: &public.Resource{
Namespace: "emojivoto",
Type: pkgK8s.Pod,
Name: "emojivoto-meshed",
},
},
req: public.TapByResourceRequest{
Target: &public.ResourceSelection{
Resource: &public.Resource{
Namespace: "emojivoto",
Type: pkgK8s.Pod,
Name: "emojivoto-meshed",
},
},
},
{
msg: "rpc error: code = NotFound desc = all pods found for pod/emojivoto-meshed-tap-disabled have tapping disabled",
k8sRes: []string{`
},
{
err: status.Errorf(codes.NotFound, "all pods found for pod/emojivoto-meshed-tap-disabled have tapping disabled"),
k8sRes: []string{`
apiVersion: v1
kind: Pod
metadata:
@ -162,29 +195,28 @@ metadata:
linkerd.io/proxy-version: testinjectversion
status:
phase: Running
`,
},
req: public.TapByResourceRequest{
Target: &public.ResourceSelection{
Resource: &public.Resource{
Namespace: "emojivoto",
Type: pkgK8s.Pod,
Name: "emojivoto-meshed-tap-disabled",
},
podIP: 127.0.0.1
`,
},
req: public.TapByResourceRequest{
Target: &public.ResourceSelection{
Resource: &public.Resource{
Namespace: "emojivoto",
Type: pkgK8s.Pod,
Name: "emojivoto-meshed-tap-disabled",
},
Match: &public.TapByResourceRequest_Match{
Match: &public.TapByResourceRequest_Match_All{
All: &public.TapByResourceRequest_Match_Seq{},
},
},
Match: &public.TapByResourceRequest_Match{
Match: &public.TapByResourceRequest_Match_All{
All: &public.TapByResourceRequest_Match_Seq{},
},
},
},
{
// indicates we will accept EOF, in addition to the deadline exceeded message
eofOk: true,
// success, underlying tap events tested in http_server_test.go
msg: "rpc error: code = DeadlineExceeded desc = context deadline exceeded",
k8sRes: []string{`
},
{
// success, underlying tap events tested in http_server_test.go
err: nil,
k8sRes: []string{`
apiVersion: v1
kind: Pod
metadata:
@ -197,61 +229,167 @@ metadata:
linkerd.io/proxy-version: testinjectversion
status:
phase: Running
podIP: 127.0.0.1
`,
},
req: public.TapByResourceRequest{
Target: &public.ResourceSelection{
Resource: &public.Resource{
Namespace: "emojivoto",
Type: pkgK8s.Pod,
Name: "emojivoto-meshed",
},
},
req: public.TapByResourceRequest{
Target: &public.ResourceSelection{
Resource: &public.Resource{
Namespace: "emojivoto",
Type: pkgK8s.Pod,
Name: "emojivoto-meshed",
},
Match: &public.TapByResourceRequest_Match{
Match: &public.TapByResourceRequest_Match_All{
All: &public.TapByResourceRequest_Match_Seq{},
},
},
Match: &public.TapByResourceRequest_Match{
Match: &public.TapByResourceRequest_Match_All{
All: &public.TapByResourceRequest_Match_Seq{},
},
},
},
}
requireID: ".emojivoto.serviceaccount.identity.controller-ns.cluster.local",
},
{
err: nil,
k8sRes: []string{`
apiVersion: v1
kind: Pod
metadata:
name: emojivoto-meshed
namespace: emojivoto
labels:
app: emoji-svc
linkerd.io/control-plane-ns: controller-ns
annotations:
linkerd.io/proxy-version: testinjectversion
spec:
serviceAccountName: emojivoto-meshed-sa
status:
phase: Running
podIP: 127.0.0.1
`,
},
req: public.TapByResourceRequest{
Target: &public.ResourceSelection{
Resource: &public.Resource{
Namespace: "emojivoto",
Type: pkgK8s.Pod,
Name: "emojivoto-meshed",
},
},
Match: &public.TapByResourceRequest_Match{
Match: &public.TapByResourceRequest_Match_All{
All: &public.TapByResourceRequest_Match_Seq{},
},
},
},
requireID: "emojivoto-meshed-sa.emojivoto.serviceaccount.identity.controller-ns.cluster.local",
},
{
err: nil,
k8sRes: []string{`
apiVersion: v1
kind: Namespace
metadata:
name: emojivoto
`, `
apiVersion: v1
kind: Pod
metadata:
name: emojivoto-meshed
namespace: emojivoto
labels:
app: emoji-svc
linkerd.io/control-plane-ns: controller-ns
annotations:
linkerd.io/proxy-version: testinjectversion
spec:
serviceAccountName: emojivoto-meshed-sa
status:
phase: Running
podIP: 127.0.0.1
`,
},
req: public.TapByResourceRequest{
Target: &public.ResourceSelection{
Resource: &public.Resource{
Namespace: "",
Type: pkgK8s.Namespace,
Name: "emojivoto",
},
},
Match: &public.TapByResourceRequest_Match{
Match: &public.TapByResourceRequest_Match_All{
All: &public.TapByResourceRequest_Match_Seq{},
},
},
},
requireID: "emojivoto-meshed-sa.emojivoto.serviceaccount.identity.controller-ns.cluster.local",
},
}
for i, exp := range expectations {
exp := exp // pin
t.Run(fmt.Sprintf("%d: Returns expected response", i), func(t *testing.T) {
for _, exp := range expectations {
k8sAPI, err := k8s.NewFakeAPI(exp.k8sRes...)
if err != nil {
t.Fatalf("NewFakeAPI returned an error: %s", err)
}
server, listener, err := NewServer("localhost:0", 0, "controller-ns", k8sAPI)
if err != nil {
t.Fatalf("NewServer error: %s", err)
stream := mockTapByResourceServer{
MockServerStream: util.NewMockServerStream(),
}
go func() { server.Serve(listener) }()
defer server.GracefulStop()
s := grpc.NewServer()
mockProxyTapServer := mockProxyTapServer{
mockControllerServer: stream,
}
proxy.RegisterTapServer(s, &mockProxyTapServer)
lis, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatalf("Failed to listen")
}
// TODO: mock out the underlying grpc tap events
go func() {
err := s.Serve(lis)
if err != nil {
t.Fatalf("Failed to serve on %+v: %s", lis, err)
}
}()
defer s.GracefulStop()
_, port, err := net.SplitHostPort(lis.Addr().String())
if err != nil {
t.Fatal(err.Error())
}
tapPort, err := strconv.ParseUint(port, 10, 32)
if err != nil {
t.Fatalf("Invalid port: %s", port)
}
_, fakeGrpcServer := newGRPCTapServer(uint(tapPort), "controller-ns", k8sAPI)
k8sAPI.Sync()
client, conn, err := NewClient(listener.Addr().String())
if err != nil {
t.Fatalf("NewClient error: %v", err)
}
defer conn.Close()
// TODO: mock out the underlying grpc tap events, rather than waiting an
// arbitrary time for request to timeout.
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
tapByResourceClient, err := client.TapByResource(ctx, &exp.req)
if err != nil {
t.Fatalf("TapByResource failed: %v", err)
err = fakeGrpcServer.TapByResource(&exp.req, &stream)
if !reflect.DeepEqual(err, exp.err) {
t.Fatalf("TapByResource returned unexpected: [%s], expected: [%s]", err, exp.err)
}
_, err = tapByResourceClient.Recv()
if err.Error() != exp.msg && (!exp.eofOk || err.Error() != "EOF") {
t.Fatalf("Expected error to be [%s], but was [%s]. eofOk: %v", exp.msg, err, exp.eofOk)
if exp.requireID != "" {
md, ok := metadata.FromIncomingContext(mockProxyTapServer.ctx)
if !ok {
t.Fatalf("FromIncomingContext failed given: %+v", mockProxyTapServer.ctx)
}
if !reflect.DeepEqual(md.Get(requireIDHeader), []string{exp.requireID}) {
t.Fatalf("Unexpected l5d-require-id header [%+v] expected [%+v]", md.Get(requireIDHeader), []string{exp.requireID})
}
}
}
})
})
}
}