Add JSON output to tap command (#3434)

Replaces #3411 

### Motivation

It is a little tough to filter/read the current tap output. As headers are being
added to tap, the output is starting to get difficult to consume. Take a peek at
#3262 for an example. It would be nice to have some more machine readable output
that can be sliced and diced with tools such as jq.

### Solution

A new output option has been added to the `linkerd tap` command that returns the
JSON encoding of tap events.

The default output is line oriented; `-o wide` appends the request's target
resource type to the tap line oriented tap events.

In order display certain values in a more human readable form, a tap event
display struct has been introduced. This struct maps public API `TapEvent`s
directly to a private `tapEvent`. This struct offers a flatter JSON structure
than the protobuf JSON rendering. It also can format certain field--such as
addresses--better than the JSON protobuf marshaler.

Closes #3390

**Default**:
```
➜  linkerd2 git:(kleimkuhler/tap-json-output) linkerd -n linkerd tap deploy/linkerd-web
req id=5:0 proxy=in  src=10.1.6.146:36976 dst=10.1.6.148:9994 tls=not_provided_by_remote :method=GET :authority=10.1.6.148:9994 :path=/metrics
rsp id=5:0 proxy=in  src=10.1.6.146:36976 dst=10.1.6.148:9994 tls=not_provided_by_remote :status=200 latency=3366µs
end id=5:0 proxy=in  src=10.1.6.146:36976 dst=10.1.6.148:9994 tls=not_provided_by_remote duration=132µs response-length=1505B
```

**Wide**:
```
➜  linkerd2 git:(kleimkuhler/tap-json-output) linkerd -n linkerd tap deploy/linkerd-web -o wide
req id=6:0 proxy=in  src=10.1.0.1:35394 dst=10.1.6.148:9994 tls=not_provided_by_remote :method=GET :authority=10.1.6.148:9994 :path=/ping dst_res=deploy/linkerd-web dst_ns=linkerd
rsp id=6:0 proxy=in  src=10.1.0.1:35394 dst=10.1.6.148:9994 tls=not_provided_by_remote :status=200 latency=1442µs dst_res=deploy/linkerd-web dst_ns=linkerd
end id=6:0 proxy=in  src=10.1.0.1:35394 dst=10.1.6.148:9994 tls=not_provided_by_remote duration=88µs response-length=5B dst_res=deploy/linkerd-web dst_ns=linkerd
```

**JSON**:
*Edit: Flattened `Method` and `Scheme` formatting*
```
{
  "source": {
    "ip": "10.138.0.28",
    "port": 47078,
    "metadata": {
      "daemonset": "ip-masq-agent",
      "namespace": "kube-system",
      "pod": "ip-masq-agent-4d5s9",
      "serviceaccount": "ip-masq-agent",
      "tls": "not_provided_by_remote"
    }
  },
  "destination": {
    "ip": "10.60.1.49",
    "port": 9994,
    "metadata": {
      "control_plane_ns": "linkerd",
      "deployment": "linkerd-web",
      "namespace": "linkerd",
      "pod": "linkerd-web-6988999458-c6wpw",
      "pod_template_hash": "6988999458",
      "serviceaccount": "linkerd-web"
    }
  },
  "routeMeta": null,
  "proxyDirection": "INBOUND",
  "requestInitEvent": {
    "id": {
      "base": 0,
      "stream": 0
    },
    "method": "GET",
    "scheme": "",
    "authority": "10.60.1.49:9994",
    "path": "/ready"
  }
}
{
  "source": {
    "ip": "10.138.0.28",
    "port": 47078,
    "metadata": {
      "daemonset": "calico-node",
      "namespace": "kube-system",
      "pod": "calico-node-bbrjq",
      "serviceaccount": "calico-sa",
      "tls": "not_provided_by_remote"
    }
  },
  "destination": {
    "ip": "10.60.1.49",
    "port": 9994,
    "metadata": {
      "control_plane_ns": "linkerd",
      "deployment": "linkerd-web",
      "namespace": "linkerd",
      "pod": "linkerd-web-6988999458-c6wpw",
      "pod_template_hash": "6988999458",
      "serviceaccount": "linkerd-web"
    }
  },
  "routeMeta": null,
  "proxyDirection": "INBOUND",
  "responseInitEvent": {
    "id": {
      "base": 0,
      "stream": 0
    },
    "sinceRequestInit": {
      "nanos": 644820
    },
    "httpStatus": 200
  }
}
{
  "source": {
    "ip": "10.138.0.28",
    "port": 47078,
    "metadata": {
      "deployment": "calico-typha",
      "namespace": "kube-system",
      "pod": "calico-typha-59cb487c49-8247r",
      "pod_template_hash": "59cb487c49",
      "serviceaccount": "calico-sa",
      "tls": "not_provided_by_remote"
    }
  },
  "destination": {
    "ip": "10.60.1.49",
    "port": 9994,
    "metadata": {
      "control_plane_ns": "linkerd",
      "deployment": "linkerd-web",
      "namespace": "linkerd",
      "pod": "linkerd-web-6988999458-c6wpw",
      "pod_template_hash": "6988999458",
      "serviceaccount": "linkerd-web"
    }
  },
  "routeMeta": null,
  "proxyDirection": "INBOUND",
  "responseEndEvent": {
    "id": {
      "base": 0,
      "stream": 0
    },
    "sinceRequestInit": {
      "nanos": 790898
    },
    "sinceResponseInit": {
      "nanos": 146078
    },
    "responseBytes": 3,
    "grpcStatusCode": 0
  }
}
```

Signed-off-by: Kevin Leimkuhler <kleimkuhler@icloud.com>
This commit is contained in:
Kevin Leimkuhler 2019-09-19 09:34:49 -07:00 committed by GitHub
parent 30ecddb965
commit c62c90870e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 316 additions and 48 deletions

View File

@ -2,12 +2,13 @@ package cmd
import (
"bufio"
"encoding/json"
"fmt"
"io"
"os"
"strings"
"text/tabwriter"
"github.com/golang/protobuf/ptypes/duration"
"github.com/linkerd/linkerd2/controller/api/util"
pb "github.com/linkerd/linkerd2/controller/gen/public"
"github.com/linkerd/linkerd2/pkg/addr"
@ -19,6 +20,8 @@ import (
"google.golang.org/grpc/codes"
)
type renderTapEventFunc func(*pb.TapEvent, string) string
type tapOptions struct {
namespace string
toResource string
@ -31,6 +34,51 @@ type tapOptions struct {
output string
}
type endpoint struct {
IP string `json:"ip"`
Port uint32 `json:"port"`
Metadata map[string]string `json:"metadata"`
}
type streamID struct {
Base uint32 `json:"base"`
Stream uint64 `json:"stream"`
}
type requestInitEvent struct {
ID *streamID `json:"id"`
Method string `json:"method"`
Scheme string `json:"scheme"`
Authority string `json:"authority"`
Path string `json:"path"`
}
type responseInitEvent struct {
ID *streamID `json:"id"`
SinceRequestInit *duration.Duration `json:"sinceRequestInit"`
HTTPStatus uint32 `json:"httpStatus"`
}
type responseEndEvent struct {
ID *streamID `json:"id"`
SinceRequestInit *duration.Duration `json:"sinceRequestInit"`
SinceResponseInit *duration.Duration `json:"sinceResponseInit"`
ResponseBytes uint64 `json:"responseBytes"`
GrpcStatusCode uint32 `json:"grpcStatusCode"`
ResetErrorCode uint32 `json:"resetErrorCode,omitempty"`
}
// Private type used for displaying JSON encoded tap events
type tapEvent struct {
Source *endpoint `json:"source"`
Destination *endpoint `json:"destination"`
RouteMeta map[string]string `json:"routeMeta"`
ProxyDirection string `json:"proxyDirection"`
RequestInitEvent *requestInitEvent `json:"requestInitEvent,omitempty"`
ResponseInitEvent *responseInitEvent `json:"responseInitEvent,omitempty"`
ResponseEndEvent *responseEndEvent `json:"responseEndEvent,omitempty"`
}
func newTapOptions() *tapOptions {
return &tapOptions{
namespace: "default",
@ -45,6 +93,14 @@ func newTapOptions() *tapOptions {
}
}
func (o *tapOptions) validate() error {
if o.output == "" || o.output == wideOutput || o.output == jsonOutput {
return nil
}
return fmt.Errorf("output format \"%s\" not recognized", o.output)
}
func newCmdTap() *cobra.Command {
options := newTapOptions()
@ -98,28 +154,22 @@ func newCmdTap() *cobra.Command {
Path: options.path,
}
err := options.validate()
if err != nil {
return fmt.Errorf("validation error when executing tap command: %v", err)
}
req, err := util.BuildTapByResourceRequest(requestParams)
if err != nil {
return err
}
wide := false
switch options.output {
// TODO: support more output formats?
case "":
// default output format.
case wideOutput:
wide = true
default:
return fmt.Errorf("output format \"%s\" not recognized", options.output)
}
k8sAPI, err := k8s.NewAPI(kubeconfigPath, kubeContext, impersonate, 0)
if err != nil {
return err
}
return requestTapByResourceFromAPI(os.Stdout, k8sAPI, req, wide)
return requestTapByResourceFromAPI(os.Stdout, k8sAPI, req, options)
},
}
@ -140,38 +190,40 @@ func newCmdTap() *cobra.Command {
cmd.PersistentFlags().StringVar(&options.path, "path", options.path,
"Display requests with paths that start with this prefix")
cmd.PersistentFlags().StringVarP(&options.output, "output", "o", options.output,
"Output format. One of: wide")
fmt.Sprintf("Output format. One of: \"%s\", \"%s\"", wideOutput, jsonOutput))
return cmd
}
func requestTapByResourceFromAPI(w io.Writer, k8sAPI *k8s.KubernetesAPI, req *pb.TapByResourceRequest, wide bool) error {
var resource string
if wide {
resource = req.GetTarget().GetResource().GetType()
}
func requestTapByResourceFromAPI(w io.Writer, k8sAPI *k8s.KubernetesAPI, req *pb.TapByResourceRequest, options *tapOptions) error {
reader, body, err := tap.Reader(k8sAPI, req, 0)
if err != nil {
return err
}
defer body.Close()
return renderTap(w, reader, resource)
return writeTapEventsToBuffer(w, reader, req, options)
}
func renderTap(w io.Writer, tapByteStream *bufio.Reader, resource string) error {
tableWriter := tabwriter.NewWriter(w, 0, 0, 0, ' ', tabwriter.AlignRight)
err := writeTapEventsToBuffer(tapByteStream, tableWriter, resource)
func writeTapEventsToBuffer(w io.Writer, tapByteStream *bufio.Reader, req *pb.TapByResourceRequest, options *tapOptions) error {
var err error
switch options.output {
case "":
err = renderTapEvents(tapByteStream, w, renderTapEvent, "")
case wideOutput:
resource := req.GetTarget().GetResource().GetType()
err = renderTapEvents(tapByteStream, w, renderTapEvent, resource)
case jsonOutput:
err = renderTapEvents(tapByteStream, w, renderTapEventJSON, "")
}
if err != nil {
return err
}
tableWriter.Flush()
return nil
}
func writeTapEventsToBuffer(tapByteStream *bufio.Reader, w *tabwriter.Writer, resource string) error {
func renderTapEvents(tapByteStream *bufio.Reader, w io.Writer, render renderTapEventFunc, resource string) error {
for {
log.Debug("Waiting for data...")
event := pb.TapEvent{}
@ -183,7 +235,7 @@ func writeTapEventsToBuffer(tapByteStream *bufio.Reader, w *tabwriter.Writer, re
fmt.Fprintln(os.Stderr, err)
break
}
_, err = fmt.Fprintln(w, renderTapEvent(&event, resource))
_, err = fmt.Fprintln(w, render(&event, resource))
if err != nil {
return err
}
@ -292,6 +344,121 @@ func renderTapEvent(event *pb.TapEvent, resource string) string {
}
}
// renderTapEventJSON renders a Public API TapEvent to a string in JSON format.
func renderTapEventJSON(event *pb.TapEvent, _ string) string {
m := mapPublicToDisplayTapEvent(event)
e, err := json.MarshalIndent(m, "", " ")
if err != nil {
return fmt.Sprintf("{\"error marshalling JSON\": \"%s\"}", err)
}
return fmt.Sprintf("%s", e)
}
// Map public API `TapEvent`s to `displayTapEvent`s
func mapPublicToDisplayTapEvent(event *pb.TapEvent) *tapEvent {
// Map source endpoint
sip := addr.PublicIPToString(event.GetSource().GetIp())
src := &endpoint{
IP: sip,
Port: event.GetSource().GetPort(),
Metadata: event.GetSourceMeta().GetLabels(),
}
// Map destination endpoint
dip := addr.PublicIPToString(event.GetDestination().GetIp())
dst := &endpoint{
IP: dip,
Port: event.GetDestination().GetPort(),
Metadata: event.GetDestinationMeta().GetLabels(),
}
return &tapEvent{
Source: src,
Destination: dst,
RouteMeta: event.GetRouteMeta().GetLabels(),
ProxyDirection: event.GetProxyDirection().String(),
RequestInitEvent: getRequestInitEvent(event.GetHttp()),
ResponseInitEvent: getResponseInitEvent(event.GetHttp()),
ResponseEndEvent: getResponseEndEvent(event.GetHttp()),
}
}
// Attempt to map a `TapEvent_Http_RequestInit event to a `requestInitEvent`
func getRequestInitEvent(pubEv *pb.TapEvent_Http) *requestInitEvent {
reqI := pubEv.GetRequestInit()
if reqI == nil {
return nil
}
sid := &streamID{
Base: reqI.GetId().GetBase(),
Stream: reqI.GetId().GetStream(),
}
return &requestInitEvent{
ID: sid,
Method: formatMethod(reqI.GetMethod()),
Scheme: formatScheme(reqI.GetScheme()),
Authority: reqI.GetAuthority(),
Path: reqI.GetPath(),
}
}
func formatMethod(m *pb.HttpMethod) string {
if x, ok := m.GetType().(*pb.HttpMethod_Registered_); ok {
return x.Registered.String()
}
if s, ok := m.GetType().(*pb.HttpMethod_Unregistered); ok {
return s.Unregistered
}
return ""
}
func formatScheme(s *pb.Scheme) string {
if x, ok := s.GetType().(*pb.Scheme_Registered_); ok {
return x.Registered.String()
}
if str, ok := s.GetType().(*pb.Scheme_Unregistered); ok {
return str.Unregistered
}
return ""
}
// Attempt to map a `TapEvent_Http_ResponseInit` event to a `responseInitEvent`
func getResponseInitEvent(pubEv *pb.TapEvent_Http) *responseInitEvent {
resI := pubEv.GetResponseInit()
if resI == nil {
return nil
}
sid := &streamID{
Base: resI.GetId().GetBase(),
Stream: resI.GetId().GetStream(),
}
return &responseInitEvent{
ID: sid,
SinceRequestInit: resI.GetSinceRequestInit(),
HTTPStatus: resI.GetHttpStatus(),
}
}
// Attempt to map a `TapEvent_Http_ResponseEnd` event to a `responseEndEvent`
func getResponseEndEvent(pubEv *pb.TapEvent_Http) *responseEndEvent {
resE := pubEv.GetResponseEnd()
if resE == nil {
return nil
}
sid := &streamID{
Base: resE.GetId().GetBase(),
Stream: resE.GetId().GetStream(),
}
return &responseEndEvent{
ID: sid,
SinceRequestInit: resE.GetSinceRequestInit(),
SinceResponseInit: resE.GetSinceResponseInit(),
ResponseBytes: resE.GetResponseBytes(),
GrpcStatusCode: resE.GetEos().GetGrpcStatusCode(),
ResetErrorCode: resE.GetEos().GetResetErrorCode(),
}
}
// src returns the source peer of a `TapEvent`.
func src(event *pb.TapEvent) peer {
return peer{

View File

@ -18,7 +18,7 @@ import (
const targetName = "pod-666"
func busyTest(t *testing.T, wide bool) {
func busyTest(t *testing.T, output string) {
resourceType := k8s.Pod
params := util.TapRequestParams{
Resource: resourceType + "/" + targetName,
@ -40,6 +40,16 @@ func busyTest(t *testing.T, wide bool) {
Id: &pb.TapEvent_Http_StreamId{
Base: 1,
},
Method: &pb.HttpMethod{
Type: &pb.HttpMethod_Registered_{
Registered: pb.HttpMethod_GET,
},
},
Scheme: &pb.Scheme{
Type: &pb.Scheme_Registered_{
Registered: pb.Scheme_HTTPS,
},
},
Authority: params.Authority,
Path: params.Path,
},
@ -92,16 +102,22 @@ func busyTest(t *testing.T, wide bool) {
defer ts.Close()
kubeAPI.Config.Host = ts.URL
options := newTapOptions()
options.output = output
writer := bytes.NewBufferString("")
err = requestTapByResourceFromAPI(writer, kubeAPI, req, wide)
err = requestTapByResourceFromAPI(writer, kubeAPI, req, options)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
var goldenFilePath string
if wide {
switch options.output {
case wideOutput:
goldenFilePath = "testdata/tap_busy_output_wide.golden"
} else {
case jsonOutput:
goldenFilePath = "testdata/tap_busy_output_json.golden"
default:
goldenFilePath = "testdata/tap_busy_output.golden"
}
@ -110,19 +126,23 @@ func busyTest(t *testing.T, wide bool) {
t.Fatalf("Unexpected error: %v", err)
}
expectedContent := string(goldenFileBytes)
output := writer.String()
if expectedContent != output {
t.Fatalf("Expected function to render:\n%s\bbut got:\n%s", expectedContent, output)
actual := writer.String()
if expectedContent != actual {
t.Fatalf("Expected function to render:\n%s\bbut got:\n%s", expectedContent, actual)
}
}
func TestRequestTapByResourceFromAPI(t *testing.T) {
t.Run("Should render busy response if everything went well", func(t *testing.T) {
busyTest(t, false)
busyTest(t, "")
})
t.Run("Should render wide busy response if everything went well", func(t *testing.T) {
busyTest(t, true)
busyTest(t, "wide")
})
t.Run("Should render JSON busy response if everything went well", func(t *testing.T) {
busyTest(t, "json")
})
t.Run("Should render empty response if no events returned", func(t *testing.T) {
@ -150,8 +170,9 @@ func TestRequestTapByResourceFromAPI(t *testing.T) {
defer ts.Close()
kubeAPI.Config.Host = ts.URL
options := newTapOptions()
writer := bytes.NewBufferString("")
err = requestTapByResourceFromAPI(writer, kubeAPI, req, false)
err = requestTapByResourceFromAPI(writer, kubeAPI, req, options)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@ -188,8 +209,9 @@ func TestRequestTapByResourceFromAPI(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}
options := newTapOptions()
writer := bytes.NewBufferString("")
err = requestTapByResourceFromAPI(writer, kubeAPI, req, false)
err = requestTapByResourceFromAPI(writer, kubeAPI, req, options)
if err == nil {
t.Fatalf("Expecting error, got nothing but output [%s]", writer.String())
}

View File

@ -1,2 +1,2 @@
req id=1:0 proxy=out src=0.0.0.1:0 dst=0.0.0.9:0 tls=true :method=GET :authority=localhost :path=/some/path
end id=1:0 proxy=out src=0.0.0.1:0 dst=0.0.0.9:0 tls= grpc-status=Code(666) duration=0µs response-length=1337B
req id=1:0 proxy=out src=0.0.0.1:0 dst=[ff01::1]:0 tls=true :method=GET :authority=localhost :path=/some/path
end id=1:0 proxy=out src=0.0.0.1:0 dst=[ff01::1]:0 tls= grpc-status=Code(666) duration=0µs response-length=1337B

View File

@ -0,0 +1,55 @@
{
"source": {
"ip": "0.0.0.1",
"port": 0,
"metadata": null
},
"destination": {
"ip": "ff01::1",
"port": 0,
"metadata": {
"pod": "my-pod",
"tls": "true"
}
},
"routeMeta": null,
"proxyDirection": "OUTBOUND",
"requestInitEvent": {
"id": {
"base": 1,
"stream": 0
},
"method": "GET",
"scheme": "HTTPS",
"authority": "localhost",
"path": "/some/path"
}
}
{
"source": {
"ip": "0.0.0.1",
"port": 0,
"metadata": null
},
"destination": {
"ip": "ff01::1",
"port": 0,
"metadata": null
},
"routeMeta": null,
"proxyDirection": "OUTBOUND",
"responseEndEvent": {
"id": {
"base": 1,
"stream": 0
},
"sinceRequestInit": {
"seconds": 10
},
"sinceResponseInit": {
"seconds": 100
},
"responseBytes": 1337,
"grpcStatusCode": 666
}
}

View File

@ -1,2 +1,2 @@
req id=1:0 proxy=out src=0.0.0.1:0 dst=0.0.0.9:0 tls=true :method=GET :authority=localhost :path=/some/path dst_res=po/my-pod
end id=1:0 proxy=out src=0.0.0.1:0 dst=0.0.0.9:0 tls= grpc-status=Code(666) duration=0µs response-length=1337B
req id=1:0 proxy=out src=0.0.0.1:0 dst=[ff01::1]:0 tls=true :method=GET :authority=localhost :path=/some/path dst_res=po/my-pod
end id=1:0 proxy=out src=0.0.0.1:0 dst=[ff01::1]:0 tls= grpc-status=Code(666) duration=0µs response-length=1337B

View File

@ -1,6 +1,7 @@
package util
import (
"encoding/binary"
"errors"
"fmt"
"strings"
@ -533,8 +534,12 @@ func CreateTapEvent(eventHTTP *pb.TapEvent_Http, dstMeta map[string]string, prox
},
Destination: &pb.TcpAddress{
Ip: &pb.IPAddress{
Ip: &pb.IPAddress_Ipv4{
Ipv4: uint32(9),
Ip: &pb.IPAddress_Ipv6{
Ipv6: &pb.IPv6{
// All nodes address: https://www.iana.org/assignments/ipv6-multicast-addresses/ipv6-multicast-addresses.xhtml
First: binary.BigEndian.Uint64([]byte{0xff, 0x01, 0, 0, 0, 0, 0, 0}),
Last: binary.BigEndian.Uint64([]byte{0, 0, 0, 0, 0, 0, 0, 0x01}),
},
},
},
},

View File

@ -1,7 +1,9 @@
package addr
import (
"encoding/binary"
"fmt"
"net"
"strconv"
"strings"
@ -14,15 +16,32 @@ import (
const DefaultWeight = 1
// PublicAddressToString formats a Public API TCPAddress as a string.
//
// If Ipv6, the bytes should be ordered big-endian. When formatted as a
// string, the IP address should be enclosed in square brackets followed by
// the port.
func PublicAddressToString(addr *public.TcpAddress) string {
octects := decodeIPToOctets(addr.GetIp().GetIpv4())
return fmt.Sprintf("%d.%d.%d.%d:%d", octects[0], octects[1], octects[2], octects[3], addr.GetPort())
var s string
if addr.GetIp().GetIpv6() != nil {
s = "[%s]:%d"
} else {
s = "%s:%d"
}
return fmt.Sprintf(s, PublicIPToString(addr.GetIp()), addr.GetPort())
}
// PublicIPToString formats a Public API IPAddress as a string.
func PublicIPToString(ip *public.IPAddress) string {
octets := decodeIPToOctets(ip.GetIpv4())
return fmt.Sprintf("%d.%d.%d.%d", octets[0], octets[1], octets[2], octets[3])
var b []byte
if ip.GetIpv6() != nil {
b = make([]byte, 16)
binary.BigEndian.PutUint64(b[:8], ip.GetIpv6().GetFirst())
binary.BigEndian.PutUint64(b[8:], ip.GetIpv6().GetLast())
} else if ip.GetIpv4() != 0 {
b = make([]byte, 4)
binary.BigEndian.PutUint32(b, ip.GetIpv4())
}
return net.IP(b).String()
}
// ProxyAddressToString formats a Proxy API TCPAddress as a string.