package tap import ( "context" "fmt" "io" "time" "unicode/utf8" "k8s.io/apimachinery/pkg/labels" httpPb "github.com/linkerd/linkerd2-proxy-api/go/http_types" proxy "github.com/linkerd/linkerd2-proxy-api/go/tap" apiUtil "github.com/linkerd/linkerd2/controller/api/util" pb "github.com/linkerd/linkerd2/controller/gen/controller/tap" "github.com/linkerd/linkerd2/controller/gen/public" "github.com/linkerd/linkerd2/controller/k8s" "github.com/linkerd/linkerd2/pkg/addr" pkgK8s "github.com/linkerd/linkerd2/pkg/k8s" "github.com/linkerd/linkerd2/pkg/prometheus" "github.com/linkerd/linkerd2/pkg/util" log "github.com/sirupsen/logrus" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" corev1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/tools/cache" ) const requireIDHeader = "l5d-require-id" const ipIndex = "ip" const defaultMaxRps = 100.0 // GRPCTapServer describes the gRPC server implementing pb.TapServer type GRPCTapServer struct { tapPort uint k8sAPI *k8s.API controllerNamespace string trustDomain string } var ( tapInterval = 1 * time.Second ) // Tap is deprecated, use TapByResource. func (s *GRPCTapServer) Tap(req *public.TapRequest, stream pb.Tap_TapServer) error { return status.Error(codes.Unimplemented, "Tap is deprecated, use TapByResource") } // TapByResource taps all resources matched by the request object. func (s *GRPCTapServer) TapByResource(req *public.TapByResourceRequest, stream pb.Tap_TapByResourceServer) error { if req == nil { return status.Error(codes.InvalidArgument, "TapByResource received nil TapByResourceRequest") } if req.GetTarget() == nil { return status.Error(codes.InvalidArgument, "TapByResource received nil target ResourceSelection") } res := req.GetTarget().GetResource() labelSelector, err := getLabelSelector(req) if err != nil { return err } if res == nil { return status.Error(codes.InvalidArgument, "TapByResource received nil target Resource") } if req.GetMaxRps() == 0.0 { req.MaxRps = defaultMaxRps } objects, err := s.k8sAPI.GetObjects(res.GetNamespace(), res.GetType(), res.GetName(), labelSelector) if err != nil { return apiUtil.GRPCError(err) } pods := []*corev1.Pod{} foundDisabledPods := false for _, object := range objects { podsFor, err := s.k8sAPI.GetPodsFor(object, false) if err != nil { return apiUtil.GRPCError(err) } for _, pod := range podsFor { if pkgK8s.IsMeshed(pod, s.controllerNamespace) { if pkgK8s.IsTapDisabled(pod) { foundDisabledPods = true } else { pods = append(pods, pod) } } } } if len(pods) == 0 { resType := res.GetType() resName := res.GetName() if foundDisabledPods { return status.Errorf(codes.NotFound, "all pods found for %s/%s have tapping disabled", resType, resName) } return status.Errorf(codes.NotFound, "no pods found for %s/%s", resType, resName) } log.Infof("Tapping %d pods for target: %+v", len(pods), *res) events := make(chan *public.TapEvent) // divide the rps evenly between all pods to tap rpsPerPod := req.GetMaxRps() / float32(len(pods)) if rpsPerPod < 1 { rpsPerPod = 1 } match, err := makeByResourceMatch(req.GetMatch()) if err != nil { return apiUtil.GRPCError(err) } extract := &proxy.ObserveRequest_Extract{} // HTTP is the only protocol supported for extracting metadata, so this is // the only field checked. extractHTTP := req.GetExtract().GetHttp() if extractHTTP != nil { extract = buildExtractHTTP(extractHTTP) } for _, pod := range pods { // create the expected pod identity from the pod spec ns := res.GetNamespace() if res.GetType() == pkgK8s.Namespace { ns = res.GetName() } name := fmt.Sprintf("%s.%s.serviceaccount.identity.%s.%s", pod.Spec.ServiceAccountName, ns, s.controllerNamespace, s.trustDomain) log.Debugf("initiating tap request to %s with required name %s", pod.Spec.ServiceAccountName, name) // pass the header metadata into the request context ctx := stream.Context() ctx = metadata.AppendToOutgoingContext(ctx, requireIDHeader, name) // initiate a tap on the pod go s.tapProxy(ctx, rpsPerPod, match, extract, pod.Status.PodIP, events) } // read events from the taps and send them back for { select { case <-stream.Context().Done(): return nil case event := <-events: err := stream.Send(event) if err != nil { return apiUtil.GRPCError(err) } } } } func makeByResourceMatch(match *public.TapByResourceRequest_Match) (*proxy.ObserveRequest_Match, error) { // TODO: for now assume it's always a single, flat `All` match list seq := match.GetAll() if seq == nil { return nil, status.Errorf(codes.Unimplemented, "unexpected match specified: %+v", match) } matches := []*proxy.ObserveRequest_Match{} for _, reqMatch := range seq.Matches { switch typed := reqMatch.Match.(type) { case *public.TapByResourceRequest_Match_Destinations: for k, v := range destinationLabels(typed.Destinations.Resource) { matches = append(matches, &proxy.ObserveRequest_Match{ Match: &proxy.ObserveRequest_Match_DestinationLabel{ DestinationLabel: &proxy.ObserveRequest_Match_Label{ Key: k, Value: v, }, }, }) } case *public.TapByResourceRequest_Match_Http_: httpMatch := proxy.ObserveRequest_Match_Http{} switch httpTyped := typed.Http.Match.(type) { case *public.TapByResourceRequest_Match_Http_Scheme: httpMatch = proxy.ObserveRequest_Match_Http{ Match: &proxy.ObserveRequest_Match_Http_Scheme{ Scheme: util.ParseScheme(httpTyped.Scheme), }, } case *public.TapByResourceRequest_Match_Http_Method: httpMatch = proxy.ObserveRequest_Match_Http{ Match: &proxy.ObserveRequest_Match_Http_Method{ Method: util.ParseMethod(httpTyped.Method), }, } case *public.TapByResourceRequest_Match_Http_Authority: httpMatch = proxy.ObserveRequest_Match_Http{ Match: &proxy.ObserveRequest_Match_Http_Authority{ Authority: &proxy.ObserveRequest_Match_Http_StringMatch{ Match: &proxy.ObserveRequest_Match_Http_StringMatch_Exact{ Exact: httpTyped.Authority, }, }, }, } case *public.TapByResourceRequest_Match_Http_Path: httpMatch = proxy.ObserveRequest_Match_Http{ Match: &proxy.ObserveRequest_Match_Http_Path{ Path: &proxy.ObserveRequest_Match_Http_StringMatch{ Match: &proxy.ObserveRequest_Match_Http_StringMatch_Prefix{ Prefix: httpTyped.Path, }, }, }, } default: return nil, status.Errorf(codes.Unimplemented, "unknown HTTP match type: %v", httpTyped) } matches = append(matches, &proxy.ObserveRequest_Match{ Match: &proxy.ObserveRequest_Match_Http_{ Http: &httpMatch, }, }) default: return nil, status.Errorf(codes.Unimplemented, "unknown match type: %v", typed) } } return &proxy.ObserveRequest_Match{ Match: &proxy.ObserveRequest_Match_All{ All: &proxy.ObserveRequest_Match_Seq{ Matches: matches, }, }, }, nil } // TODO: factor out with `promLabels` in public-api func destinationLabels(resource *public.Resource) map[string]string { dstLabels := map[string]string{} if resource.Name != "" { l5dLabel := pkgK8s.KindToL5DLabel(resource.Type) dstLabels[l5dLabel] = resource.Name } if resource.Type != pkgK8s.Namespace && resource.Namespace != "" { dstLabels["namespace"] = resource.Namespace } return dstLabels } func buildExtractHTTP(extract *public.TapByResourceRequest_Extract_Http) *proxy.ObserveRequest_Extract { if extract.GetHeaders() != nil { return &proxy.ObserveRequest_Extract{ Extract: &proxy.ObserveRequest_Extract_Http_{ Http: &proxy.ObserveRequest_Extract_Http{ Extract: &proxy.ObserveRequest_Extract_Http_Headers_{ Headers: &proxy.ObserveRequest_Extract_Http_Headers{}, }, }, }, } } return nil } // Tap a pod. // This method will run continuously until an error is encountered or the // request is cancelled via the context. Thus it should be called as a // go-routine. // To limit the rps to maxRps, this method calls Observe on the pod with a limit // of maxRps * 1s at most once per 1s window. If this limit is reached in // less than 1s, we sleep until the end of the window before calling Observe // again. func (s *GRPCTapServer) tapProxy(ctx context.Context, maxRps float32, match *proxy.ObserveRequest_Match, extract *proxy.ObserveRequest_Extract, addr string, events chan *public.TapEvent) { tapAddr := fmt.Sprintf("%s:%d", addr, s.tapPort) log.Infof("Establishing tap on %s", tapAddr) conn, err := grpc.DialContext(ctx, tapAddr, grpc.WithInsecure()) if err != nil { log.Error(err) return } client := proxy.NewTapClient(conn) defer conn.Close() req := &proxy.ObserveRequest{ Limit: uint32(maxRps * float32(tapInterval.Seconds())), Match: match, Extract: extract, } for { // Request loop windowStart := time.Now() windowEnd := windowStart.Add(tapInterval) rsp, err := client.Observe(ctx, req) if err != nil { log.Error(err) return } for { // Stream loop event, err := rsp.Recv() if err == io.EOF { log.Debugf("[%s] proxy terminated the stream", addr) break } if err != nil { log.Errorf("[%s] encountered an error: %s", addr, err) return } translatedEvent := s.translateEvent(event) select { case <-ctx.Done(): log.Debugf("[%s] client terminated the stream", addr) return default: events <- translatedEvent } } if time.Now().Before(windowEnd) { time.Sleep(time.Until(windowEnd)) } } } func (s *GRPCTapServer) translateEvent(orig *proxy.TapEvent) *public.TapEvent { direction := func(orig proxy.TapEvent_ProxyDirection) public.TapEvent_ProxyDirection { switch orig { case proxy.TapEvent_INBOUND: return public.TapEvent_INBOUND case proxy.TapEvent_OUTBOUND: return public.TapEvent_OUTBOUND default: return public.TapEvent_UNKNOWN } } event := func(orig *proxy.TapEvent_Http) *public.TapEvent_Http_ { id := func(orig *proxy.TapEvent_Http_StreamId) *public.TapEvent_Http_StreamId { return &public.TapEvent_Http_StreamId{ Base: orig.GetBase(), Stream: orig.GetStream(), } } method := func(orig *httpPb.HttpMethod) *public.HttpMethod { switch m := orig.GetType().(type) { case *httpPb.HttpMethod_Registered_: return &public.HttpMethod{ Type: &public.HttpMethod_Registered_{ Registered: public.HttpMethod_Registered(m.Registered), }, } case *httpPb.HttpMethod_Unregistered: return &public.HttpMethod{ Type: &public.HttpMethod_Unregistered{ Unregistered: m.Unregistered, }, } default: return nil } } scheme := func(orig *httpPb.Scheme) *public.Scheme { switch s := orig.GetType().(type) { case *httpPb.Scheme_Registered_: return &public.Scheme{ Type: &public.Scheme_Registered_{ Registered: public.Scheme_Registered(s.Registered), }, } case *httpPb.Scheme_Unregistered: return &public.Scheme{ Type: &public.Scheme_Unregistered{ Unregistered: s.Unregistered, }, } default: return nil } } headers := func(orig *httpPb.Headers) *public.Headers { if orig == nil { return nil } var headers []*public.Headers_Header for _, header := range orig.GetHeaders() { n := header.GetName() b := header.GetValue() h := public.Headers_Header{Name: n, Value: &public.Headers_Header_ValueBin{ValueBin: b}} if utf8.Valid(b) { h = public.Headers_Header{Name: n, Value: &public.Headers_Header_ValueStr{ValueStr: string(b)}} } headers = append(headers, &h) } return &public.Headers{ Headers: headers, } } switch orig := orig.GetEvent().(type) { case *proxy.TapEvent_Http_RequestInit_: return &public.TapEvent_Http_{ Http: &public.TapEvent_Http{ Event: &public.TapEvent_Http_RequestInit_{ RequestInit: &public.TapEvent_Http_RequestInit{ Id: id(orig.RequestInit.GetId()), Method: method(orig.RequestInit.GetMethod()), Scheme: scheme(orig.RequestInit.GetScheme()), Authority: orig.RequestInit.Authority, Path: orig.RequestInit.Path, Headers: headers(orig.RequestInit.GetHeaders()), }, }, }, } case *proxy.TapEvent_Http_ResponseInit_: return &public.TapEvent_Http_{ Http: &public.TapEvent_Http{ Event: &public.TapEvent_Http_ResponseInit_{ ResponseInit: &public.TapEvent_Http_ResponseInit{ Id: id(orig.ResponseInit.GetId()), SinceRequestInit: orig.ResponseInit.GetSinceRequestInit(), HttpStatus: orig.ResponseInit.GetHttpStatus(), Headers: headers(orig.ResponseInit.GetHeaders()), }, }, }, } case *proxy.TapEvent_Http_ResponseEnd_: eos := func(orig *proxy.Eos) *public.Eos { switch e := orig.GetEnd().(type) { case *proxy.Eos_ResetErrorCode: return &public.Eos{ End: &public.Eos_ResetErrorCode{ ResetErrorCode: e.ResetErrorCode, }, } case *proxy.Eos_GrpcStatusCode: return &public.Eos{ End: &public.Eos_GrpcStatusCode{ GrpcStatusCode: e.GrpcStatusCode, }, } default: return nil } } return &public.TapEvent_Http_{ Http: &public.TapEvent_Http{ Event: &public.TapEvent_Http_ResponseEnd_{ ResponseEnd: &public.TapEvent_Http_ResponseEnd{ Id: id(orig.ResponseEnd.GetId()), SinceRequestInit: orig.ResponseEnd.GetSinceRequestInit(), SinceResponseInit: orig.ResponseEnd.GetSinceResponseInit(), ResponseBytes: orig.ResponseEnd.GetResponseBytes(), Eos: eos(orig.ResponseEnd.GetEos()), Trailers: headers(orig.ResponseEnd.GetTrailers()), }, }, }, } default: return nil } } sourceLabels := orig.GetSourceMeta().GetLabels() if sourceLabels == nil { sourceLabels = make(map[string]string) } destinationLabels := orig.GetDestinationMeta().GetLabels() if destinationLabels == nil { destinationLabels = make(map[string]string) } ev := &public.TapEvent{ Source: addr.NetToPublic(orig.GetSource()), SourceMeta: &public.TapEvent_EndpointMeta{ Labels: sourceLabels, }, Destination: addr.NetToPublic(orig.GetDestination()), DestinationMeta: &public.TapEvent_EndpointMeta{ Labels: destinationLabels, }, RouteMeta: &public.TapEvent_RouteMeta{ Labels: orig.GetRouteMeta().GetLabels(), }, ProxyDirection: direction(orig.GetProxyDirection()), Event: event(orig.GetHttp()), } s.hydrateEventLabels(ev) return ev } // NewGrpcTapServer creates a new gRPC Tap server func NewGrpcTapServer( tapPort uint, controllerNamespace string, trustDomain string, k8sAPI *k8s.API, ) *GRPCTapServer { k8sAPI.Pod().Informer().AddIndexers(cache.Indexers{ipIndex: indexByIP}) k8sAPI.Node().Informer().AddIndexers(cache.Indexers{ipIndex: indexByIP}) return newGRPCTapServer(tapPort, controllerNamespace, trustDomain, k8sAPI) } func newGRPCTapServer( tapPort uint, controllerNamespace string, trustDomain string, k8sAPI *k8s.API, ) *GRPCTapServer { srv := &GRPCTapServer{ tapPort: tapPort, k8sAPI: k8sAPI, controllerNamespace: controllerNamespace, trustDomain: trustDomain, } s := prometheus.NewGrpcServer() pb.RegisterTapServer(s, srv) return srv } func indexByIP(obj interface{}) ([]string, error) { switch v := obj.(type) { case *corev1.Pod: return []string{v.Status.PodIP}, nil case *corev1.Node: addresses := make([]string, 0) for _, address := range v.Status.Addresses { if address.Type == corev1.NodeInternalIP { log.Debugf("Indexing node address: %s", address.Address) addresses = append(addresses, address.Address) } } return addresses, nil } return []string{""}, fmt.Errorf("object is not a pod nor a node") } // hydrateEventLabels attempts to hydrate the metadata labels for an event's // source and (if the event was reported by an inbound proxy) destination, // and adds them to the event's `SourceMeta` and `DestinationMeta` fields. // // Since errors encountered while hydrating metadata are non-fatal and result // only in missing labels, any errors are logged at the WARN level. func (s *GRPCTapServer) hydrateEventLabels(ev *public.TapEvent) { err := s.hydrateIPLabels(ev.GetSource().GetIp(), ev.GetSourceMeta().GetLabels()) if err != nil { log.Warnf("error hydrating source labels: %s", err) } if ev.ProxyDirection == public.TapEvent_INBOUND { // Events emitted by an inbound proxies don't have destination labels, // since the inbound proxy _is_ the destination, and proxies don't know // their own labels. err = s.hydrateIPLabels(ev.GetDestination().GetIp(), ev.GetDestinationMeta().GetLabels()) if err != nil { log.Warnf("error hydrating destination labels: %s", err) } } } // hydrateIPMeta attempts to determine the metadata labels for `ip` and, if // successful, adds them to `labels`. func (s *GRPCTapServer) hydrateIPLabels(ip *public.IPAddress, labels map[string]string) error { res, err := s.resourceForIP(ip) if err != nil { return err } switch v := res.(type) { case *corev1.Pod: if v == nil { log.Debugf("no pod found for IP %s", addr.PublicIPToString(ip)) return nil } ownerKind, ownerName := s.k8sAPI.GetOwnerKindAndName(v, false) podLabels := pkgK8s.GetPodLabels(ownerKind, ownerName, v) for key, value := range podLabels { labels[key] = value } labels[pkgK8s.Namespace] = v.Namespace case *corev1.Node: labels[pkgK8s.Node] = v.Name } return nil } // resourceForIP returns the node or pod corresponding to a given IP address. // // First it checks if the IP corresponds to a Node's internal IP and returns the // node if that's the case. Otherwise it checks the running pods that match the // IP. If exactly one is found, it's returned. Otherwise it returns nil. Errors // are returned only in the event of an error searching the indices. func (s *GRPCTapServer) resourceForIP(ip *public.IPAddress) (runtime.Object, error) { ipStr := addr.PublicIPToString(ip) nodes, err := s.k8sAPI.Node().Informer().GetIndexer().ByIndex(ipIndex, ipStr) if err != nil { return nil, err } if len(nodes) == 1 { log.Debugf("found one node at IP %s", ipStr) return nodes[0].(*corev1.Node), nil } pods, err := s.k8sAPI.Pod().Informer().GetIndexer().ByIndex(ipIndex, ipStr) if err != nil { return nil, err } if len(pods) == 1 { log.Debugf("found one pod at IP %s", ipStr) return pods[0].(*corev1.Pod), nil } var singleRunningPod *corev1.Pod for _, obj := range pods { pod := obj.(*corev1.Pod) if pod.Status.Phase == corev1.PodRunning { if singleRunningPod != nil { log.Warnf( "could not uniquely identify pod at %s (found %d pods)", ipStr, len(pods), ) return nil, nil } singleRunningPod = pod } } return singleRunningPod, nil } func getLabelSelector(req *public.TapByResourceRequest) (labels.Selector, error) { labelSelector := labels.Everything() if s := req.GetTarget().GetLabelSelector(); s != "" { var err error labelSelector, err = labels.Parse(s) if err != nil { return nil, fmt.Errorf("invalid label selector \"%s\": %s", s, err) } } return labelSelector, nil }