// linkerd2/viz/tap/api/grpc_server.go
//
// 705 lines
// 20 KiB
// Go

package api
import (
"context"
"errors"
"fmt"
"io"
"net"
"strconv"
"strings"
"time"
"unicode/utf8"
"k8s.io/apimachinery/pkg/labels"
httpPb "github.com/linkerd/linkerd2-proxy-api/go/http_types"
proxy "github.com/linkerd/linkerd2-proxy-api/go/tap"
netPb "github.com/linkerd/linkerd2/controller/gen/common/net"
"github.com/linkerd/linkerd2/controller/k8s"
"github.com/linkerd/linkerd2/pkg/addr"
pkgK8s "github.com/linkerd/linkerd2/pkg/k8s"
"github.com/linkerd/linkerd2/pkg/prometheus"
"github.com/linkerd/linkerd2/pkg/util"
metricsPb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
vizLabels "github.com/linkerd/linkerd2/viz/pkg/labels"
pkgUtil "github.com/linkerd/linkerd2/viz/pkg/util"
tapPb "github.com/linkerd/linkerd2/viz/tap/gen/tap"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/cache"
)
const (
	// ipIndex names the informer index that maps an IP address to the pods
	// and nodes carrying it (populated by indexByIP).
	ipIndex = "ip"
	// defaultMaxRps is the total request-rate budget applied when a
	// TapByResource request leaves MaxRps at zero.
	defaultMaxRps = 100.0
)
// GRPCTapServer describes the gRPC server implementing pb.TapServer.
type GRPCTapServer struct {
	tapPb.UnimplementedTapServer

	// tapPort is the port dialed on each pod IP to reach the proxy's tap API.
	tapPort uint
	// k8sAPI resolves tap targets to pods and hydrates event labels.
	k8sAPI *k8s.API
	// controllerNamespace is used to decide whether a pod is meshed; together
	// with trustDomain it forms the identity a tapped proxy must present.
	controllerNamespace, trustDomain string
	// ignoreHeaders lists header names dropped from translated tap events.
	ignoreHeaders map[string]bool
}
var (
tapInterval = 1 * time.Second
)
// Tap is deprecated, use TapByResource.
// This API endpoint is marked as deprecated but it's still used.
//
// Every call fails with codes.Unimplemented; req and stream are not read.
//
//nolint:staticcheck
func (s *GRPCTapServer) Tap(req *tapPb.TapRequest, stream tapPb.Tap_TapServer) error {
	const msg = "Tap is deprecated, use TapByResource"
	return status.Error(codes.Unimplemented, msg)
}
// TapByResource taps all resources matched by the request object.
//
// The target resource, optionally narrowed by the target's label selector, is
// resolved to its meshed pods. Only pods with tap enabled and not explicitly
// disabled are tapped. The request's MaxRps (defaultMaxRps when zero) is
// split evenly across those pods, with a floor of 1 rps per pod. One
// tapProxy goroutine is started per pod, and every translated event is
// forwarded to the client.
//
// It returns nil once the client's stream context is done, or the
// gRPC-converted error of a failed Send. Validation failures are reported as
// InvalidArgument: a nil request, nil target, nil resource, or malformed
// label selector. When nothing can be tapped, it reports NotFound with a
// per-pod explanation.
func (s *GRPCTapServer) TapByResource(req *tapPb.TapByResourceRequest, stream tapPb.Tap_TapByResourceServer) error {
	if req == nil {
		return status.Error(codes.InvalidArgument, "TapByResource received nil TapByResourceRequest")
	}
	if req.GetTarget() == nil {
		return status.Error(codes.InvalidArgument, "TapByResource received nil target ResourceSelection")
	}
	res := req.GetTarget().GetResource()
	if res == nil {
		return status.Error(codes.InvalidArgument, "TapByResource received nil target Resource")
	}
	labelSelector, err := getLabelSelector(req)
	if err != nil {
		// A malformed selector is the caller's fault; report it as
		// InvalidArgument instead of letting gRPC map it to codes.Unknown.
		return status.Error(codes.InvalidArgument, err.Error())
	}
	if req.GetMaxRps() == 0.0 {
		req.MaxRps = defaultMaxRps
	}

	objects, err := s.k8sAPI.GetObjects(res.GetNamespace(), res.GetType(), res.GetName(), labelSelector)
	if err != nil {
		return pkgUtil.GRPCError(err)
	}

	// Partition the meshed pods into tappable ones and the two reasons a pod
	// can be excluded, so a useful error can be returned if none remain.
	var pods, tapDisabled, tapNotEnabled []*corev1.Pod
	for _, object := range objects {
		podsFor, err := s.k8sAPI.GetPodsFor(object, false)
		if err != nil {
			return pkgUtil.GRPCError(err)
		}
		for _, pod := range podsFor {
			if !pkgK8s.IsMeshed(pod, s.controllerNamespace) {
				continue
			}
			switch {
			case vizLabels.IsTapDisabled(pod):
				tapDisabled = append(tapDisabled, pod)
			case !vizLabels.IsTapEnabled(pod):
				tapNotEnabled = append(tapNotEnabled, pod)
			default:
				pods = append(pods, pod)
			}
		}
	}

	if len(pods) == 0 {
		var errs strings.Builder
		fmt.Fprintf(&errs, "no pods to tap for type=%q name=%q\n", res.GetType(), res.GetName())
		if len(tapDisabled) > 0 {
			fmt.Fprintf(&errs, "%d pods found with tap disabled via the %s annotation:\n", len(tapDisabled), vizLabels.VizTapDisabled)
			for _, pod := range tapDisabled {
				fmt.Fprintf(&errs, "\t* %s\n", pod.Name)
			}
			fmt.Fprintln(&errs, "remove this annotation to make these pods valid tap targets")
		}
		if len(tapNotEnabled) > 0 {
			fmt.Fprintf(&errs, "%d pods found with tap not enabled:\n", len(tapNotEnabled))
			for _, pod := range tapNotEnabled {
				fmt.Fprintf(&errs, "\t* %s\n", pod.Name)
			}
			fmt.Fprintln(&errs, "restart these pods to enable tap and make them valid tap targets")
		}
		// The message embeds user-supplied resource and pod names, so it must
		// not be interpreted as a format string.
		return status.Error(codes.NotFound, errs.String())
	}

	log.Infof("Tapping %d pods for target: %q", len(pods), res.String())

	events := make(chan *tapPb.TapEvent)

	// divide the rps evenly between all pods to tap
	rpsPerPod := req.GetMaxRps() / float32(len(pods))
	if rpsPerPod < 1 {
		rpsPerPod = 1
	}

	match, err := makeByResourceMatch(req.GetMatch())
	if err != nil {
		return pkgUtil.GRPCError(err)
	}

	extract := &proxy.ObserveRequest_Extract{}
	// HTTP is the only protocol supported for extracting metadata, so this is
	// the only field checked.
	if extractHTTP := req.GetExtract().GetHttp(); extractHTTP != nil {
		extract = buildExtractHTTP(extractHTTP)
	}

	// Cancelled on return, so every tapProxy goroutine started below is told
	// to stop as soon as this handler stops consuming events.
	ctx, cancel := context.WithCancel(stream.Context())
	defer cancel()

	for _, pod := range pods {
		// The proxy must present the identity derived from the pod's own
		// service account and namespace; the request's namespace may be empty
		// or (for namespace targets) not the pod's namespace field.
		name := fmt.Sprintf("%s.%s.serviceaccount.identity.%s.%s", pod.Spec.ServiceAccountName, pod.Namespace, s.controllerNamespace, s.trustDomain)
		log.Debugf("initiating tap request to %s with required name %s", pod.Spec.ServiceAccountName, name)
		// pass the header metadata into the request context
		podCtx := metadata.AppendToOutgoingContext(ctx, pkgK8s.RequireIDHeader, name)
		go s.tapProxy(podCtx, rpsPerPod, match, extract, pod.Status.PodIP, events)
	}

	// read events from the taps and send them back
	for {
		select {
		case <-ctx.Done():
			return nil
		case event := <-events:
			if err := stream.Send(event); err != nil {
				return pkgUtil.GRPCError(err)
			}
		}
	}
}
// makeByResourceMatch translates a TapByResource match into the proxy's
// ObserveRequest match.
//
// Only a single, flat `All` sequence is supported; each supported entry
// becomes part of the resulting `All` match.
//   - A destination entry expands into one DestinationLabel match per label
//     returned by destinationLabels.
//   - An HTTP entry maps scheme and method directly, authority as an exact
//     string match, and path as a prefix match.
//
// Any other shape, including nil entries or empty oneofs, yields a
// codes.Unimplemented status error.
func makeByResourceMatch(match *tapPb.TapByResourceRequest_Match) (*proxy.ObserveRequest_Match, error) {
	// TODO: for now assume it's always a single, flat `All` match list
	seq := match.GetAll()
	if seq == nil {
		return nil, status.Errorf(codes.Unimplemented, "unexpected match specified: %+v", match)
	}

	matches := []*proxy.ObserveRequest_Match{}
	for _, reqMatch := range seq.GetMatches() {
		// The generated getters are nil-safe, so a nil entry or an unset
		// oneof reaches the default case instead of panicking.
		switch typed := reqMatch.GetMatch().(type) {
		case *tapPb.TapByResourceRequest_Match_Destinations:
			for k, v := range destinationLabels(typed.Destinations.GetResource()) {
				matches = append(matches, &proxy.ObserveRequest_Match{
					Match: &proxy.ObserveRequest_Match_DestinationLabel{
						DestinationLabel: &proxy.ObserveRequest_Match_Label{
							Key:   k,
							Value: v,
						},
					},
				})
			}

		case *tapPb.TapByResourceRequest_Match_Http_:
			var httpMatch *proxy.ObserveRequest_Match_Http
			switch httpTyped := typed.Http.GetMatch().(type) {
			case *tapPb.TapByResourceRequest_Match_Http_Scheme:
				httpMatch = &proxy.ObserveRequest_Match_Http{
					Match: &proxy.ObserveRequest_Match_Http_Scheme{
						Scheme: util.ParseScheme(httpTyped.Scheme),
					},
				}
			case *tapPb.TapByResourceRequest_Match_Http_Method:
				httpMatch = &proxy.ObserveRequest_Match_Http{
					Match: &proxy.ObserveRequest_Match_Http_Method{
						Method: util.ParseMethod(httpTyped.Method),
					},
				}
			case *tapPb.TapByResourceRequest_Match_Http_Authority:
				httpMatch = &proxy.ObserveRequest_Match_Http{
					Match: &proxy.ObserveRequest_Match_Http_Authority{
						Authority: &proxy.ObserveRequest_Match_Http_StringMatch{
							Match: &proxy.ObserveRequest_Match_Http_StringMatch_Exact{
								Exact: httpTyped.Authority,
							},
						},
					},
				}
			case *tapPb.TapByResourceRequest_Match_Http_Path:
				httpMatch = &proxy.ObserveRequest_Match_Http{
					Match: &proxy.ObserveRequest_Match_Http_Path{
						Path: &proxy.ObserveRequest_Match_Http_StringMatch{
							Match: &proxy.ObserveRequest_Match_Http_StringMatch_Prefix{
								Prefix: httpTyped.Path,
							},
						},
					},
				}
			default:
				return nil, status.Errorf(codes.Unimplemented, "unknown HTTP match type: %v", httpTyped)
			}
			matches = append(matches, &proxy.ObserveRequest_Match{
				Match: &proxy.ObserveRequest_Match_Http_{
					Http: httpMatch,
				},
			})

		default:
			return nil, status.Errorf(codes.Unimplemented, "unknown match type: %v", typed)
		}
	}

	return &proxy.ObserveRequest_Match{
		Match: &proxy.ObserveRequest_Match_All{
			All: &proxy.ObserveRequest_Match_Seq{
				Matches: matches,
			},
		},
	}, nil
}
// destinationLabels returns the proxy destination labels that select
// resource. They are built as follows:
//   - The label for the resource's kind (pkgK8s.KindToL5DLabel) maps to its
//     name, when a name is set.
//   - "namespace" maps to its namespace, when the resource is not itself a
//     namespace and has one.
//
// A nil resource yields an empty map rather than a nil-pointer panic.
//
// TODO: factor out with `promLabels` in public-api
func destinationLabels(resource *metricsPb.Resource) map[string]string {
	dstLabels := map[string]string{}
	// The generated getters tolerate a nil receiver, unlike direct field
	// access.
	if name := resource.GetName(); name != "" {
		dstLabels[pkgK8s.KindToL5DLabel(resource.GetType())] = name
	}
	if ns := resource.GetNamespace(); resource.GetType() != pkgK8s.Namespace && ns != "" {
		dstLabels["namespace"] = ns
	}
	return dstLabels
}
// buildExtractHTTP converts an HTTP extract request into the proxy's
// ObserveRequest_Extract. Only header extraction is handled: when extract
// asks for headers, the result requests header extraction from the proxy;
// otherwise it returns nil.
func buildExtractHTTP(extract *tapPb.TapByResourceRequest_Extract_Http) *proxy.ObserveRequest_Extract {
	if extract.GetHeaders() == nil {
		return nil
	}
	wantHeaders := &proxy.ObserveRequest_Extract_Http_Headers_{
		Headers: &proxy.ObserveRequest_Extract_Http_Headers{},
	}
	return &proxy.ObserveRequest_Extract{
		Extract: &proxy.ObserveRequest_Extract_Http_{
			Http: &proxy.ObserveRequest_Extract_Http{Extract: wantHeaders},
		},
	}
}
// tapProxy taps a single pod's proxy at addr:s.tapPort and forwards each
// translated event to events.
//
// It runs until an error is encountered or ctx is cancelled, so it should be
// run as a goroutine. Every send on events also waits on ctx.Done(), so the
// goroutine exits even when the consumer has stopped reading.
//
// Rate limiting: Observe is called at most once per tapInterval window, with
// a limit of maxRps * tapInterval. If the proxy ends the stream before the
// window is over, the rest of the window is waited out, still honoring
// cancellation, before calling Observe again.
func (s *GRPCTapServer) tapProxy(ctx context.Context, maxRps float32, match *proxy.ObserveRequest_Match, extract *proxy.ObserveRequest_Extract, addr string, events chan *tapPb.TapEvent) {
	tapAddr := net.JoinHostPort(addr, strconv.Itoa(int(s.tapPort)))
	log.Infof("Establishing tap on %s", tapAddr)
	conn, err := grpc.NewClient(tapAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Error(err)
		return
	}
	defer conn.Close()
	client := proxy.NewTapClient(conn)

	req := &proxy.ObserveRequest{
		Limit:   uint32(maxRps * float32(tapInterval.Seconds())),
		Match:   match,
		Extract: extract,
	}

	for { // Request loop
		windowEnd := time.Now().Add(tapInterval)
		rsp, err := client.Observe(ctx, req)
		if err != nil {
			// A cancelled client is a normal shutdown, not a failure.
			if ctx.Err() != nil {
				log.Debugf("[%s] client terminated the stream", addr)
				return
			}
			log.Error(err)
			return
		}

		for { // Stream loop
			event, err := rsp.Recv()
			if err != nil {
				if errors.Is(err, io.EOF) {
					log.Debugf("[%s] proxy terminated the stream", addr)
					break
				}
				if ctx.Err() != nil {
					log.Debugf("[%s] client terminated the stream", addr)
					return
				}
				log.Errorf("[%s] encountered an error: %s", addr, err)
				return
			}
			translatedEvent := s.translateEvent(ctx, event)
			// The send must race cancellation: a plain blocking send would
			// leak this goroutine once the consumer stops reading events.
			select {
			case <-ctx.Done():
				log.Debugf("[%s] client terminated the stream", addr)
				return
			case events <- translatedEvent:
			}
		}

		// Wait out the remainder of the window, but stop promptly if the
		// client goes away in the meantime.
		if wait := time.Until(windowEnd); wait > 0 {
			timer := time.NewTimer(wait)
			select {
			case <-ctx.Done():
				timer.Stop()
				log.Debugf("[%s] client terminated the stream", addr)
				return
			case <-timer.C:
			}
		}
	}
}
// translateEvent converts a tap event reported by a proxy into the tap API's
// TapEvent, then hydrates its endpoint labels via hydrateEventLabels.
//
// Only HTTP events are translated (request init, response init, response
// end). For any other payload, the returned event's Event field is left unset.
//
// Header handling:
//   - Headers whose names are in s.ignoreHeaders are dropped.
//   - Values that are valid UTF-8 are reported as strings; all others are
//     reported as raw bytes.
//
// Missing source/destination label maps are replaced with empty maps so
// hydration can write into them.
func (s *GRPCTapServer) translateEvent(ctx context.Context, orig *proxy.TapEvent) *tapPb.TapEvent {
	direction := func(d proxy.TapEvent_ProxyDirection) tapPb.TapEvent_ProxyDirection {
		switch d {
		case proxy.TapEvent_INBOUND:
			return tapPb.TapEvent_INBOUND
		case proxy.TapEvent_OUTBOUND:
			return tapPb.TapEvent_OUTBOUND
		default:
			return tapPb.TapEvent_UNKNOWN
		}
	}

	id := func(sid *proxy.TapEvent_Http_StreamId) *tapPb.TapEvent_Http_StreamId {
		return &tapPb.TapEvent_Http_StreamId{
			Base:   sid.GetBase(),
			Stream: sid.GetStream(),
		}
	}

	method := func(m *httpPb.HttpMethod) *metricsPb.HttpMethod {
		switch t := m.GetType().(type) {
		case *httpPb.HttpMethod_Registered_:
			return &metricsPb.HttpMethod{
				Type: &metricsPb.HttpMethod_Registered_{
					Registered: metricsPb.HttpMethod_Registered(t.Registered),
				},
			}
		case *httpPb.HttpMethod_Unregistered:
			return &metricsPb.HttpMethod{
				Type: &metricsPb.HttpMethod_Unregistered{
					Unregistered: t.Unregistered,
				},
			}
		default:
			return nil
		}
	}

	scheme := func(sc *httpPb.Scheme) *metricsPb.Scheme {
		switch t := sc.GetType().(type) {
		case *httpPb.Scheme_Registered_:
			return &metricsPb.Scheme{
				Type: &metricsPb.Scheme_Registered_{
					Registered: metricsPb.Scheme_Registered(t.Registered),
				},
			}
		case *httpPb.Scheme_Unregistered:
			return &metricsPb.Scheme{
				Type: &metricsPb.Scheme_Unregistered{
					Unregistered: t.Unregistered,
				},
			}
		default:
			return nil
		}
	}

	headers := func(hs *httpPb.Headers) *metricsPb.Headers {
		if hs == nil {
			return nil
		}
		var out []*metricsPb.Headers_Header
		for _, header := range hs.GetHeaders() {
			name := header.GetName()
			if s.ignoreHeaders[name] {
				continue
			}
			value := header.GetValue()
			h := &metricsPb.Headers_Header{Name: name}
			// Prefer a readable string; fall back to raw bytes for values
			// that are not valid UTF-8.
			if utf8.Valid(value) {
				h.Value = &metricsPb.Headers_Header_ValueStr{ValueStr: string(value)}
			} else {
				h.Value = &metricsPb.Headers_Header_ValueBin{ValueBin: value}
			}
			out = append(out, h)
		}
		return &metricsPb.Headers{
			Headers: out,
		}
	}

	eos := func(e *proxy.Eos) *metricsPb.Eos {
		switch t := e.GetEnd().(type) {
		case *proxy.Eos_ResetErrorCode:
			return &metricsPb.Eos{
				End: &metricsPb.Eos_ResetErrorCode{
					ResetErrorCode: t.ResetErrorCode,
				},
			}
		case *proxy.Eos_GrpcStatusCode:
			return &metricsPb.Eos{
				End: &metricsPb.Eos_GrpcStatusCode{
					GrpcStatusCode: t.GrpcStatusCode,
				},
			}
		default:
			return nil
		}
	}

	// httpEvent translates an HTTP payload; it returns nil for anything it
	// does not recognize. Getters are used on the inner messages because
	// they tolerate nil, unlike direct field access.
	httpEvent := func(h *proxy.TapEvent_Http) *tapPb.TapEvent_Http_ {
		switch e := h.GetEvent().(type) {
		case *proxy.TapEvent_Http_RequestInit_:
			ri := e.RequestInit
			return &tapPb.TapEvent_Http_{
				Http: &tapPb.TapEvent_Http{
					Event: &tapPb.TapEvent_Http_RequestInit_{
						RequestInit: &tapPb.TapEvent_Http_RequestInit{
							Id:        id(ri.GetId()),
							Method:    method(ri.GetMethod()),
							Scheme:    scheme(ri.GetScheme()),
							Authority: ri.GetAuthority(),
							Path:      ri.GetPath(),
							Headers:   headers(ri.GetHeaders()),
						},
					},
				},
			}
		case *proxy.TapEvent_Http_ResponseInit_:
			rsp := e.ResponseInit
			return &tapPb.TapEvent_Http_{
				Http: &tapPb.TapEvent_Http{
					Event: &tapPb.TapEvent_Http_ResponseInit_{
						ResponseInit: &tapPb.TapEvent_Http_ResponseInit{
							Id:               id(rsp.GetId()),
							SinceRequestInit: rsp.GetSinceRequestInit(),
							HttpStatus:       rsp.GetHttpStatus(),
							Headers:          headers(rsp.GetHeaders()),
						},
					},
				},
			}
		case *proxy.TapEvent_Http_ResponseEnd_:
			end := e.ResponseEnd
			return &tapPb.TapEvent_Http_{
				Http: &tapPb.TapEvent_Http{
					Event: &tapPb.TapEvent_Http_ResponseEnd_{
						ResponseEnd: &tapPb.TapEvent_Http_ResponseEnd{
							Id:                id(end.GetId()),
							SinceRequestInit:  end.GetSinceRequestInit(),
							SinceResponseInit: end.GetSinceResponseInit(),
							ResponseBytes:     end.GetResponseBytes(),
							Eos:               eos(end.GetEos()),
							Trailers:          headers(end.GetTrailers()),
						},
					},
				},
			}
		default:
			return nil
		}
	}

	sourceLabels := orig.GetSourceMeta().GetLabels()
	if sourceLabels == nil {
		sourceLabels = make(map[string]string)
	}
	destinationLabels := orig.GetDestinationMeta().GetLabels()
	if destinationLabels == nil {
		destinationLabels = make(map[string]string)
	}

	ev := &tapPb.TapEvent{
		Source: addr.NetToPublic(orig.GetSource()),
		SourceMeta: &tapPb.TapEvent_EndpointMeta{
			Labels: sourceLabels,
		},
		Destination: addr.NetToPublic(orig.GetDestination()),
		DestinationMeta: &tapPb.TapEvent_EndpointMeta{
			Labels: destinationLabels,
		},
		RouteMeta: &tapPb.TapEvent_RouteMeta{
			Labels: orig.GetRouteMeta().GetLabels(),
		},
		ProxyDirection: direction(orig.GetProxyDirection()),
	}
	// Only store a non-nil wrapper. A nil *TapEvent_Http_ placed in the oneof
	// interface makes the interface non-nil, and ev.GetHttp() would then
	// dereference a nil pointer.
	if he := httpEvent(orig.GetHttp()); he != nil {
		ev.Event = he
	}

	s.hydrateEventLabels(ctx, ev)
	return ev
}
// NewGrpcTapServer creates a new gRPC Tap server.
//
// Before building the server, it registers the ipIndex indexer (indexByIP) on
// the pod informer and then on the node informer. The first registration
// error is returned unchanged.
func NewGrpcTapServer(
	tapPort uint,
	controllerNamespace string,
	trustDomain string,
	k8sAPI *k8s.API,
	ignoreHeaders map[string]bool,
) (*GRPCTapServer, error) {
	addIPIndex := func(informer cache.SharedIndexInformer) error {
		return informer.AddIndexers(cache.Indexers{ipIndex: indexByIP})
	}

	if err := addIPIndex(k8sAPI.Pod().Informer()); err != nil {
		return nil, err
	}
	if err := addIPIndex(k8sAPI.Node().Informer()); err != nil {
		return nil, err
	}

	srv := newGRPCTapServer(tapPort, controllerNamespace, trustDomain, k8sAPI, ignoreHeaders)
	return srv, nil
}
// newGRPCTapServer builds a GRPCTapServer from its dependencies without
// touching any informer indexes.
func newGRPCTapServer(
	tapPort uint,
	controllerNamespace string,
	trustDomain string,
	k8sAPI *k8s.API,
	ignoreHeaders map[string]bool,
) *GRPCTapServer {
	srv := new(GRPCTapServer)
	srv.tapPort = tapPort
	srv.k8sAPI = k8sAPI
	srv.controllerNamespace = controllerNamespace
	srv.trustDomain = trustDomain
	srv.ignoreHeaders = ignoreHeaders

	// NOTE(review): the gRPC server created here is registered and then
	// discarded; presumably it is kept only for side effects of
	// prometheus.NewGrpcServer/RegisterTapServer. Confirm whether it is
	// still needed.
	tapPb.RegisterTapServer(prometheus.NewGrpcServer(grpc.MaxConcurrentStreams(0)), srv)

	return srv
}
// indexByIP is the cache.IndexFunc behind ipIndex.
//
// Keys depend on the object type:
//   - A pod is keyed by its Status.PodIP.
//   - A node is keyed by every NodeInternalIP address it reports.
//   - Any other type is an error.
func indexByIP(obj interface{}) ([]string, error) {
	if pod, ok := obj.(*corev1.Pod); ok {
		return []string{pod.Status.PodIP}, nil
	}

	node, ok := obj.(*corev1.Node)
	if !ok {
		return []string{""}, fmt.Errorf("object is not a pod nor a node")
	}

	internalIPs := []string{}
	for _, a := range node.Status.Addresses {
		if a.Type != corev1.NodeInternalIP {
			continue
		}
		log.Debugf("Indexing node address: %s", a.Address)
		internalIPs = append(internalIPs, a.Address)
	}
	return internalIPs, nil
}
// hydrateEventLabels fills in Kubernetes metadata labels for an event.
//
// Source labels are always hydrated into ev.SourceMeta. Destination labels
// are hydrated into ev.DestinationMeta only when the event was reported by an
// inbound proxy.
//
// Hydration failures are not fatal; they only mean missing labels, so they
// are logged at WARN level and otherwise ignored.
func (s *GRPCTapServer) hydrateEventLabels(ctx context.Context, ev *tapPb.TapEvent) {
	if err := s.hydrateIPLabels(ctx, ev.GetSource().GetIp(), ev.GetSourceMeta().GetLabels()); err != nil {
		log.Warnf("error hydrating source labels: %s", err)
	}

	// Events emitted by inbound proxies don't carry destination labels: the
	// inbound proxy _is_ the destination, and proxies don't know their own
	// labels. Outbound events therefore need no destination hydration.
	if ev.ProxyDirection != tapPb.TapEvent_INBOUND {
		return
	}
	if err := s.hydrateIPLabels(ctx, ev.GetDestination().GetIp(), ev.GetDestinationMeta().GetLabels()); err != nil {
		log.Warnf("error hydrating destination labels: %s", err)
	}
}
// hydrateIPLabels looks up the resource owning ip (via resourceForIP) and
// writes its metadata into labels, which must be non-nil.
//
// The labels written depend on the resource:
//   - For a node, labels[pkgK8s.Node] is set to the node name.
//   - For a pod, the pod labels derived from its owner are copied in, and
//     labels[pkgK8s.Namespace] is set to the pod's namespace.
//
// Only index lookup errors are returned. When no resource is found, the
// labels are left untouched.
func (s *GRPCTapServer) hydrateIPLabels(ctx context.Context, ip *netPb.IPAddress, labels map[string]string) error {
	res, err := s.resourceForIP(ip)
	if err != nil {
		return err
	}

	if node, isNode := res.(*corev1.Node); isNode {
		labels[pkgK8s.Node] = node.Name
		return nil
	}

	pod, isPod := res.(*corev1.Pod)
	if !isPod {
		return nil
	}
	if pod == nil {
		log.Debugf("no pod found for IP %s", addr.PublicIPToString(ip))
		return nil
	}

	ownerKind, ownerName := s.k8sAPI.GetOwnerKindAndName(ctx, pod, false)
	for key, value := range pkgK8s.GetPodLabels(ownerKind, ownerName, pod) {
		labels[key] = value
	}
	labels[pkgK8s.Namespace] = pod.Namespace
	return nil
}
// resourceForIP returns the node or pod corresponding to a given IP address.
//
// Lookup order:
//  1. If exactly one node has the IP as an internal IP, that node is returned.
//  2. Otherwise, if exactly one indexed pod has the IP, that pod is returned.
//  3. Otherwise, the single running pod with that IP is returned.
//
// When no match is found, or more than one running pod matches, it returns an
// untyped nil (never a nil *corev1.Pod wrapped in runtime.Object). Errors are
// returned only when searching the indices fails.
func (s *GRPCTapServer) resourceForIP(ip *netPb.IPAddress) (runtime.Object, error) {
	ipStr := addr.PublicIPToString(ip)

	nodes, err := s.k8sAPI.Node().Informer().GetIndexer().ByIndex(ipIndex, ipStr)
	if err != nil {
		return nil, err
	}
	if len(nodes) == 1 {
		log.Debugf("found one node at IP %s", ipStr)
		return nodes[0].(*corev1.Node), nil
	}

	pods, err := s.k8sAPI.Pod().Informer().GetIndexer().ByIndex(ipIndex, ipStr)
	if err != nil {
		return nil, err
	}
	if len(pods) == 1 {
		log.Debugf("found one pod at IP %s", ipStr)
		return pods[0].(*corev1.Pod), nil
	}

	var singleRunningPod *corev1.Pod
	for _, obj := range pods {
		pod := obj.(*corev1.Pod)
		if pod.Status.Phase != corev1.PodRunning {
			continue
		}
		if singleRunningPod != nil {
			log.Warnf(
				"could not uniquely identify pod at %s (found %d pods)",
				ipStr,
				len(pods),
			)
			return nil, nil
		}
		singleRunningPod = pod
	}

	if singleRunningPod == nil {
		// Returning the nil *corev1.Pod directly would produce a non-nil
		// runtime.Object interface, so return an untyped nil instead.
		log.Debugf("no pod found for IP %s", ipStr)
		return nil, nil
	}
	return singleRunningPod, nil
}
// getLabelSelector parses the label selector set on the request's target.
//
// An empty (or absent) selector yields labels.Everything(). A selector that
// fails to parse yields an error wrapping the parse error.
func getLabelSelector(req *tapPb.TapByResourceRequest) (labels.Selector, error) {
	raw := req.GetTarget().GetLabelSelector()
	if raw == "" {
		return labels.Everything(), nil
	}

	selector, err := labels.Parse(raw)
	if err != nil {
		return nil, fmt.Errorf("invalid label selector \"%s\": %w", raw, err)
	}
	return selector, nil
}