Move tap from core into Viz extension (#5651)

Closes #5545.

This change moves all tap and tap-injector code into the viz directory. 

The tap and tap-injector components now also use a new tap image—separating
these components from the controller image that they are currently part of. This
means the controller image has removed all its build dependencies related to
tap.

Finally, the tap Protobuf has been separated from the metrics-api and moved into
it's own `.proto` file and gen directory. This introduces a clear split between
metrics-api and tap Protobuf.

There is no change in behavior for the `viz tap` command.

### Reviewing

#### Docker images

All the bin directory scripts should be updated to build and load the tap image.
All the CI workflows should be updated to build and push the tap image.

#### Controller and pkg directories

This is primarily deletions. Most of the deleted code in this directory is now
in the tap directory of the Viz extension.

#### viz/tap

This is the location that all the tap related code now lives in. New files are
mostly moved from the controller and pkg directories. Imports have all been
updated to point at the right locations and Protobuf.

The Protobuf here is taken from metrics-api and contains all tap-related
Protobuf.

Signed-off-by: Kevin Leimkuhler <kevin@kleimkuhler.com>
This commit is contained in:
Kevin Leimkuhler 2021-02-09 12:43:21 -05:00 committed by GitHub
parent 3542269e78
commit 75fcc9d623
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
56 changed files with 3690 additions and 3486 deletions

View File

@ -16,7 +16,7 @@ jobs:
strategy:
matrix:
# Keep in sync with release.yaml matrix build
target: [proxy, controller, metrics-api, web, cni-plugin, debug, cli-bin, grafana, jaeger-webhook]
target: [proxy, controller, metrics-api, web, cni-plugin, debug, cli-bin, grafana, jaeger-webhook, tap]
name: Docker build (${{ matrix.target }})
timeout-minutes: 30
steps:

View File

@ -12,7 +12,7 @@ jobs:
strategy:
matrix:
# Keep in sync with integration_tests.yaml matrix build
target: [proxy, controller, metrics-api, web, cni-plugin, debug, cli-bin, grafana, jaeger-webhook]
target: [proxy, controller, metrics-api, web, cni-plugin, debug, cli-bin, grafana, jaeger-webhook, tap]
name: Docker build (${{ matrix.target }})
timeout-minutes: 30
steps:

View File

@ -22,4 +22,4 @@ fi
"$bindir"/docker-build-grafana
"$bindir"/docker-build-jaeger-webhook
"$bindir"/docker-build-metrics-api
"$bindir"/docker-build-tap

20
bin/docker-build-tap Executable file
View File

@ -0,0 +1,20 @@
#!/usr/bin/env bash
set -eu
if [ $# -ne 0 ]; then
echo "no arguments allowed for ${0##*/}, given: $*" >&2
exit 64
fi
bindir=$( cd "${BASH_SOURCE[0]%/*}" && pwd )
rootdir=$( cd "$bindir"/.. && pwd )
# shellcheck source=_docker.sh
. "$bindir"/_docker.sh
# shellcheck source=_tag.sh
. "$bindir"/_tag.sh
dockerfile=$rootdir/viz/tap/Dockerfile
tag=$(head_root_tag)
docker_build tap "$tag" "$dockerfile"

View File

@ -14,6 +14,6 @@ bindir=$( cd "${BASH_SOURCE[0]%/*}" && pwd )
# shellcheck source=_docker.sh
. "$bindir"/_docker.sh
for img in cli-bin cni-plugin controller metrics-api debug grafana proxy web ; do
for img in cli-bin cni-plugin controller metrics-api debug grafana proxy web jaeger-webhook tap; do
docker_push $img "$tag"
done

View File

@ -10,7 +10,7 @@ while :
do
case $1 in
-h|--help)
echo "Load into KinD/k3d the images for Linkerd's proxy, controller, metrics-api, web, grafana, cli-bin, debug and cni-plugin."
echo "Load into KinD/k3d the images for Linkerd's proxy, controller, metrics-api, web, grafana, cli-bin, tap, debug and cni-plugin."
echo ""
echo "Usage:"
echo " bin/image-load [--kind] [--k3d] [--archive]"
@ -79,7 +79,7 @@ fi
"$bin" version
rm -f load_fail
for img in proxy controller web metrics-api grafana cli-bin debug cni-plugin jaeger-webhook; do
for img in proxy controller web metrics-api grafana cli-bin debug cni-plugin jaeger-webhook tap; do
printf 'Importing %s...\n' $img
if [ $archive ]; then
param="image-archives/$img.tar"

View File

@ -6,15 +6,16 @@ bindir=$( cd "${0%/*}" && pwd )
go install -mod=readonly github.com/golang/protobuf/protoc-gen-go
rm -rf controller/gen/common controller/gen/public controller/gen/config viz/metrics-api/gen
mkdir -p controller/gen/common/net controller/gen/public viz/metrics-api/gen/viz
rm -rf controller/gen/common controller/gen/public controller/gen/config viz/metrics-api/gen viz/tap/gen
mkdir -p controller/gen/common/net controller/gen/public viz/metrics-api/gen/viz viz/tap/gen/tap
"$bindir"/protoc -I proto --go_out=plugins=grpc,paths=source_relative:controller/gen proto/common/net.proto
"$bindir"/protoc -I proto --go_out=plugins=grpc,paths=source_relative:controller/gen proto/public.proto
"$bindir"/protoc -I proto --go_out=plugins=grpc,paths=source_relative:controller/gen proto/config/config.proto
"$bindir"/protoc -I proto -I viz/metrics-api/proto --go_out=plugins=grpc,paths=source_relative:viz/metrics-api/gen viz/metrics-api/proto/viz.proto
"$bindir"/protoc -I proto -I viz/tap/proto -I viz/metrics-api/proto --go_out=plugins=grpc,paths=source_relative:viz/tap/gen viz/tap/proto/viz_tap.proto
mv controller/gen/common/net.pb.go controller/gen/common/net/
mv controller/gen/public.pb.go controller/gen/public/
mv viz/metrics-api/gen/viz.pb.go viz/metrics-api/gen/viz/viz.pb.go
mv viz/tap/gen/viz_tap.pb.go viz/tap/gen/tap/viz_tap.pb.go

View File

@ -124,7 +124,7 @@ Kubernetes: `>=1.13.0-0`
| Key | Type | Default | Description |
|-----|------|---------|-------------|
| controllerImage | string | `"ghcr.io/linkerd/controller"` | Docker image for the controller, tap and identity components |
| controllerImage | string | `"ghcr.io/linkerd/controller"` | Docker image for the controller and identity components |
| controllerReplicas | int | `1` | Number of replicas for each control plane pod |
| controllerUID | int | `2103` | User ID for the control plane components |
| debugContainer.image.name | string | `"ghcr.io/linkerd/debug"` | Docker image for the debug container |

View File

@ -174,7 +174,7 @@ omitWebhookSideEffects: false
webhookFailurePolicy: Ignore
# controllerImage -- Docker image for the controller, tap and identity
# controllerImage -- Docker image for the controller and identity
# components
controllerImage: ghcr.io/linkerd/controller
# -- Number of replicas for each control plane pod

View File

@ -13,10 +13,9 @@ RUN ./bin/install-deps $TARGETARCH
FROM go-deps as golang
WORKDIR /linkerd-build
COPY controller/gen controller/gen
# TODO: remove when tap code gets moved to /viz
# TODO: remove when BuildResource is refactored
# https://github.com/linkerd/linkerd2/issues/5589
COPY viz/metrics-api/gen/viz viz/metrics-api/gen/viz
# TODO: remove when tap code gets moved to /viz
COPY viz/pkg/labels viz/pkg/labels
COPY pkg pkg
COPY controller controller
COPY charts/patch charts/patch

View File

@ -1,12 +1,9 @@
package util
import (
"encoding/binary"
"errors"
"fmt"
"strings"
netPb "github.com/linkerd/linkerd2/controller/gen/common/net"
"github.com/linkerd/linkerd2/pkg/k8s"
pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
"google.golang.org/grpc/codes"
@ -19,56 +16,6 @@ import (
Shared utilities for interacting with the controller public api
*/
var (
// ValidTargets specifies resource types allowed as a target:
// target resource on an inbound query
// target resource on an outbound 'to' query
// destination resource on an outbound 'from' query
ValidTargets = []string{
k8s.Authority,
k8s.CronJob,
k8s.DaemonSet,
k8s.Deployment,
k8s.Job,
k8s.Namespace,
k8s.Pod,
k8s.ReplicaSet,
k8s.ReplicationController,
k8s.StatefulSet,
}
// ValidTapDestinations specifies resource types allowed as a tap destination:
// destination resource on an outbound 'to' query
ValidTapDestinations = []string{
k8s.CronJob,
k8s.DaemonSet,
k8s.Deployment,
k8s.Job,
k8s.Namespace,
k8s.Pod,
k8s.ReplicaSet,
k8s.ReplicationController,
k8s.Service,
k8s.StatefulSet,
}
)
// TapRequestParams contains parameters that are used to build a
// TapByResourceRequest.
type TapRequestParams struct {
Resource string
Namespace string
ToResource string
ToNamespace string
MaxRps float32
Scheme string
Method string
Authority string
Path string
Extract bool
LabelSelector string
}
// GRPCError generates a gRPC error code, as defined in
// google.golang.org/grpc/status.
// If the error is nil or already a gRPC error, return the error.
@ -198,143 +145,3 @@ func buildResource(namespace string, resType string, name string) (*pb.Resource,
Name: name,
}, nil
}
// BuildTapByResourceRequest builds a Public API TapByResourceRequest from a
// TapRequestParams.
func BuildTapByResourceRequest(params TapRequestParams) (*pb.TapByResourceRequest, error) {
target, err := BuildResource(params.Namespace, params.Resource)
if err != nil {
return nil, fmt.Errorf("target resource invalid: %s", err)
}
if !contains(ValidTargets, target.Type) {
return nil, fmt.Errorf("unsupported resource type [%s]", target.Type)
}
matches := []*pb.TapByResourceRequest_Match{}
if params.ToResource != "" {
destination, err := BuildResource(params.ToNamespace, params.ToResource)
if err != nil {
return nil, fmt.Errorf("destination resource invalid: %s", err)
}
if !contains(ValidTapDestinations, destination.Type) {
return nil, fmt.Errorf("unsupported resource type [%s]", destination.Type)
}
match := pb.TapByResourceRequest_Match{
Match: &pb.TapByResourceRequest_Match_Destinations{
Destinations: &pb.ResourceSelection{
Resource: destination,
},
},
}
matches = append(matches, &match)
}
if params.Scheme != "" {
match := buildMatchHTTP(&pb.TapByResourceRequest_Match_Http{
Match: &pb.TapByResourceRequest_Match_Http_Scheme{Scheme: params.Scheme},
})
matches = append(matches, &match)
}
if params.Method != "" {
match := buildMatchHTTP(&pb.TapByResourceRequest_Match_Http{
Match: &pb.TapByResourceRequest_Match_Http_Method{Method: params.Method},
})
matches = append(matches, &match)
}
if params.Authority != "" {
match := buildMatchHTTP(&pb.TapByResourceRequest_Match_Http{
Match: &pb.TapByResourceRequest_Match_Http_Authority{Authority: params.Authority},
})
matches = append(matches, &match)
}
if params.Path != "" {
match := buildMatchHTTP(&pb.TapByResourceRequest_Match_Http{
Match: &pb.TapByResourceRequest_Match_Http_Path{Path: params.Path},
})
matches = append(matches, &match)
}
extract := &pb.TapByResourceRequest_Extract{}
if params.Extract {
extract = buildExtractHTTP(&pb.TapByResourceRequest_Extract_Http{
Extract: &pb.TapByResourceRequest_Extract_Http_Headers_{
Headers: &pb.TapByResourceRequest_Extract_Http_Headers{},
},
})
}
return &pb.TapByResourceRequest{
Target: &pb.ResourceSelection{
Resource: target,
LabelSelector: params.LabelSelector,
},
MaxRps: params.MaxRps,
Match: &pb.TapByResourceRequest_Match{
Match: &pb.TapByResourceRequest_Match_All{
All: &pb.TapByResourceRequest_Match_Seq{
Matches: matches,
},
},
},
Extract: extract,
}, nil
}
func buildMatchHTTP(match *pb.TapByResourceRequest_Match_Http) pb.TapByResourceRequest_Match {
return pb.TapByResourceRequest_Match{
Match: &pb.TapByResourceRequest_Match_Http_{
Http: match,
},
}
}
func buildExtractHTTP(extract *pb.TapByResourceRequest_Extract_Http) *pb.TapByResourceRequest_Extract {
return &pb.TapByResourceRequest_Extract{
Extract: &pb.TapByResourceRequest_Extract_Http_{
Http: extract,
},
}
}
func contains(list []string, s string) bool {
for _, elem := range list {
if s == elem {
return true
}
}
return false
}
// CreateTapEvent generates tap events for use in tests
func CreateTapEvent(eventHTTP *pb.TapEvent_Http, dstMeta map[string]string, proxyDirection pb.TapEvent_ProxyDirection) *pb.TapEvent {
event := &pb.TapEvent{
ProxyDirection: proxyDirection,
Source: &netPb.TcpAddress{
Ip: &netPb.IPAddress{
Ip: &netPb.IPAddress_Ipv4{
Ipv4: uint32(1),
},
},
},
Destination: &netPb.TcpAddress{
Ip: &netPb.IPAddress{
Ip: &netPb.IPAddress_Ipv6{
Ipv6: &netPb.IPv6{
// All nodes address: https://www.iana.org/assignments/ipv6-multicast-addresses/ipv6-multicast-addresses.xhtml
First: binary.BigEndian.Uint64([]byte{0xff, 0x01, 0, 0, 0, 0, 0, 0}),
Last: binary.BigEndian.Uint64([]byte{0, 0, 0, 0, 0, 0, 0, 0x01}),
},
},
},
},
Event: &pb.TapEvent_Http_{
Http: eventHTTP,
},
DestinationMeta: &pb.TapEvent_EndpointMeta{
Labels: dstMeta,
},
}
return event
}

View File

@ -10,8 +10,6 @@ import (
proxyinjector "github.com/linkerd/linkerd2/controller/cmd/proxy-injector"
publicapi "github.com/linkerd/linkerd2/controller/cmd/public-api"
spvalidator "github.com/linkerd/linkerd2/controller/cmd/sp-validator"
"github.com/linkerd/linkerd2/controller/cmd/tap"
tapinjector "github.com/linkerd/linkerd2/controller/cmd/tap-injector"
servicemirror "github.com/linkerd/linkerd2/multicluster/cmd/service-mirror"
)
@ -34,10 +32,6 @@ func main() {
publicapi.Main(os.Args[2:])
case "sp-validator":
spvalidator.Main(os.Args[2:])
case "tap":
tap.Main(os.Args[2:])
case "tap-injector":
tapinjector.Main(os.Args[2:])
case "service-mirror":
servicemirror.Main(os.Args[2:])
default:

View File

@ -37,7 +37,6 @@ func confNsDisabled() *inject.ResourceConfig {
func TestGetPatch(t *testing.T) {
values.GetGlobal().Proxy.DisableIdentity = true
values.GetGlobal().Proxy.DisableTap = true
factory := fake.NewFactory(filepath.Join("fake", "data"))
nsEnabled, err := factory.Namespace("namespace-inject-enabled.yaml")

View File

@ -1,6 +0,0 @@
package inject
const (
// TapSvcEnvKey is the environment key that the proxy reads to know the tap svc name
TapSvcEnvKey = "LINKERD2_PROXY_TAP_SVC_NAME"
)

View File

@ -11,7 +11,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/linkerd/linkerd2/pkg/k8s"
pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
metricsPb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc/status"
kerrors "k8s.io/apimachinery/pkg/api/errors"
@ -32,7 +32,7 @@ type HTTPError struct {
}
// FlushableResponseWriter wraps a ResponseWriter for use in streaming
// responses, such as Tap.
// responses
type FlushableResponseWriter interface {
http.ResponseWriter
http.Flusher
@ -81,7 +81,7 @@ func WriteErrorToHTTPResponse(w http.ResponseWriter, errorObtained error) {
errorMessageToReturn = grpcError.Message()
}
errorAsProto := &pb.ApiError{Error: errorMessageToReturn}
errorAsProto := &metricsPb.ApiError{Error: errorMessageToReturn}
err := WriteProtoToHTTPResponse(w, errorAsProto)
if err != nil {
@ -154,7 +154,7 @@ func CheckIfResponseHasError(rsp *http.Response) error {
errorMsg := rsp.Header.Get(errorHeader)
if errorMsg != "" {
reader := bufio.NewReader(rsp.Body)
var apiError pb.ApiError
var apiError metricsPb.ApiError
err := FromByteStreamToProtocolBuffers(reader, &apiError)
if err != nil {
@ -200,25 +200,3 @@ func FromByteStreamToProtocolBuffers(byteStreamContainingMessage *bufio.Reader,
return nil
}
// TapReqToURL converts a TapByResourceRequest protobuf object to a URL for use
// with the Kubernetes tap.linkerd.io APIService.
// TODO: Move this, probably into its own package, when /controller/gen/public
// moves into /pkg.
func TapReqToURL(req *pb.TapByResourceRequest) string {
res := req.GetTarget().GetResource()
// non-namespaced
if res.GetType() == k8s.Namespace {
return fmt.Sprintf(
"/apis/tap.linkerd.io/v1alpha1/watch/namespaces/%s/tap",
res.GetName(),
)
}
// namespaced
return fmt.Sprintf(
"/apis/tap.linkerd.io/v1alpha1/watch/namespaces/%s/%s/%s/tap",
res.GetNamespace(), res.GetType()+"s", res.GetName(),
)
}

View File

@ -15,7 +15,7 @@ import (
"github.com/golang/protobuf/proto"
publicPb "github.com/linkerd/linkerd2/controller/gen/public"
pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
metricsPb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
kerrors "k8s.io/apimachinery/pkg/api/errors"
@ -61,10 +61,10 @@ func TestHttpRequestToProto(t *testing.T) {
someMethod := http.MethodPost
t.Run("Given a valid request, serializes its contents into protobuf object", func(t *testing.T) {
expectedProtoMessage := pb.Pod{
expectedProtoMessage := metricsPb.Pod{
Name: "some-name",
PodIP: "some-name",
Owner: &pb.Pod_Deployment{Deployment: "some-name"},
Owner: &metricsPb.Pod_Deployment{Deployment: "some-name"},
Status: "some-name",
Added: false,
ControllerNamespace: "some-name",
@ -80,7 +80,7 @@ func TestHttpRequestToProto(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}
var actualProtoMessage pb.Pod
var actualProtoMessage metricsPb.Pod
err = HTTPRequestToProto(req, &actualProtoMessage)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
@ -92,7 +92,7 @@ func TestHttpRequestToProto(t *testing.T) {
})
t.Run("Given a broken request, returns http error", func(t *testing.T) {
var actualProtoMessage pb.Pod
var actualProtoMessage metricsPb.Pod
req, err := http.NewRequest(someMethod, someURL, strings.NewReader("not really protobuf"))
if err != nil {
@ -136,8 +136,8 @@ func TestWriteErrorToHttpResponse(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}
expectedErrorPayload := pb.ApiError{Error: genericError.Error()}
var actualErrorPayload pb.ApiError
expectedErrorPayload := metricsPb.ApiError{Error: genericError.Error()}
var actualErrorPayload metricsPb.ApiError
err = proto.Unmarshal(payloadRead, &actualErrorPayload)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
@ -170,8 +170,8 @@ func TestWriteErrorToHttpResponse(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}
expectedErrorPayload := pb.ApiError{Error: httpError.WrappedError.Error()}
var actualErrorPayload pb.ApiError
expectedErrorPayload := metricsPb.ApiError{Error: httpError.WrappedError.Error()}
var actualErrorPayload metricsPb.ApiError
err = proto.Unmarshal(payloadRead, &actualErrorPayload)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
@ -203,8 +203,8 @@ func TestWriteErrorToHttpResponse(t *testing.T) {
t.Fatalf("Unexpected error: %v", err)
}
expectedErrorPayload := pb.ApiError{Error: expectedErrorMessage}
var actualErrorPayload pb.ApiError
expectedErrorPayload := metricsPb.ApiError{Error: expectedErrorMessage}
var actualErrorPayload metricsPb.ApiError
err = proto.Unmarshal(payloadRead, &actualErrorPayload)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
@ -432,7 +432,7 @@ func TestCheckIfResponseHasError(t *testing.T) {
t.Run("returns error in body if response contains linkerd-error header", func(t *testing.T) {
expectedErrorMessage := "expected error message"
protoInBytes, err := proto.Marshal(&pb.ApiError{Error: expectedErrorMessage})
protoInBytes, err := proto.Marshal(&metricsPb.ApiError{Error: expectedErrorMessage})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@ -524,51 +524,6 @@ func TestCheckIfResponseHasError(t *testing.T) {
})
}
func TestTapReqToURL(t *testing.T) {
expectations := []struct {
req *pb.TapByResourceRequest
url string
}{
{
req: &pb.TapByResourceRequest{},
url: "/apis/tap.linkerd.io/v1alpha1/watch/namespaces//s//tap",
},
{
req: &pb.TapByResourceRequest{
Target: &pb.ResourceSelection{
Resource: &pb.Resource{
Type: "namespace",
Name: "test-name",
},
},
},
url: "/apis/tap.linkerd.io/v1alpha1/watch/namespaces/test-name/tap",
},
{
req: &pb.TapByResourceRequest{
Target: &pb.ResourceSelection{
Resource: &pb.Resource{
Namespace: "test-ns",
Type: "test-type",
Name: "test-name",
},
},
},
url: "/apis/tap.linkerd.io/v1alpha1/watch/namespaces/test-ns/test-types/test-name/tap",
},
}
for i, exp := range expectations {
exp := exp // pin
t.Run(fmt.Sprintf("%d constructs the expected URL from a TapRequest", i), func(t *testing.T) {
url := TapReqToURL(exp.req)
if url != exp.url {
t.Fatalf("Unexpected url: %s, Expected: %s", url, exp.url)
}
})
}
}
func assertResponseHasProtobufContentType(t *testing.T, responseWriter *stubResponseWriter) {
actualContentType := responseWriter.headers.Get(contentTypeHeader)
expectedContentType := protobufContentType

View File

@ -71,7 +71,7 @@ Kubernetes: `>=1.13.0-0`
| Key | Type | Default | Description |
|-----|------|---------|-------------|
| clusterDomain | string | `"cluster.local"` | Kubernetes DNS Domain name to use |
| clusterDomain | string | `"cluster.local"` | Kubernetes DNS Domain name to use |
| createdByAnnotation | string | `"linkerd.io/created-by"` | |
| dashboard.UID | int | `2103` | |
| dashboard.enforcedHostRegexp | string | `""` | Host header validation regex for the dashboard. See the [Linkerd documentation](https://linkerd.io/2/tasks/exposing-dashboard) for more information |
@ -100,7 +100,7 @@ Kubernetes: `>=1.13.0-0`
| grafana.resources.cpu.request | string | `nil` | Amount of CPU units that the grafana container requests |
| grafana.resources.memory.limit | string | `nil` | Maximum amount of memory that grafana container can use |
| grafana.resources.memory.request | string | `nil` | Amount of memory that the grafana container requests |
| identityTrustDomain | string | `"cluster.local"` | Trust domain used for identity |
| identityTrustDomain | string | `"cluster.local"` | Trust domain used for identity |
| imagePullSecrets | list | `[]` | For Private docker registries, authentication is needed. Registry secrets are applied to the respective service accounts |
| installNamespace | bool | `true` | Set to false when installing in a custom namespace. |
| jaegerUrl | string | `""` | url of external jaeger instance Set this to `jaeger.linkerd-jaeger.svc.<clusterDomain>` if you plan to use jaeger extension |
@ -144,7 +144,7 @@ Kubernetes: `>=1.13.0-0`
| tap.caBundle | string | `""` | Bundle of CA certificates for Tap component. If not provided then Helm will use the certificate generated for `tap.crtPEM`. If `tap.externalSecret` is set to true, this value must be set, as no certificate will be generated. |
| tap.crtPEM | string | `""` | Certificate for the Tap component. If not provided then Helm will generate one. |
| tap.externalSecret | bool | `false` | Do not create a secret resource for the Tap component. If this is set to `true`, the value `tap.caBundle` must be set (see below). |
| tap.image.name | string | `"controller"` | Docker image name for the tap instance |
| tap.image.name | string | `"tap"` | Docker image name for the tap instance |
| tap.image.registry | string | `"ghcr.io/linkerd"` | Docker registry for the tap instance |
| tap.image.tag | string | `"linkerdVersionValue"` | Docker image tag for the tap instance |
| tap.keyPEM | string | `""` | Certificate key for Tap component. If not provided then Helm will generate one. |
@ -160,7 +160,7 @@ Kubernetes: `>=1.13.0-0`
| tapInjector.crtPEM | string | `""` | Certificate for the tapInjector. If not provided then Helm will generate one. |
| tapInjector.externalSecret | bool | `false` | Do not create a secret resource for the tapInjector webhook. If this is set to `true`, the value `tapInjector.caBundle` must be set (see below) |
| tapInjector.failurePolicy | string | `"Ignore"` | |
| tapInjector.image.name | string | `"controller"` | Docker image name for the tapInjector instance |
| tapInjector.image.name | string | `"tap"` | Docker image name for the tapInjector instance |
| tapInjector.image.registry | string | `"ghcr.io/linkerd"` | Docker registry for the tapInjector instance |
| tapInjector.image.tag | string | `"linkerdVersionValue"` | Docker image tag for the tapInjector instance |
| tapInjector.keyPEM | string | `""` | Certificate key for the tapInjector. If not provided then Helm will generate one. |
@ -172,7 +172,7 @@ Kubernetes: `>=1.13.0-0`
| tapInjector.resources.cpu.request | string | `nil` | Amount of CPU units that the tapInjector container requests |
| tapInjector.resources.memory.limit | string | `nil` | Maximum amount of memory that tapInjector container can use |
| tapInjector.resources.memory.request | string | `nil` | Amount of memory that the tapInjector container requests |
| tolerations | string | `nil` | Tolerations section, See the [K8S documentation](https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/) for more information |
| tolerations | string | `nil` | Tolerations section, See the [K8S documentation](https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/) for more information |
----------------------------------------------
Autogenerated from chart metadata using [helm-docs v1.4.0](https://github.com/norwoodj/helm-docs/releases/v1.4.0)

View File

@ -69,7 +69,7 @@ spec:
{{- end }}
containers:
- args:
- tap-injector
- injector
- -tap-service-name=linkerd-tap.{{.Values.namespace}}.serviceaccount.identity.$(_l5d_ns).$(_l5d_trustdomain)
image: {{.Values.tapInjector.image.registry}}/{{.Values.tapInjector.image.name}}:{{.Values.tapInjector.image.tag}}
imagePullPolicy: {{.Values.tapInjector.image.pullPolicy}}

View File

@ -79,8 +79,8 @@ spec:
{{- end }}
containers:
- args:
- tap
- -controller-namespace={{.Values.linkerdNamespace}}
- api
- -api-namespace={{.Values.linkerdNamespace}}
- -log-level={{.Values.tap.logLevel}}
- -identity-trust-domain={{.Values.identityTrustDomain}}
image: {{.Values.tap.image.registry}}/{{.Values.tap.image.name}}:{{.Values.tap.image.tag}}

View File

@ -6,9 +6,9 @@
# -- control plane version. See Proxy section for proxy version
linkerdVersion: &linkerd_version linkerdVersionValue
# -- Kubernetes DNS Domain name to use
# -- Kubernetes DNS Domain name to use
clusterDomain: &cluster_domain cluster.local
# -- Trust domain used for identity
# -- Trust domain used for identity
identityTrustDomain: *cluster_domain
# Annotation labels. Do not edit.
createdByAnnotation: linkerd.io/created-by
@ -39,7 +39,7 @@ nodeSelector:
imagePullSecrets: []
# - name: my-private-docker-registry-login-secret
# -- Tolerations section, See the
# -- Tolerations section, See the
# [K8S documentation](https://kubernetes.io/docs/concepts/scheduling-eviction/taint-and-toleration/)
# for more information
tolerations:
@ -101,7 +101,7 @@ tap:
# -- Docker registry for the tap instance
registry: *registry
# -- Docker image name for the tap instance
name: controller
name: tap
# -- Docker image tag for the tap instance
tag: *linkerd_version
# -- Do not create a secret resource for the Tap component. If this is set to
@ -149,7 +149,7 @@ tapInjector:
# -- Docker registry for the tapInjector instance
registry: *registry
# -- Docker image name for the tapInjector instance
name: controller
name: tap
# -- Docker image tag for the tapInjector instance
tag: *linkerd_version
namespaceSelector:

View File

@ -16,6 +16,7 @@ import (
"github.com/linkerd/linkerd2/pkg/healthcheck"
pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
"github.com/linkerd/linkerd2/viz/metrics-api/util"
"github.com/linkerd/linkerd2/viz/pkg"
"github.com/linkerd/linkerd2/viz/pkg/api"
"github.com/spf13/cobra"
)
@ -83,7 +84,7 @@ func NewCmdEdges() *cobra.Command {
# Get all edges between pods in all namespaces.
linkerd viz edges po --all-namespaces`,
Args: cobra.ExactArgs(1),
ValidArgs: coreUtil.ValidTargets,
ValidArgs: pkg.ValidTargets,
RunE: func(cmd *cobra.Command, args []string) error {
if options.namespace == "" {
options.namespace = pkgcmd.GetDefaultNamespace(kubeconfigPath, kubeContext)

View File

@ -13,16 +13,15 @@ import (
"time"
"github.com/ghodss/yaml"
"github.com/linkerd/linkerd2/controller/api/util"
sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2"
pkgcmd "github.com/linkerd/linkerd2/pkg/cmd"
"github.com/linkerd/linkerd2/pkg/healthcheck"
"github.com/linkerd/linkerd2/pkg/k8s"
"github.com/linkerd/linkerd2/pkg/profiles"
"github.com/linkerd/linkerd2/pkg/protohttp"
"github.com/linkerd/linkerd2/pkg/tap"
pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
"github.com/linkerd/linkerd2/viz/pkg/api"
pb "github.com/linkerd/linkerd2/viz/tap/gen/tap"
"github.com/linkerd/linkerd2/viz/tap/pkg"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -118,12 +117,12 @@ func newCmdProfile() *cobra.Command {
// a service profile with routes pre-populated from the tap data
// Only inbound tap traffic is considered.
func renderTapOutputProfile(ctx context.Context, k8sAPI *k8s.KubernetesAPI, tapResource, namespace, name, clusterDomain string, tapDuration time.Duration, routeLimit int, w io.Writer) error {
requestParams := util.TapRequestParams{
requestParams := pkg.TapRequestParams{
Resource: tapResource,
Namespace: namespace,
}
log.Debugf("Running `linkerd tap %s --namespace %s`", tapResource, namespace)
req, err := util.BuildTapByResourceRequest(requestParams)
req, err := pkg.BuildTapByResourceRequest(requestParams)
if err != nil {
return err
}
@ -149,7 +148,7 @@ func tapToServiceProfile(ctx context.Context, k8sAPI *k8s.KubernetesAPI, tapReq
}
ctxWithTime, cancel := context.WithTimeout(ctx, tapDuration)
defer cancel()
reader, body, err := tap.Reader(ctxWithTime, k8sAPI, tapReq)
reader, body, err := pkg.Reader(ctxWithTime, k8sAPI, tapReq)
if err != nil {
return profile, err
}
@ -172,7 +171,7 @@ func routeSpecFromTap(tapByteStream *bufio.Reader, routeLimit int) []*sp.RouteSp
if err != io.EOF &&
!(errors.As(err, &e) && e.Timeout()) &&
!errors.Is(err, context.DeadlineExceeded) &&
!strings.HasSuffix(err.Error(), tap.ErrClosedResponseBody) {
!strings.HasSuffix(err.Error(), pkg.ErrClosedResponseBody) {
fmt.Fprintln(os.Stderr, err)
}
break

View File

@ -7,12 +7,13 @@ import (
"testing"
"time"
"github.com/linkerd/linkerd2/controller/api/util"
sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2"
"github.com/linkerd/linkerd2/pkg/k8s"
"github.com/linkerd/linkerd2/pkg/profiles"
"github.com/linkerd/linkerd2/pkg/protohttp"
pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
metricsPb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
tapPb "github.com/linkerd/linkerd2/viz/tap/gen/tap"
"github.com/linkerd/linkerd2/viz/tap/pkg"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
@ -23,58 +24,58 @@ func TestTapToServiceProfile(t *testing.T) {
tapDuration := 5 * time.Second
routeLimit := 20
params := util.TapRequestParams{
params := pkg.TapRequestParams{
Resource: "deploy/" + name,
Namespace: namespace,
}
tapReq, err := util.BuildTapByResourceRequest(params)
tapReq, err := pkg.BuildTapByResourceRequest(params)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
event1 := util.CreateTapEvent(
&pb.TapEvent_Http{
Event: &pb.TapEvent_Http_RequestInit_{
event1 := pkg.CreateTapEvent(
&tapPb.TapEvent_Http{
Event: &tapPb.TapEvent_Http_RequestInit_{
RequestInit: &pb.TapEvent_Http_RequestInit{
Id: &pb.TapEvent_Http_StreamId{
RequestInit: &tapPb.TapEvent_Http_RequestInit{
Id: &tapPb.TapEvent_Http_StreamId{
Base: 1,
},
Authority: "",
Path: "/emojivoto.v1.VotingService/VoteFire",
Method: &pb.HttpMethod{
Type: &pb.HttpMethod_Registered_{
Registered: pb.HttpMethod_POST,
Method: &metricsPb.HttpMethod{
Type: &metricsPb.HttpMethod_Registered_{
Registered: metricsPb.HttpMethod_POST,
},
},
},
},
},
map[string]string{},
pb.TapEvent_INBOUND,
tapPb.TapEvent_INBOUND,
)
event2 := util.CreateTapEvent(
&pb.TapEvent_Http{
Event: &pb.TapEvent_Http_RequestInit_{
event2 := pkg.CreateTapEvent(
&tapPb.TapEvent_Http{
Event: &tapPb.TapEvent_Http_RequestInit_{
RequestInit: &pb.TapEvent_Http_RequestInit{
Id: &pb.TapEvent_Http_StreamId{
RequestInit: &tapPb.TapEvent_Http_RequestInit{
Id: &tapPb.TapEvent_Http_StreamId{
Base: 2,
},
Authority: "",
Path: "/my/path/hi",
Method: &pb.HttpMethod{
Type: &pb.HttpMethod_Registered_{
Registered: pb.HttpMethod_GET,
Method: &metricsPb.HttpMethod{
Type: &metricsPb.HttpMethod_Registered_{
Registered: metricsPb.HttpMethod_GET,
},
},
},
},
},
map[string]string{},
pb.TapEvent_INBOUND,
tapPb.TapEvent_INBOUND,
)
kubeAPI, err := k8s.NewFakeAPI()
@ -83,7 +84,7 @@ func TestTapToServiceProfile(t *testing.T) {
}
ts := httptest.NewServer(http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
for _, event := range []*pb.TapEvent{event1, event2} {
for _, event := range []*tapPb.TapEvent{event1, event2} {
event := event // pin
err = protohttp.WriteProtoToHTTPResponse(w, event)
if err != nil {

View File

@ -17,6 +17,7 @@ import (
"github.com/linkerd/linkerd2/pkg/k8s"
pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
"github.com/linkerd/linkerd2/viz/metrics-api/util"
"github.com/linkerd/linkerd2/viz/pkg"
"github.com/linkerd/linkerd2/viz/pkg/api"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
@ -63,7 +64,7 @@ This command will only display traffic which is sent to a service that has a Ser
# Routes for calls from the traffic deployment to the webapp service in the test namespace.
linkerd viz routes deploy/traffic -n test --to svc/webapp`,
Args: cobra.ExactArgs(1),
ValidArgs: coreUtil.ValidTargets,
ValidArgs: pkg.ValidTargets,
RunE: func(cmd *cobra.Command, args []string) error {
if options.namespace == "" {
options.namespace = pkgcmd.GetDefaultNamespace(kubeconfigPath, kubeContext)

View File

@ -18,6 +18,7 @@ import (
"github.com/linkerd/linkerd2/pkg/k8s"
pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
"github.com/linkerd/linkerd2/viz/metrics-api/util"
"github.com/linkerd/linkerd2/viz/pkg"
"github.com/linkerd/linkerd2/viz/pkg/api"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
@ -167,7 +168,7 @@ If no resource name is specified, displays stats about all resources of the spec
# Get all inbound stats to the test namespace.
linkerd viz stat ns/test`,
Args: cobra.MinimumNArgs(1),
ValidArgs: coreUtil.ValidTargets,
ValidArgs: pkg.ValidTargets,
RunE: func(cmd *cobra.Command, args []string) error {
if options.namespace == "" {
options.namespace = pkgcmd.GetDefaultNamespace(kubeconfigPath, kubeContext)

View File

@ -10,22 +10,23 @@ import (
"strings"
"github.com/golang/protobuf/ptypes/duration"
"github.com/linkerd/linkerd2/controller/api/util"
netPb "github.com/linkerd/linkerd2/controller/gen/common/net"
"github.com/linkerd/linkerd2/pkg/addr"
pkgcmd "github.com/linkerd/linkerd2/pkg/cmd"
"github.com/linkerd/linkerd2/pkg/healthcheck"
"github.com/linkerd/linkerd2/pkg/k8s"
"github.com/linkerd/linkerd2/pkg/protohttp"
"github.com/linkerd/linkerd2/pkg/tap"
pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
metricsPb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
vizpkg "github.com/linkerd/linkerd2/viz/pkg"
"github.com/linkerd/linkerd2/viz/pkg/api"
tapPb "github.com/linkerd/linkerd2/viz/tap/gen/tap"
"github.com/linkerd/linkerd2/viz/tap/pkg"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"google.golang.org/grpc/codes"
)
type renderTapEventFunc func(*pb.TapEvent, string) string
type renderTapEventFunc func(*tapPb.TapEvent, string) string
type tapOptions struct {
namespace string
@ -173,7 +174,7 @@ func NewCmdTap() *cobra.Command {
# tap the test namespace, filter by request to prod namespace
linkerd viz tap ns/test --to ns/prod`,
Args: cobra.RangeArgs(1, 2),
ValidArgs: util.ValidTargets,
ValidArgs: vizpkg.ValidTargets,
RunE: func(cmd *cobra.Command, args []string) error {
if options.namespace == "" {
options.namespace = pkgcmd.GetDefaultNamespace(kubeconfigPath, kubeContext)
@ -188,7 +189,7 @@ func NewCmdTap() *cobra.Command {
APIAddr: apiAddr,
})
requestParams := util.TapRequestParams{
requestParams := pkg.TapRequestParams{
Resource: strings.Join(args, "/"),
Namespace: options.namespace,
ToResource: options.toResource,
@ -207,7 +208,7 @@ func NewCmdTap() *cobra.Command {
return fmt.Errorf("validation error when executing tap command: %v", err)
}
req, err := util.BuildTapByResourceRequest(requestParams)
req, err := pkg.BuildTapByResourceRequest(requestParams)
if err != nil {
return err
}
@ -245,8 +246,8 @@ func NewCmdTap() *cobra.Command {
return cmd
}
func requestTapByResourceFromAPI(ctx context.Context, w io.Writer, k8sAPI *k8s.KubernetesAPI, req *pb.TapByResourceRequest, options *tapOptions) error {
reader, body, err := tap.Reader(ctx, k8sAPI, req)
func requestTapByResourceFromAPI(ctx context.Context, w io.Writer, k8sAPI *k8s.KubernetesAPI, req *tapPb.TapByResourceRequest, options *tapOptions) error {
reader, body, err := pkg.Reader(ctx, k8sAPI, req)
if err != nil {
return err
}
@ -255,7 +256,7 @@ func requestTapByResourceFromAPI(ctx context.Context, w io.Writer, k8sAPI *k8s.K
return writeTapEventsToBuffer(w, reader, req, options)
}
func writeTapEventsToBuffer(w io.Writer, tapByteStream *bufio.Reader, req *pb.TapByResourceRequest, options *tapOptions) error {
func writeTapEventsToBuffer(w io.Writer, tapByteStream *bufio.Reader, req *tapPb.TapByResourceRequest, options *tapOptions) error {
var err error
switch options.output {
case "":
@ -276,7 +277,7 @@ func writeTapEventsToBuffer(w io.Writer, tapByteStream *bufio.Reader, req *pb.Ta
func renderTapEvents(tapByteStream *bufio.Reader, w io.Writer, render renderTapEventFunc, resource string) error {
for {
log.Debug("Waiting for data...")
event := pb.TapEvent{}
event := tapPb.TapEvent{}
err := protohttp.FromByteStreamToProtocolBuffers(tapByteStream, &event)
if err == io.EOF {
break
@ -295,17 +296,17 @@ func renderTapEvents(tapByteStream *bufio.Reader, w io.Writer, render renderTapE
}
// renderTapEvent renders a Public API TapEvent to a string.
func renderTapEvent(event *pb.TapEvent, resource string) string {
func renderTapEvent(event *tapPb.TapEvent, resource string) string {
dst := dst(event)
src := src(event)
proxy := "???"
tls := ""
switch event.GetProxyDirection() {
case pb.TapEvent_INBOUND:
case tapPb.TapEvent_INBOUND:
proxy = "in " // A space is added so it aligns with `out`.
tls = src.tlsStatus()
case pb.TapEvent_OUTBOUND:
case tapPb.TapEvent_OUTBOUND:
proxy = "out"
tls = dst.tlsStatus()
default:
@ -331,7 +332,7 @@ func renderTapEvent(event *pb.TapEvent, resource string) string {
}
switch ev := event.GetHttp().GetEvent().(type) {
case *pb.TapEvent_Http_RequestInit_:
case *tapPb.TapEvent_Http_RequestInit_:
return fmt.Sprintf("req id=%d:%d %s :method=%s :authority=%s :path=%s%s",
ev.RequestInit.GetId().GetBase(),
ev.RequestInit.GetId().GetStream(),
@ -342,7 +343,7 @@ func renderTapEvent(event *pb.TapEvent, resource string) string {
resources,
)
case *pb.TapEvent_Http_ResponseInit_:
case *tapPb.TapEvent_Http_ResponseInit_:
return fmt.Sprintf("rsp id=%d:%d %s :status=%d latency=%dµs%s",
ev.ResponseInit.GetId().GetBase(),
ev.ResponseInit.GetId().GetStream(),
@ -352,9 +353,9 @@ func renderTapEvent(event *pb.TapEvent, resource string) string {
resources,
)
case *pb.TapEvent_Http_ResponseEnd_:
case *tapPb.TapEvent_Http_ResponseEnd_:
switch eos := ev.ResponseEnd.GetEos().GetEnd().(type) {
case *pb.Eos_GrpcStatusCode:
case *metricsPb.Eos_GrpcStatusCode:
return fmt.Sprintf(
"end id=%d:%d %s grpc-status=%s duration=%dµs response-length=%dB%s",
ev.ResponseEnd.GetId().GetBase(),
@ -366,7 +367,7 @@ func renderTapEvent(event *pb.TapEvent, resource string) string {
resources,
)
case *pb.Eos_ResetErrorCode:
case *metricsPb.Eos_ResetErrorCode:
return fmt.Sprintf(
"end id=%d:%d %s reset-error=%+v duration=%dµs response-length=%dB%s",
ev.ResponseEnd.GetId().GetBase(),
@ -395,7 +396,7 @@ func renderTapEvent(event *pb.TapEvent, resource string) string {
}
// renderTapEventJSON renders a Public API TapEvent to a string in JSON format.
func renderTapEventJSON(event *pb.TapEvent, _ string) string {
func renderTapEventJSON(event *tapPb.TapEvent, _ string) string {
m := mapPublicToDisplayTapEvent(event)
e, err := json.MarshalIndent(m, "", " ")
if err != nil {
@ -405,7 +406,7 @@ func renderTapEventJSON(event *pb.TapEvent, _ string) string {
}
// Map public API `TapEvent`s to `displayTapEvent`s
func mapPublicToDisplayTapEvent(event *pb.TapEvent) *tapEvent {
func mapPublicToDisplayTapEvent(event *tapPb.TapEvent) *tapEvent {
// Map source endpoint
sip := addr.PublicIPToString(event.GetSource().GetIp())
src := &endpoint{
@ -434,7 +435,7 @@ func mapPublicToDisplayTapEvent(event *pb.TapEvent) *tapEvent {
}
// Attempt to map a `TapEvent_Http_RequestInit event to a `requestInitEvent`
func getRequestInitEvent(pubEv *pb.TapEvent_Http) *requestInitEvent {
func getRequestInitEvent(pubEv *tapPb.TapEvent_Http) *requestInitEvent {
reqI := pubEv.GetRequestInit()
if reqI == nil {
return nil
@ -453,28 +454,28 @@ func getRequestInitEvent(pubEv *pb.TapEvent_Http) *requestInitEvent {
}
}
func formatMethod(m *pb.HttpMethod) string {
if x, ok := m.GetType().(*pb.HttpMethod_Registered_); ok {
func formatMethod(m *metricsPb.HttpMethod) string {
if x, ok := m.GetType().(*metricsPb.HttpMethod_Registered_); ok {
return x.Registered.String()
}
if s, ok := m.GetType().(*pb.HttpMethod_Unregistered); ok {
if s, ok := m.GetType().(*metricsPb.HttpMethod_Unregistered); ok {
return s.Unregistered
}
return ""
}
func formatScheme(s *pb.Scheme) string {
if x, ok := s.GetType().(*pb.Scheme_Registered_); ok {
func formatScheme(s *metricsPb.Scheme) string {
if x, ok := s.GetType().(*metricsPb.Scheme_Registered_); ok {
return x.Registered.String()
}
if str, ok := s.GetType().(*pb.Scheme_Unregistered); ok {
if str, ok := s.GetType().(*metricsPb.Scheme_Unregistered); ok {
return str.Unregistered
}
return ""
}
// Attempt to map a `TapEvent_Http_ResponseInit` event to a `responseInitEvent`
func getResponseInitEvent(pubEv *pb.TapEvent_Http) *responseInitEvent {
func getResponseInitEvent(pubEv *tapPb.TapEvent_Http) *responseInitEvent {
resI := pubEv.GetResponseInit()
if resI == nil {
return nil
@ -492,7 +493,7 @@ func getResponseInitEvent(pubEv *pb.TapEvent_Http) *responseInitEvent {
}
// Attempt to map a `TapEvent_Http_ResponseEnd` event to a `responseEndEvent`
func getResponseEndEvent(pubEv *pb.TapEvent_Http) *responseEndEvent {
func getResponseEndEvent(pubEv *tapPb.TapEvent_Http) *responseEndEvent {
resE := pubEv.GetResponseEnd()
if resE == nil {
return nil
@ -512,15 +513,15 @@ func getResponseEndEvent(pubEv *pb.TapEvent_Http) *responseEndEvent {
}
}
func formatHeadersTrailers(hs *pb.Headers) []metadata {
func formatHeadersTrailers(hs *metricsPb.Headers) []metadata {
var fm []metadata
for _, h := range hs.GetHeaders() {
switch h.GetValue().(type) {
case *pb.Headers_Header_ValueStr:
case *metricsPb.Headers_Header_ValueStr:
fht := &metadataStr{Name: h.GetName(), ValueStr: h.GetValueStr()}
fm = append(fm, fht)
continue
case *pb.Headers_Header_ValueBin:
case *metricsPb.Headers_Header_ValueBin:
fht := &metadataBin{Name: h.GetName(), ValueBin: h.GetValueBin()}
fm = append(fm, fht)
continue
@ -530,7 +531,7 @@ func formatHeadersTrailers(hs *pb.Headers) []metadata {
}
// src returns the source peer of a `TapEvent`.
func src(event *pb.TapEvent) peer {
func src(event *tapPb.TapEvent) peer {
return peer{
address: event.GetSource(),
labels: event.GetSourceMeta().GetLabels(),
@ -539,7 +540,7 @@ func src(event *pb.TapEvent) peer {
}
// dst returns the destination peer of a `TapEvent`.
func dst(event *pb.TapEvent) peer {
func dst(event *tapPb.TapEvent) peer {
return peer{
address: event.GetDestination(),
labels: event.GetDestinationMeta().GetLabels(),
@ -596,7 +597,7 @@ func (p *peer) tlsStatus() string {
return p.labels["tls"]
}
func routeLabels(event *pb.TapEvent) string {
func routeLabels(event *tapPb.TapEvent) string {
out := ""
for key, val := range event.GetRouteMeta().GetLabels() {
out = fmt.Sprintf("%s rt_%s=%s", out, key, val)

View File

@ -9,12 +9,13 @@ import (
"testing"
"github.com/golang/protobuf/ptypes/duration"
"github.com/linkerd/linkerd2/controller/api/util"
netPb "github.com/linkerd/linkerd2/controller/gen/common/net"
"github.com/linkerd/linkerd2/pkg/addr"
"github.com/linkerd/linkerd2/pkg/k8s"
"github.com/linkerd/linkerd2/pkg/protohttp"
pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
metricsPb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
tapPb "github.com/linkerd/linkerd2/viz/tap/gen/tap"
"github.com/linkerd/linkerd2/viz/tap/pkg"
"google.golang.org/grpc/codes"
)
@ -22,7 +23,7 @@ const targetName = "pod-666"
func busyTest(t *testing.T, output string) {
resourceType := k8s.Pod
params := util.TapRequestParams{
params := pkg.TapRequestParams{
Resource: resourceType + "/" + targetName,
Scheme: "https",
Method: "GET",
@ -30,41 +31,41 @@ func busyTest(t *testing.T, output string) {
Path: "/some/path",
}
req, err := util.BuildTapByResourceRequest(params)
req, err := pkg.BuildTapByResourceRequest(params)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
event1 := util.CreateTapEvent(
&pb.TapEvent_Http{
Event: &pb.TapEvent_Http_RequestInit_{
RequestInit: &pb.TapEvent_Http_RequestInit{
Id: &pb.TapEvent_Http_StreamId{
event1 := pkg.CreateTapEvent(
&tapPb.TapEvent_Http{
Event: &tapPb.TapEvent_Http_RequestInit_{
RequestInit: &tapPb.TapEvent_Http_RequestInit{
Id: &tapPb.TapEvent_Http_StreamId{
Base: 1,
},
Method: &pb.HttpMethod{
Type: &pb.HttpMethod_Registered_{
Registered: pb.HttpMethod_GET,
Method: &metricsPb.HttpMethod{
Type: &metricsPb.HttpMethod_Registered_{
Registered: metricsPb.HttpMethod_GET,
},
},
Scheme: &pb.Scheme{
Type: &pb.Scheme_Registered_{
Registered: pb.Scheme_HTTPS,
Scheme: &metricsPb.Scheme{
Type: &metricsPb.Scheme_Registered_{
Registered: metricsPb.Scheme_HTTPS,
},
},
Authority: params.Authority,
Path: params.Path,
Headers: &pb.Headers{
Headers: []*pb.Headers_Header{
Headers: &metricsPb.Headers{
Headers: []*metricsPb.Headers_Header{
{
Name: "header-name-1",
Value: &pb.Headers_Header_ValueStr{
Value: &metricsPb.Headers_Header_ValueStr{
ValueStr: "header-value-str-1",
},
},
{
Name: "header-name-2",
Value: &pb.Headers_Header_ValueBin{
Value: &metricsPb.Headers_Header_ValueBin{
ValueBin: []byte("header-value-bin-2"),
},
},
@ -77,17 +78,17 @@ func busyTest(t *testing.T, output string) {
"pod": "my-pod",
"tls": "true",
},
pb.TapEvent_OUTBOUND,
tapPb.TapEvent_OUTBOUND,
)
event2 := util.CreateTapEvent(
&pb.TapEvent_Http{
Event: &pb.TapEvent_Http_ResponseEnd_{
ResponseEnd: &pb.TapEvent_Http_ResponseEnd{
Id: &pb.TapEvent_Http_StreamId{
event2 := pkg.CreateTapEvent(
&tapPb.TapEvent_Http{
Event: &tapPb.TapEvent_Http_ResponseEnd_{
ResponseEnd: &tapPb.TapEvent_Http_ResponseEnd{
Id: &tapPb.TapEvent_Http_StreamId{
Base: 1,
},
Eos: &pb.Eos{
End: &pb.Eos_GrpcStatusCode{GrpcStatusCode: 666},
Eos: &metricsPb.Eos{
End: &metricsPb.Eos_GrpcStatusCode{GrpcStatusCode: 666},
},
SinceRequestInit: &duration.Duration{
Seconds: 10,
@ -96,11 +97,11 @@ func busyTest(t *testing.T, output string) {
Seconds: 100,
},
ResponseBytes: 1337,
Trailers: &pb.Headers{
Headers: []*pb.Headers_Header{
Trailers: &metricsPb.Headers{
Headers: []*metricsPb.Headers_Header{
{
Name: "trailer-name",
Value: &pb.Headers_Header_ValueBin{
Value: &metricsPb.Headers_Header_ValueBin{
ValueBin: []byte("header-value-bin"),
},
},
@ -110,7 +111,7 @@ func busyTest(t *testing.T, output string) {
},
},
map[string]string{},
pb.TapEvent_OUTBOUND,
tapPb.TapEvent_OUTBOUND,
)
kubeAPI, err := k8s.NewFakeAPI()
if err != nil {
@ -118,7 +119,7 @@ func busyTest(t *testing.T, output string) {
}
ts := httptest.NewServer(http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
for _, event := range []*pb.TapEvent{event1, event2} {
for _, event := range []*tapPb.TapEvent{event1, event2} {
err = protohttp.WriteProtoToHTTPResponse(w, event)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
@ -176,7 +177,7 @@ func TestRequestTapByResourceFromAPI(t *testing.T) {
t.Run("Should render empty response if no events returned", func(t *testing.T) {
resourceType := k8s.Pod
params := util.TapRequestParams{
params := pkg.TapRequestParams{
Resource: resourceType + "/" + targetName,
Scheme: "https",
Method: "GET",
@ -184,7 +185,7 @@ func TestRequestTapByResourceFromAPI(t *testing.T) {
Path: "/some/path",
}
req, err := util.BuildTapByResourceRequest(params)
req, err := pkg.BuildTapByResourceRequest(params)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@ -220,7 +221,7 @@ func TestRequestTapByResourceFromAPI(t *testing.T) {
t.Run("Should return error if stream returned error", func(t *testing.T) {
t.SkipNow()
resourceType := k8s.Pod
params := util.TapRequestParams{
params := pkg.TapRequestParams{
Resource: resourceType + "/" + targetName,
Scheme: "https",
Method: "GET",
@ -228,7 +229,7 @@ func TestRequestTapByResourceFromAPI(t *testing.T) {
Path: "/some/path",
}
req, err := util.BuildTapByResourceRequest(params)
req, err := pkg.BuildTapByResourceRequest(params)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@ -248,23 +249,23 @@ func TestRequestTapByResourceFromAPI(t *testing.T) {
}
func TestEventToString(t *testing.T) {
toTapEvent := func(httpEvent *pb.TapEvent_Http) *pb.TapEvent {
streamID := &pb.TapEvent_Http_StreamId{
toTapEvent := func(httpEvent *tapPb.TapEvent_Http) *tapPb.TapEvent {
streamID := &tapPb.TapEvent_Http_StreamId{
Base: 7,
Stream: 8,
}
switch httpEvent.Event.(type) {
case *pb.TapEvent_Http_RequestInit_:
case *tapPb.TapEvent_Http_RequestInit_:
httpEvent.GetRequestInit().Id = streamID
case *pb.TapEvent_Http_ResponseInit_:
case *tapPb.TapEvent_Http_ResponseInit_:
httpEvent.GetResponseInit().Id = streamID
case *pb.TapEvent_Http_ResponseEnd_:
case *tapPb.TapEvent_Http_ResponseEnd_:
httpEvent.GetResponseEnd().Id = streamID
}
return &pb.TapEvent{
ProxyDirection: pb.TapEvent_OUTBOUND,
return &tapPb.TapEvent{
ProxyDirection: tapPb.TapEvent_OUTBOUND,
Source: &netPb.TcpAddress{
Ip: addr.PublicIPV4(1, 2, 3, 4),
Port: 5555,
@ -273,22 +274,22 @@ func TestEventToString(t *testing.T) {
Ip: addr.PublicIPV4(2, 3, 4, 5),
Port: 6666,
},
Event: &pb.TapEvent_Http_{Http: httpEvent},
Event: &tapPb.TapEvent_Http_{Http: httpEvent},
}
}
t.Run("Converts HTTP request init event to string", func(t *testing.T) {
event := toTapEvent(&pb.TapEvent_Http{
Event: &pb.TapEvent_Http_RequestInit_{
RequestInit: &pb.TapEvent_Http_RequestInit{
Method: &pb.HttpMethod{
Type: &pb.HttpMethod_Registered_{
Registered: pb.HttpMethod_POST,
event := toTapEvent(&tapPb.TapEvent_Http{
Event: &tapPb.TapEvent_Http_RequestInit_{
RequestInit: &tapPb.TapEvent_Http_RequestInit{
Method: &metricsPb.HttpMethod{
Type: &metricsPb.HttpMethod_Registered_{
Registered: metricsPb.HttpMethod_POST,
},
},
Scheme: &pb.Scheme{
Type: &pb.Scheme_Registered_{
Registered: pb.Scheme_HTTPS,
Scheme: &metricsPb.Scheme{
Type: &metricsPb.Scheme_Registered_{
Registered: metricsPb.Scheme_HTTPS,
},
},
Authority: "hello.default:7777",
@ -305,9 +306,9 @@ func TestEventToString(t *testing.T) {
})
t.Run("Converts HTTP response init event to string", func(t *testing.T) {
event := toTapEvent(&pb.TapEvent_Http{
Event: &pb.TapEvent_Http_ResponseInit_{
ResponseInit: &pb.TapEvent_Http_ResponseInit{
event := toTapEvent(&tapPb.TapEvent_Http{
Event: &tapPb.TapEvent_Http_ResponseInit_{
ResponseInit: &tapPb.TapEvent_Http_ResponseInit{
SinceRequestInit: &duration.Duration{Nanos: 999000},
HttpStatus: http.StatusOK,
},
@ -322,14 +323,14 @@ func TestEventToString(t *testing.T) {
})
t.Run("Converts gRPC response end event to string", func(t *testing.T) {
event := toTapEvent(&pb.TapEvent_Http{
Event: &pb.TapEvent_Http_ResponseEnd_{
ResponseEnd: &pb.TapEvent_Http_ResponseEnd{
event := toTapEvent(&tapPb.TapEvent_Http{
Event: &tapPb.TapEvent_Http_ResponseEnd_{
ResponseEnd: &tapPb.TapEvent_Http_ResponseEnd{
SinceRequestInit: &duration.Duration{Nanos: 999000},
SinceResponseInit: &duration.Duration{Nanos: 888000},
ResponseBytes: 111,
Eos: &pb.Eos{
End: &pb.Eos_GrpcStatusCode{GrpcStatusCode: uint32(codes.OK)},
Eos: &metricsPb.Eos{
End: &metricsPb.Eos_GrpcStatusCode{GrpcStatusCode: uint32(codes.OK)},
},
},
},
@ -343,14 +344,14 @@ func TestEventToString(t *testing.T) {
})
t.Run("Converts HTTP response end event with reset error code to string", func(t *testing.T) {
event := toTapEvent(&pb.TapEvent_Http{
Event: &pb.TapEvent_Http_ResponseEnd_{
ResponseEnd: &pb.TapEvent_Http_ResponseEnd{
event := toTapEvent(&tapPb.TapEvent_Http{
Event: &tapPb.TapEvent_Http_ResponseEnd_{
ResponseEnd: &tapPb.TapEvent_Http_ResponseEnd{
SinceRequestInit: &duration.Duration{Nanos: 999000},
SinceResponseInit: &duration.Duration{Nanos: 888000},
ResponseBytes: 111,
Eos: &pb.Eos{
End: &pb.Eos_ResetErrorCode{ResetErrorCode: 123},
Eos: &metricsPb.Eos{
End: &metricsPb.Eos_ResetErrorCode{ResetErrorCode: 123},
},
},
},
@ -364,13 +365,13 @@ func TestEventToString(t *testing.T) {
})
t.Run("Converts HTTP response end event with empty EOS context string", func(t *testing.T) {
event := toTapEvent(&pb.TapEvent_Http{
Event: &pb.TapEvent_Http_ResponseEnd_{
ResponseEnd: &pb.TapEvent_Http_ResponseEnd{
event := toTapEvent(&tapPb.TapEvent_Http{
Event: &tapPb.TapEvent_Http_ResponseEnd_{
ResponseEnd: &tapPb.TapEvent_Http_ResponseEnd{
SinceRequestInit: &duration.Duration{Nanos: 999000},
SinceResponseInit: &duration.Duration{Nanos: 888000},
ResponseBytes: 111,
Eos: &pb.Eos{},
Eos: &metricsPb.Eos{},
},
},
})
@ -383,9 +384,9 @@ func TestEventToString(t *testing.T) {
})
t.Run("Converts HTTP response end event without EOS context string", func(t *testing.T) {
event := toTapEvent(&pb.TapEvent_Http{
Event: &pb.TapEvent_Http_ResponseEnd_{
ResponseEnd: &pb.TapEvent_Http_ResponseEnd{
event := toTapEvent(&tapPb.TapEvent_Http{
Event: &tapPb.TapEvent_Http_ResponseEnd_{
ResponseEnd: &tapPb.TapEvent_Http_ResponseEnd{
SinceRequestInit: &duration.Duration{Nanos: 999000},
SinceResponseInit: &duration.Duration{Nanos: 888000},
ResponseBytes: 111,
@ -401,7 +402,7 @@ func TestEventToString(t *testing.T) {
})
t.Run("Handles unknown event types", func(t *testing.T) {
event := toTapEvent(&pb.TapEvent_Http{})
event := toTapEvent(&tapPb.TapEvent_Http{})
expectedOutput := "unknown proxy=out src=1.2.3.4:5555 dst=2.3.4.5:6666 tls="
output := renderTapEvent(event, "")

View File

@ -931,11 +931,11 @@ spec:
beta.kubernetes.io/os: linux
containers:
- args:
- tap
- -controller-namespace=linkerd
- api
- -api-namespace=linkerd
- -log-level=info
- -identity-trust-domain=cluster.local
image: ghcr.io/linkerd/controller:dev-undefined
image: ghcr.io/linkerd/tap:dev-undefined
imagePullPolicy:
livenessProbe:
httpGet:
@ -1092,9 +1092,9 @@ spec:
beta.kubernetes.io/os: linux
containers:
- args:
- tap-injector
- injector
- -tap-service-name=linkerd-tap.linkerd-viz.serviceaccount.identity.$(_l5d_ns).$(_l5d_trustdomain)
image: ghcr.io/linkerd/controller:dev-undefined
image: ghcr.io/linkerd/tap:dev-undefined
imagePullPolicy:
livenessProbe:
httpGet:

View File

@ -640,11 +640,11 @@ spec:
beta.kubernetes.io/os: linux
containers:
- args:
- tap
- -controller-namespace=linkerd
- api
- -api-namespace=linkerd
- -log-level=info
- -identity-trust-domain=cluster.local
image: ghcr.io/linkerd/controller:dev-undefined
image: ghcr.io/linkerd/tap:dev-undefined
imagePullPolicy:
livenessProbe:
httpGet:
@ -801,9 +801,9 @@ spec:
beta.kubernetes.io/os: linux
containers:
- args:
- tap-injector
- injector
- -tap-service-name=linkerd-tap.linkerd-viz.serviceaccount.identity.$(_l5d_ns).$(_l5d_trustdomain)
image: ghcr.io/linkerd/controller:dev-undefined
image: ghcr.io/linkerd/tap:dev-undefined
imagePullPolicy:
livenessProbe:
httpGet:

View File

@ -943,11 +943,11 @@ spec:
beta.kubernetes.io/os: linux
containers:
- args:
- tap
- -controller-namespace=linkerd
- api
- -api-namespace=linkerd
- -log-level=info
- -identity-trust-domain=cluster.local
image: ghcr.io/linkerd/controller:dev-undefined
image: ghcr.io/linkerd/tap:dev-undefined
imagePullPolicy:
livenessProbe:
httpGet:
@ -1104,9 +1104,9 @@ spec:
beta.kubernetes.io/os: linux
containers:
- args:
- tap-injector
- injector
- -tap-service-name=linkerd-tap.linkerd-viz.serviceaccount.identity.$(_l5d_ns).$(_l5d_trustdomain)
image: ghcr.io/linkerd/controller:dev-undefined
image: ghcr.io/linkerd/tap:dev-undefined
imagePullPolicy:
livenessProbe:
httpGet:

View File

@ -11,16 +11,17 @@ import (
"time"
"github.com/golang/protobuf/ptypes"
"github.com/linkerd/linkerd2/controller/api/util"
"github.com/linkerd/linkerd2/pkg/addr"
pkgcmd "github.com/linkerd/linkerd2/pkg/cmd"
"github.com/linkerd/linkerd2/pkg/healthcheck"
"github.com/linkerd/linkerd2/pkg/k8s"
"github.com/linkerd/linkerd2/pkg/protohttp"
"github.com/linkerd/linkerd2/pkg/tap"
api "github.com/linkerd/linkerd2/viz/metrics-api"
pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
vizAPI "github.com/linkerd/linkerd2/viz/pkg/api"
metricsAPI "github.com/linkerd/linkerd2/viz/metrics-api"
metricsPb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
vizpkg "github.com/linkerd/linkerd2/viz/pkg"
"github.com/linkerd/linkerd2/viz/pkg/api"
tapPb "github.com/linkerd/linkerd2/viz/tap/gen/tap"
"github.com/linkerd/linkerd2/viz/tap/pkg"
runewidth "github.com/mattn/go-runewidth"
termbox "github.com/nsf/termbox-go"
log "github.com/sirupsen/logrus"
@ -43,10 +44,10 @@ type topOptions struct {
}
type topRequest struct {
event *pb.TapEvent
reqInit *pb.TapEvent_Http_RequestInit
rspInit *pb.TapEvent_Http_ResponseInit
rspEnd *pb.TapEvent_Http_ResponseEnd
event *tapPb.TapEvent
reqInit *tapPb.TapEvent_Http_RequestInit
rspInit *tapPb.TapEvent_Http_ResponseInit
rspEnd *tapPb.TapEvent_Http_ResponseEnd
}
type topRequestID struct {
@ -320,13 +321,13 @@ func NewCmdTop() *cobra.Command {
# display traffic for the web-dlbvj pod in the default namespace
linkerd viz top pod/web-dlbvj`,
Args: cobra.RangeArgs(1, 2),
ValidArgs: util.ValidTargets,
ValidArgs: vizpkg.ValidTargets,
RunE: func(cmd *cobra.Command, args []string) error {
if options.namespace == "" {
options.namespace = pkgcmd.GetDefaultNamespace(kubeconfigPath, kubeContext)
}
vizAPI.CheckClientOrExit(healthcheck.Options{
api.CheckClientOrExit(healthcheck.Options{
ControlPlaneNamespace: controlPlaneNamespace,
KubeConfig: kubeconfigPath,
Impersonate: impersonate,
@ -335,7 +336,7 @@ func NewCmdTop() *cobra.Command {
APIAddr: apiAddr,
})
requestParams := util.TapRequestParams{
requestParams := pkg.TapRequestParams{
Resource: strings.Join(args, "/"),
Namespace: options.namespace,
ToResource: options.toResource,
@ -362,7 +363,7 @@ func NewCmdTop() *cobra.Command {
table.columns[routeColumn].display = true
}
req, err := util.BuildTapByResourceRequest(requestParams)
req, err := pkg.BuildTapByResourceRequest(requestParams)
if err != nil {
return err
}
@ -399,8 +400,8 @@ func NewCmdTop() *cobra.Command {
return cmd
}
func getTrafficByResourceFromAPI(ctx context.Context, k8sAPI *k8s.KubernetesAPI, req *pb.TapByResourceRequest, table *topTable) error {
reader, body, err := tap.Reader(ctx, k8sAPI, req)
func getTrafficByResourceFromAPI(ctx context.Context, k8sAPI *k8s.KubernetesAPI, req *tapPb.TapByResourceRequest, table *topTable) error {
reader, body, err := pkg.Reader(ctx, k8sAPI, req)
if err != nil {
return err
}
@ -419,7 +420,7 @@ func getTrafficByResourceFromAPI(ctx context.Context, k8sAPI *k8s.KubernetesAPI,
// processEvents() ->
// requestCh ->
// renderTable()
eventCh := make(chan *pb.TapEvent)
eventCh := make(chan *tapPb.TapEvent)
requestCh := make(chan topRequest, 100)
// for closing:
@ -444,14 +445,14 @@ func getTrafficByResourceFromAPI(ctx context.Context, k8sAPI *k8s.KubernetesAPI,
return nil
}
func recvEvents(tapByteStream *bufio.Reader, eventCh chan<- *pb.TapEvent, closing chan<- struct{}) {
func recvEvents(tapByteStream *bufio.Reader, eventCh chan<- *tapPb.TapEvent, closing chan<- struct{}) {
for {
event := &pb.TapEvent{}
event := &tapPb.TapEvent{}
err := protohttp.FromByteStreamToProtocolBuffers(tapByteStream, event)
if err != nil {
if err == io.EOF {
fmt.Println("Tap stream terminated")
} else if !strings.HasSuffix(err.Error(), tap.ErrClosedResponseBody) {
} else if !strings.HasSuffix(err.Error(), pkg.ErrClosedResponseBody) {
fmt.Println(err.Error())
}
@ -463,7 +464,7 @@ func recvEvents(tapByteStream *bufio.Reader, eventCh chan<- *pb.TapEvent, closin
}
}
func processEvents(eventCh <-chan *pb.TapEvent, requestCh chan<- topRequest, done <-chan struct{}) {
func processEvents(eventCh <-chan *tapPb.TapEvent, requestCh chan<- topRequest, done <-chan struct{}) {
outstandingRequests := make(map[topRequestID]topRequest)
for {
@ -476,14 +477,14 @@ func processEvents(eventCh <-chan *pb.TapEvent, requestCh chan<- topRequest, don
dst: addr.PublicAddressToString(event.GetDestination()),
}
switch ev := event.GetHttp().GetEvent().(type) {
case *pb.TapEvent_Http_RequestInit_:
case *tapPb.TapEvent_Http_RequestInit_:
id.stream = ev.RequestInit.GetId().Stream
outstandingRequests[id] = topRequest{
event: event,
reqInit: ev.RequestInit,
}
case *pb.TapEvent_Http_ResponseInit_:
case *tapPb.TapEvent_Http_ResponseInit_:
id.stream = ev.ResponseInit.GetId().Stream
if req, ok := outstandingRequests[id]; ok {
req.rspInit = ev.ResponseInit
@ -492,7 +493,7 @@ func processEvents(eventCh <-chan *pb.TapEvent, requestCh chan<- topRequest, don
log.Warnf("Got ResponseInit for unknown stream: %s", id)
}
case *pb.TapEvent_Http_ResponseEnd_:
case *tapPb.TapEvent_Http_ResponseEnd_:
id.stream = ev.ResponseEnd.GetId().Stream
if req, ok := outstandingRequests[id]; ok {
req.rspEnd = ev.ResponseEnd
@ -555,7 +556,7 @@ func newRow(req topRequest) (tableRow, error) {
path := req.reqInit.GetPath()
route := req.event.GetRouteMeta().GetLabels()["route"]
if route == "" {
route = api.DefaultRouteName
route = metricsAPI.DefaultRouteName
}
method := req.reqInit.GetMethod().GetRegistered().String()
source := stripPort(addr.PublicAddressToString(req.event.GetSource()))
@ -576,7 +577,7 @@ func newRow(req topRequest) (tableRow, error) {
success := req.rspInit.GetHttpStatus() < 500
if success {
switch eos := req.rspEnd.GetEos().GetEnd().(type) {
case *pb.Eos_GrpcStatusCode:
case *metricsPb.Eos_GrpcStatusCode:
switch codes.Code(eos.GrpcStatusCode) {
case codes.Unknown,
codes.DeadlineExceeded,
@ -588,7 +589,7 @@ func newRow(req topRequest) (tableRow, error) {
success = true
}
case *pb.Eos_ResetErrorCode:
case *metricsPb.Eos_ResetErrorCode:
success = false
}
}

File diff suppressed because it is too large Load Diff

View File

@ -69,89 +69,6 @@ message Pod {
string resourceVersion = 17; // resource version in the Kubernetes API
}
message TapRequest {
option deprecated = true;
oneof target {
string pod = 1;
string deployment = 2;
}
// validation of these fields happens on the server
float maxRps = 3;
uint32 toPort = 4;
string toIP = 5;
uint32 fromPort = 6;
string fromIP = 7;
string scheme = 8;
string method = 9;
string authority = 10;
string path = 11;
}
// A tap request over kubernetes resources.
//
// This is used only by the tap APIServer.
message TapByResourceRequest {
// Describes the kubernetes pods that should be tapped.
ResourceSelection target = 1;
// Selects over events to be reported.
Match match = 2;
// Limits the number of events to be inspected.
float maxRps = 3;
message Match {
oneof match {
// If empty, matches all messages.
Seq all = 1;
// If empty, matches no messages.
Seq any = 2;
// Inverts the inner match.
Match not = 3;
// Matches events being sent to any of the selected destinations.
ResourceSelection destinations = 4;
// Matches HTTP requests by their metadata.
Http http = 5;
}
message Seq {
repeated Match matches = 1;
}
message Http {
oneof match {
string scheme = 1;
string method = 2;
string authority = 3;
string path = 4;
}
}
}
// Conditionally extracts components from requests and responses to include
// in tap events
Extract extract = 4;
message Extract {
oneof extract {
Http http = 1;
}
message Http {
oneof extract {
Headers headers = 1;
}
message Headers {}
}
}
}
message HttpMethod {
enum Registered {
GET = 0;
@ -206,81 +123,6 @@ message Eos {
}
}
// This is used only by the tap APIServer.
message TapEvent {
linkerd2.common.net.TcpAddress source = 1;
EndpointMeta source_meta = 5;
linkerd2.common.net.TcpAddress destination = 2;
EndpointMeta destination_meta = 4;
RouteMeta route_meta = 7;
ProxyDirection proxy_direction = 6;
enum ProxyDirection {
UNKNOWN = 0;
INBOUND = 1;
OUTBOUND = 2;
}
oneof event {
Http http = 3;
}
message EndpointMeta {
map<string, string> labels = 1;
}
message RouteMeta {
map<string, string> labels = 1;
}
message Http {
oneof event {
RequestInit request_init = 1;
ResponseInit response_init = 2;
ResponseEnd response_end = 3;
}
message StreamId {
// A randomized base (stable across a process's runtime)
uint32 base = 1;
// A stream id unique within the lifetime of `base`.
uint64 stream = 2;
}
message RequestInit {
StreamId id = 1;
HttpMethod method = 2;
Scheme scheme = 3;
string authority = 4;
string path = 5;
Headers headers = 6;
}
message ResponseInit {
StreamId id = 1;
google.protobuf.Duration since_request_init = 2;
uint32 http_status = 3;
Headers headers = 4;
}
message ResponseEnd {
StreamId id = 1;
google.protobuf.Duration since_request_init = 2;
google.protobuf.Duration since_response_init = 3;
uint64 response_bytes = 4;
Eos eos = 5;
Headers trailers = 6;
}
}
}
message ApiError {
string error = 1;
}
@ -525,8 +367,3 @@ service Api {
rpc SelfCheck(SelfCheckRequest) returns (SelfCheckResponse) {}
}
service Tap {
rpc Tap(TapRequest) returns (stream TapEvent) { option deprecated = true; }
rpc TapByResource(TapByResourceRequest) returns (stream TapEvent) { option deprecated = true; }
}

20
viz/pkg/util.go Normal file
View File

@ -0,0 +1,20 @@
package pkg
import "github.com/linkerd/linkerd2/pkg/k8s"
// ValidTargets specifies resource types allowed as a target:
// - target resource on an inbound query
// - target resource on an outbound 'to' query
// - destination resource on an outbound 'from' query
var ValidTargets = []string{
k8s.Authority,
k8s.CronJob,
k8s.DaemonSet,
k8s.Deployment,
k8s.Job,
k8s.Namespace,
k8s.Pod,
k8s.ReplicaSet,
k8s.ReplicationController,
k8s.StatefulSet,
}

33
viz/tap/Dockerfile Normal file
View File

@ -0,0 +1,33 @@
ARG BUILDPLATFORM=linux/amd64
# Precompile key slow-to-build dependencies
FROM --platform=$BUILDPLATFORM golang:1.14.2-alpine as go-deps
WORKDIR /linkerd-build
COPY go.mod go.sum ./
COPY bin/install-deps bin/
RUN go mod download
ARG TARGETARCH
RUN ./bin/install-deps $TARGETARCH
## compile tap
FROM go-deps as golang
WORKDIR /linkerd-build
COPY pkg pkg
# TODO: remove after https://github.com/linkerd/linkerd2/issues/5661
COPY controller controller
# TODO: remove when BuildResource is refactored
# https://github.com/linkerd/linkerd2/issues/5589
COPY viz/metrics-api/gen/viz viz/metrics-api/gen/viz
COPY viz/tap viz/tap
COPY viz/pkg viz/pkg
ARG TARGETARCH
RUN CGO_ENABLED=0 GOOS=linux GOARCH=$TARGETARCH go build -o /out/tap -tags prod -mod=readonly -ldflags "-s -w" ./viz/tap/cmd
## package runtime
FROM scratch
ENV PATH=$PATH:/go/bin
COPY LICENSE /linkerd/LICENSE
COPY --from=golang /out/tap /go/bin/tap
ENTRYPOINT ["/go/bin/tap"]

View File

@ -1,7 +1,7 @@
package tap
package api
import (
pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
pb "github.com/linkerd/linkerd2/viz/tap/gen/tap"
"google.golang.org/grpc"
)
@ -11,6 +11,5 @@ func NewClient(addr string) (pb.TapClient, *grpc.ClientConn, error) {
if err != nil {
return nil, nil, err
}
return pb.NewTapClient(conn), conn, nil
}

View File

@ -1,4 +1,4 @@
package tap
package api
import (
"context"
@ -19,8 +19,9 @@ import (
pkgK8s "github.com/linkerd/linkerd2/pkg/k8s"
"github.com/linkerd/linkerd2/pkg/prometheus"
"github.com/linkerd/linkerd2/pkg/util"
pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
metricsPb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
vizLabels "github.com/linkerd/linkerd2/viz/pkg/labels"
tapPb "github.com/linkerd/linkerd2/viz/tap/gen/tap"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
@ -47,12 +48,12 @@ var (
)
// Tap is deprecated, use TapByResource.
func (s *GRPCTapServer) Tap(req *pb.TapRequest, stream pb.Tap_TapServer) error {
func (s *GRPCTapServer) Tap(req *tapPb.TapRequest, stream tapPb.Tap_TapServer) error {
return status.Error(codes.Unimplemented, "Tap is deprecated, use TapByResource")
}
// TapByResource taps all resources matched by the request object.
func (s *GRPCTapServer) TapByResource(req *pb.TapByResourceRequest, stream pb.Tap_TapByResourceServer) error {
func (s *GRPCTapServer) TapByResource(req *tapPb.TapByResourceRequest, stream tapPb.Tap_TapByResourceServer) error {
if req == nil {
return status.Error(codes.InvalidArgument, "TapByResource received nil TapByResourceRequest")
}
@ -115,7 +116,7 @@ func (s *GRPCTapServer) TapByResource(req *pb.TapByResourceRequest, stream pb.Ta
log.Infof("Tapping %d pods for target: %s", len(pods), res.String())
events := make(chan *pb.TapEvent)
events := make(chan *tapPb.TapEvent)
// divide the rps evenly between all pods to tap
rpsPerPod := req.GetMaxRps() / float32(len(pods))
@ -168,7 +169,7 @@ func (s *GRPCTapServer) TapByResource(req *pb.TapByResourceRequest, stream pb.Ta
}
}
func makeByResourceMatch(match *pb.TapByResourceRequest_Match) (*proxy.ObserveRequest_Match, error) {
func makeByResourceMatch(match *tapPb.TapByResourceRequest_Match) (*proxy.ObserveRequest_Match, error) {
// TODO: for now assume it's always a single, flat `All` match list
seq := match.GetAll()
if seq == nil {
@ -179,7 +180,7 @@ func makeByResourceMatch(match *pb.TapByResourceRequest_Match) (*proxy.ObserveRe
for _, reqMatch := range seq.Matches {
switch typed := reqMatch.Match.(type) {
case *pb.TapByResourceRequest_Match_Destinations:
case *tapPb.TapByResourceRequest_Match_Destinations:
for k, v := range destinationLabels(typed.Destinations.Resource) {
matches = append(matches, &proxy.ObserveRequest_Match{
@ -192,24 +193,24 @@ func makeByResourceMatch(match *pb.TapByResourceRequest_Match) (*proxy.ObserveRe
})
}
case *pb.TapByResourceRequest_Match_Http_:
case *tapPb.TapByResourceRequest_Match_Http_:
httpMatch := proxy.ObserveRequest_Match_Http{}
switch httpTyped := typed.Http.Match.(type) {
case *pb.TapByResourceRequest_Match_Http_Scheme:
case *tapPb.TapByResourceRequest_Match_Http_Scheme:
httpMatch = proxy.ObserveRequest_Match_Http{
Match: &proxy.ObserveRequest_Match_Http_Scheme{
Scheme: util.ParseScheme(httpTyped.Scheme),
},
}
case *pb.TapByResourceRequest_Match_Http_Method:
case *tapPb.TapByResourceRequest_Match_Http_Method:
httpMatch = proxy.ObserveRequest_Match_Http{
Match: &proxy.ObserveRequest_Match_Http_Method{
Method: util.ParseMethod(httpTyped.Method),
},
}
case *pb.TapByResourceRequest_Match_Http_Authority:
case *tapPb.TapByResourceRequest_Match_Http_Authority:
httpMatch = proxy.ObserveRequest_Match_Http{
Match: &proxy.ObserveRequest_Match_Http_Authority{
Authority: &proxy.ObserveRequest_Match_Http_StringMatch{
@ -219,7 +220,7 @@ func makeByResourceMatch(match *pb.TapByResourceRequest_Match) (*proxy.ObserveRe
},
},
}
case *pb.TapByResourceRequest_Match_Http_Path:
case *tapPb.TapByResourceRequest_Match_Http_Path:
httpMatch = proxy.ObserveRequest_Match_Http{
Match: &proxy.ObserveRequest_Match_Http_Path{
Path: &proxy.ObserveRequest_Match_Http_StringMatch{
@ -254,7 +255,7 @@ func makeByResourceMatch(match *pb.TapByResourceRequest_Match) (*proxy.ObserveRe
}
// TODO: factor out with `promLabels` in public-api
func destinationLabels(resource *pb.Resource) map[string]string {
func destinationLabels(resource *metricsPb.Resource) map[string]string {
dstLabels := map[string]string{}
if resource.Name != "" {
l5dLabel := pkgK8s.KindToL5DLabel(resource.Type)
@ -266,7 +267,7 @@ func destinationLabels(resource *pb.Resource) map[string]string {
return dstLabels
}
func buildExtractHTTP(extract *pb.TapByResourceRequest_Extract_Http) *proxy.ObserveRequest_Extract {
func buildExtractHTTP(extract *tapPb.TapByResourceRequest_Extract_Http) *proxy.ObserveRequest_Extract {
if extract.GetHeaders() != nil {
return &proxy.ObserveRequest_Extract{
Extract: &proxy.ObserveRequest_Extract_Http_{
@ -289,7 +290,7 @@ func buildExtractHTTP(extract *pb.TapByResourceRequest_Extract_Http) *proxy.Obse
// of maxRps * 1s at most once per 1s window. If this limit is reached in
// less than 1s, we sleep until the end of the window before calling Observe
// again.
func (s *GRPCTapServer) tapProxy(ctx context.Context, maxRps float32, match *proxy.ObserveRequest_Match, extract *proxy.ObserveRequest_Extract, addr string, events chan *pb.TapEvent) {
func (s *GRPCTapServer) tapProxy(ctx context.Context, maxRps float32, match *proxy.ObserveRequest_Match, extract *proxy.ObserveRequest_Extract, addr string, events chan *tapPb.TapEvent) {
tapAddr := fmt.Sprintf("%s:%d", addr, s.tapPort)
log.Infof("Establishing tap on %s", tapAddr)
conn, err := grpc.DialContext(ctx, tapAddr, grpc.WithInsecure())
@ -341,37 +342,37 @@ func (s *GRPCTapServer) tapProxy(ctx context.Context, maxRps float32, match *pro
}
}
func (s *GRPCTapServer) translateEvent(ctx context.Context, orig *proxy.TapEvent) *pb.TapEvent {
direction := func(orig proxy.TapEvent_ProxyDirection) pb.TapEvent_ProxyDirection {
func (s *GRPCTapServer) translateEvent(ctx context.Context, orig *proxy.TapEvent) *tapPb.TapEvent {
direction := func(orig proxy.TapEvent_ProxyDirection) tapPb.TapEvent_ProxyDirection {
switch orig {
case proxy.TapEvent_INBOUND:
return pb.TapEvent_INBOUND
return tapPb.TapEvent_INBOUND
case proxy.TapEvent_OUTBOUND:
return pb.TapEvent_OUTBOUND
return tapPb.TapEvent_OUTBOUND
default:
return pb.TapEvent_UNKNOWN
return tapPb.TapEvent_UNKNOWN
}
}
event := func(orig *proxy.TapEvent_Http) *pb.TapEvent_Http_ {
id := func(orig *proxy.TapEvent_Http_StreamId) *pb.TapEvent_Http_StreamId {
return &pb.TapEvent_Http_StreamId{
event := func(orig *proxy.TapEvent_Http) *tapPb.TapEvent_Http_ {
id := func(orig *proxy.TapEvent_Http_StreamId) *tapPb.TapEvent_Http_StreamId {
return &tapPb.TapEvent_Http_StreamId{
Base: orig.GetBase(),
Stream: orig.GetStream(),
}
}
method := func(orig *httpPb.HttpMethod) *pb.HttpMethod {
method := func(orig *httpPb.HttpMethod) *metricsPb.HttpMethod {
switch m := orig.GetType().(type) {
case *httpPb.HttpMethod_Registered_:
return &pb.HttpMethod{
Type: &pb.HttpMethod_Registered_{
Registered: pb.HttpMethod_Registered(m.Registered),
return &metricsPb.HttpMethod{
Type: &metricsPb.HttpMethod_Registered_{
Registered: metricsPb.HttpMethod_Registered(m.Registered),
},
}
case *httpPb.HttpMethod_Unregistered:
return &pb.HttpMethod{
Type: &pb.HttpMethod_Unregistered{
return &metricsPb.HttpMethod{
Type: &metricsPb.HttpMethod_Unregistered{
Unregistered: m.Unregistered,
},
}
@ -380,17 +381,17 @@ func (s *GRPCTapServer) translateEvent(ctx context.Context, orig *proxy.TapEvent
}
}
scheme := func(orig *httpPb.Scheme) *pb.Scheme {
scheme := func(orig *httpPb.Scheme) *metricsPb.Scheme {
switch s := orig.GetType().(type) {
case *httpPb.Scheme_Registered_:
return &pb.Scheme{
Type: &pb.Scheme_Registered_{
Registered: pb.Scheme_Registered(s.Registered),
return &metricsPb.Scheme{
Type: &metricsPb.Scheme_Registered_{
Registered: metricsPb.Scheme_Registered(s.Registered),
},
}
case *httpPb.Scheme_Unregistered:
return &pb.Scheme{
Type: &pb.Scheme_Unregistered{
return &metricsPb.Scheme{
Type: &metricsPb.Scheme_Unregistered{
Unregistered: s.Unregistered,
},
}
@ -399,31 +400,31 @@ func (s *GRPCTapServer) translateEvent(ctx context.Context, orig *proxy.TapEvent
}
}
headers := func(orig *httpPb.Headers) *pb.Headers {
headers := func(orig *httpPb.Headers) *metricsPb.Headers {
if orig == nil {
return nil
}
var headers []*pb.Headers_Header
var headers []*metricsPb.Headers_Header
for _, header := range orig.GetHeaders() {
n := header.GetName()
b := header.GetValue()
h := pb.Headers_Header{Name: n, Value: &pb.Headers_Header_ValueBin{ValueBin: b}}
h := metricsPb.Headers_Header{Name: n, Value: &metricsPb.Headers_Header_ValueBin{ValueBin: b}}
if utf8.Valid(b) {
h = pb.Headers_Header{Name: n, Value: &pb.Headers_Header_ValueStr{ValueStr: string(b)}}
h = metricsPb.Headers_Header{Name: n, Value: &metricsPb.Headers_Header_ValueStr{ValueStr: string(b)}}
}
headers = append(headers, &h)
}
return &pb.Headers{
return &metricsPb.Headers{
Headers: headers,
}
}
switch orig := orig.GetEvent().(type) {
case *proxy.TapEvent_Http_RequestInit_:
return &pb.TapEvent_Http_{
Http: &pb.TapEvent_Http{
Event: &pb.TapEvent_Http_RequestInit_{
RequestInit: &pb.TapEvent_Http_RequestInit{
return &tapPb.TapEvent_Http_{
Http: &tapPb.TapEvent_Http{
Event: &tapPb.TapEvent_Http_RequestInit_{
RequestInit: &tapPb.TapEvent_Http_RequestInit{
Id: id(orig.RequestInit.GetId()),
Method: method(orig.RequestInit.GetMethod()),
Scheme: scheme(orig.RequestInit.GetScheme()),
@ -436,10 +437,10 @@ func (s *GRPCTapServer) translateEvent(ctx context.Context, orig *proxy.TapEvent
}
case *proxy.TapEvent_Http_ResponseInit_:
return &pb.TapEvent_Http_{
Http: &pb.TapEvent_Http{
Event: &pb.TapEvent_Http_ResponseInit_{
ResponseInit: &pb.TapEvent_Http_ResponseInit{
return &tapPb.TapEvent_Http_{
Http: &tapPb.TapEvent_Http{
Event: &tapPb.TapEvent_Http_ResponseInit_{
ResponseInit: &tapPb.TapEvent_Http_ResponseInit{
Id: id(orig.ResponseInit.GetId()),
SinceRequestInit: orig.ResponseInit.GetSinceRequestInit(),
HttpStatus: orig.ResponseInit.GetHttpStatus(),
@ -450,17 +451,17 @@ func (s *GRPCTapServer) translateEvent(ctx context.Context, orig *proxy.TapEvent
}
case *proxy.TapEvent_Http_ResponseEnd_:
eos := func(orig *proxy.Eos) *pb.Eos {
eos := func(orig *proxy.Eos) *metricsPb.Eos {
switch e := orig.GetEnd().(type) {
case *proxy.Eos_ResetErrorCode:
return &pb.Eos{
End: &pb.Eos_ResetErrorCode{
return &metricsPb.Eos{
End: &metricsPb.Eos_ResetErrorCode{
ResetErrorCode: e.ResetErrorCode,
},
}
case *proxy.Eos_GrpcStatusCode:
return &pb.Eos{
End: &pb.Eos_GrpcStatusCode{
return &metricsPb.Eos{
End: &metricsPb.Eos_GrpcStatusCode{
GrpcStatusCode: e.GrpcStatusCode,
},
}
@ -469,10 +470,10 @@ func (s *GRPCTapServer) translateEvent(ctx context.Context, orig *proxy.TapEvent
}
}
return &pb.TapEvent_Http_{
Http: &pb.TapEvent_Http{
Event: &pb.TapEvent_Http_ResponseEnd_{
ResponseEnd: &pb.TapEvent_Http_ResponseEnd{
return &tapPb.TapEvent_Http_{
Http: &tapPb.TapEvent_Http{
Event: &tapPb.TapEvent_Http_ResponseEnd_{
ResponseEnd: &tapPb.TapEvent_Http_ResponseEnd{
Id: id(orig.ResponseEnd.GetId()),
SinceRequestInit: orig.ResponseEnd.GetSinceRequestInit(),
SinceResponseInit: orig.ResponseEnd.GetSinceResponseInit(),
@ -498,16 +499,16 @@ func (s *GRPCTapServer) translateEvent(ctx context.Context, orig *proxy.TapEvent
destinationLabels = make(map[string]string)
}
ev := &pb.TapEvent{
ev := &tapPb.TapEvent{
Source: addr.NetToPublic(orig.GetSource()),
SourceMeta: &pb.TapEvent_EndpointMeta{
SourceMeta: &tapPb.TapEvent_EndpointMeta{
Labels: sourceLabels,
},
Destination: addr.NetToPublic(orig.GetDestination()),
DestinationMeta: &pb.TapEvent_EndpointMeta{
DestinationMeta: &tapPb.TapEvent_EndpointMeta{
Labels: destinationLabels,
},
RouteMeta: &pb.TapEvent_RouteMeta{
RouteMeta: &tapPb.TapEvent_RouteMeta{
Labels: orig.GetRouteMeta().GetLabels(),
},
ProxyDirection: direction(orig.GetProxyDirection()),
@ -546,7 +547,7 @@ func newGRPCTapServer(
}
s := prometheus.NewGrpcServer()
pb.RegisterTapServer(s, srv)
tapPb.RegisterTapServer(s, srv)
return srv
}
@ -574,13 +575,13 @@ func indexByIP(obj interface{}) ([]string, error) {
//
// Since errors encountered while hydrating metadata are non-fatal and result
// only in missing labels, any errors are logged at the WARN level.
func (s *GRPCTapServer) hydrateEventLabels(ctx context.Context, ev *pb.TapEvent) {
func (s *GRPCTapServer) hydrateEventLabels(ctx context.Context, ev *tapPb.TapEvent) {
err := s.hydrateIPLabels(ctx, ev.GetSource().GetIp(), ev.GetSourceMeta().GetLabels())
if err != nil {
log.Warnf("error hydrating source labels: %s", err)
}
if ev.ProxyDirection == pb.TapEvent_INBOUND {
if ev.ProxyDirection == tapPb.TapEvent_INBOUND {
// Events emitted by an inbound proxies don't have destination labels,
// since the inbound proxy _is_ the destination, and proxies don't know
// their own labels.
@ -665,7 +666,7 @@ func (s *GRPCTapServer) resourceForIP(ip *netPb.IPAddress) (runtime.Object, erro
return singleRunningPod, nil
}
func getLabelSelector(req *pb.TapByResourceRequest) (labels.Selector, error) {
func getLabelSelector(req *tapPb.TapByResourceRequest) (labels.Selector, error) {
labelSelector := labels.Everything()
if s := req.GetTarget().GetLabelSelector(); s != "" {
var err error

View File

@ -1,4 +1,4 @@
package tap
package api
import (
"context"
@ -13,7 +13,8 @@ import (
"github.com/linkerd/linkerd2/controller/k8s"
"github.com/linkerd/linkerd2/pkg/addr"
pkgK8s "github.com/linkerd/linkerd2/pkg/k8s"
pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
metricsPb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
tapPb "github.com/linkerd/linkerd2/viz/tap/gen/tap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
@ -23,7 +24,7 @@ import (
type tapExpected struct {
err error
k8sRes []string
req *pb.TapByResourceRequest
req *tapPb.TapByResourceRequest
requireID string
}
@ -32,7 +33,7 @@ type mockTapByResourceServer struct {
util.MockServerStream
}
func (m *mockTapByResourceServer) Send(event *pb.TapEvent) error {
func (m *mockTapByResourceServer) Send(event *tapPb.TapEvent) error {
return nil
}
@ -53,7 +54,7 @@ func TestTapByResource(t *testing.T) {
{
err: status.Error(codes.InvalidArgument, "TapByResource received nil target ResourceSelection"),
k8sRes: []string{},
req: &pb.TapByResourceRequest{},
req: &tapPb.TapByResourceRequest{},
},
{
err: status.Errorf(codes.Unimplemented, "unexpected match specified: any:{}"),
@ -74,17 +75,17 @@ status:
podIP: 127.0.0.1
`,
},
req: &pb.TapByResourceRequest{
Target: &pb.ResourceSelection{
Resource: &pb.Resource{
req: &tapPb.TapByResourceRequest{
Target: &metricsPb.ResourceSelection{
Resource: &metricsPb.Resource{
Namespace: "emojivoto",
Type: pkgK8s.Pod,
Name: "emojivoto-meshed",
},
},
Match: &pb.TapByResourceRequest_Match{
Match: &pb.TapByResourceRequest_Match_Any{
Any: &pb.TapByResourceRequest_Match_Seq{},
Match: &tapPb.TapByResourceRequest_Match{
Match: &tapPb.TapByResourceRequest_Match_Any{
Any: &tapPb.TapByResourceRequest_Match_Seq{},
},
},
},
@ -104,9 +105,9 @@ status:
podIP: 127.0.0.1
`,
},
req: &pb.TapByResourceRequest{
Target: &pb.ResourceSelection{
Resource: &pb.Resource{
req: &tapPb.TapByResourceRequest{
Target: &metricsPb.ResourceSelection{
Resource: &metricsPb.Resource{
Namespace: "emojivoto",
Type: pkgK8s.Pod,
Name: "emojivoto-not-meshed",
@ -117,9 +118,9 @@ status:
{
err: status.Errorf(codes.Unimplemented, "unimplemented resource type: bad-type"),
k8sRes: []string{},
req: &pb.TapByResourceRequest{
Target: &pb.ResourceSelection{
Resource: &pb.Resource{
req: &tapPb.TapByResourceRequest{
Target: &metricsPb.ResourceSelection{
Resource: &metricsPb.Resource{
Namespace: "emojivoto",
Type: "bad-type",
Name: "emojivoto-meshed-not-found",
@ -145,9 +146,9 @@ status:
podIP: 127.0.0.1
`,
},
req: &pb.TapByResourceRequest{
Target: &pb.ResourceSelection{
Resource: &pb.Resource{
req: &tapPb.TapByResourceRequest{
Target: &metricsPb.ResourceSelection{
Resource: &metricsPb.Resource{
Namespace: "emojivoto",
Type: pkgK8s.Pod,
Name: "emojivoto-meshed-not-found",
@ -173,9 +174,9 @@ status:
podIP: 127.0.0.1
`,
},
req: &pb.TapByResourceRequest{
Target: &pb.ResourceSelection{
Resource: &pb.Resource{
req: &tapPb.TapByResourceRequest{
Target: &metricsPb.ResourceSelection{
Resource: &metricsPb.Resource{
Namespace: "emojivoto",
Type: pkgK8s.Pod,
Name: "emojivoto-meshed",
@ -203,17 +204,17 @@ status:
podIP: 127.0.0.1
`,
},
req: &pb.TapByResourceRequest{
Target: &pb.ResourceSelection{
Resource: &pb.Resource{
req: &tapPb.TapByResourceRequest{
Target: &metricsPb.ResourceSelection{
Resource: &metricsPb.Resource{
Namespace: "emojivoto",
Type: pkgK8s.Pod,
Name: "emojivoto-meshed-tap-disabled",
},
},
Match: &pb.TapByResourceRequest_Match{
Match: &pb.TapByResourceRequest_Match_All{
All: &pb.TapByResourceRequest_Match_Seq{},
Match: &tapPb.TapByResourceRequest_Match{
Match: &tapPb.TapByResourceRequest_Match_All{
All: &tapPb.TapByResourceRequest_Match_Seq{},
},
},
},
@ -237,17 +238,17 @@ status:
podIP: 127.0.0.1
`,
},
req: &pb.TapByResourceRequest{
Target: &pb.ResourceSelection{
Resource: &pb.Resource{
req: &tapPb.TapByResourceRequest{
Target: &metricsPb.ResourceSelection{
Resource: &metricsPb.Resource{
Namespace: "emojivoto",
Type: pkgK8s.Pod,
Name: "emojivoto-meshed-tap-not-enabled",
},
},
Match: &pb.TapByResourceRequest_Match{
Match: &pb.TapByResourceRequest_Match_All{
All: &pb.TapByResourceRequest_Match_Seq{},
Match: &tapPb.TapByResourceRequest_Match{
Match: &tapPb.TapByResourceRequest_Match_All{
All: &tapPb.TapByResourceRequest_Match_Seq{},
},
},
},
@ -272,17 +273,17 @@ status:
podIP: 127.0.0.1
`,
},
req: &pb.TapByResourceRequest{
Target: &pb.ResourceSelection{
Resource: &pb.Resource{
req: &tapPb.TapByResourceRequest{
Target: &metricsPb.ResourceSelection{
Resource: &metricsPb.Resource{
Namespace: "emojivoto",
Type: pkgK8s.Pod,
Name: "emojivoto-meshed",
},
},
Match: &pb.TapByResourceRequest_Match{
Match: &pb.TapByResourceRequest_Match_All{
All: &pb.TapByResourceRequest_Match_Seq{},
Match: &tapPb.TapByResourceRequest_Match{
Match: &tapPb.TapByResourceRequest_Match_All{
All: &tapPb.TapByResourceRequest_Match_Seq{},
},
},
},
@ -309,17 +310,17 @@ status:
podIP: 127.0.0.1
`,
},
req: &pb.TapByResourceRequest{
Target: &pb.ResourceSelection{
Resource: &pb.Resource{
req: &tapPb.TapByResourceRequest{
Target: &metricsPb.ResourceSelection{
Resource: &metricsPb.Resource{
Namespace: "emojivoto",
Type: pkgK8s.Pod,
Name: "emojivoto-meshed",
},
},
Match: &pb.TapByResourceRequest_Match{
Match: &pb.TapByResourceRequest_Match_All{
All: &pb.TapByResourceRequest_Match_Seq{},
Match: &tapPb.TapByResourceRequest_Match{
Match: &tapPb.TapByResourceRequest_Match_All{
All: &tapPb.TapByResourceRequest_Match_Seq{},
},
},
},
@ -351,17 +352,17 @@ status:
podIP: 127.0.0.1
`,
},
req: &pb.TapByResourceRequest{
Target: &pb.ResourceSelection{
Resource: &pb.Resource{
req: &tapPb.TapByResourceRequest{
Target: &metricsPb.ResourceSelection{
Resource: &metricsPb.Resource{
Namespace: "",
Type: pkgK8s.Namespace,
Name: "emojivoto",
},
},
Match: &pb.TapByResourceRequest_Match{
Match: &pb.TapByResourceRequest_Match_All{
All: &pb.TapByResourceRequest_Match_Seq{},
Match: &tapPb.TapByResourceRequest_Match{
Match: &tapPb.TapByResourceRequest_Match_All{
All: &tapPb.TapByResourceRequest_Match_Seq{},
},
},
},

View File

@ -1,4 +1,4 @@
package tap
package api
import (
"context"
@ -12,8 +12,8 @@ import (
"github.com/linkerd/linkerd2/controller/k8s"
pkgK8s "github.com/linkerd/linkerd2/pkg/k8s"
"github.com/linkerd/linkerd2/pkg/protohttp"
"github.com/linkerd/linkerd2/pkg/tap"
pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
pb "github.com/linkerd/linkerd2/viz/tap/gen/tap"
"github.com/linkerd/linkerd2/viz/tap/pkg"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/sirupsen/logrus"
"google.golang.org/grpc/metadata"
@ -140,7 +140,7 @@ func (h *handler) handleTap(w http.ResponseWriter, req *http.Request, p httprout
req.Header[h.groupHeader],
)
if err != nil {
err = fmt.Errorf("tap authorization failed (%s), visit %s for more information", err, tap.TapRbacURL)
err = fmt.Errorf("tap authorization failed (%s), visit %s for more information", err, pkg.TapRbacURL)
h.log.Error(err)
renderJSONError(w, err, http.StatusForbidden)
return
@ -155,7 +155,7 @@ func (h *handler) handleTap(w http.ResponseWriter, req *http.Request, p httprout
return
}
url := protohttp.TapReqToURL(&tapReq)
url := pkg.TapReqToURL(&tapReq)
if url != req.URL.Path {
err = fmt.Errorf("tap request body did not match APIServer URL: %+v != %+v", url, req.URL.Path)
h.log.Error(err)

View File

@ -1,4 +1,4 @@
package tap
package api
import (
"fmt"

View File

@ -1,4 +1,4 @@
package tap
package api
import (
"context"
@ -8,7 +8,6 @@ import (
"syscall"
"github.com/linkerd/linkerd2/controller/k8s"
"github.com/linkerd/linkerd2/controller/tap"
"github.com/linkerd/linkerd2/pkg/admin"
"github.com/linkerd/linkerd2/pkg/flags"
"github.com/linkerd/linkerd2/pkg/trace"
@ -20,23 +19,18 @@ const defaultDomain = "cluster.local"
// Main executes the tap subcommand
func Main(args []string) {
cmd := flag.NewFlagSet("tap", flag.ExitOnError)
apiServerAddr := cmd.String("apiserver-addr", ":8089", "address to serve the apiserver on")
metricsAddr := cmd.String("metrics-addr", ":9998", "address to serve scrapable metrics on")
kubeConfigPath := cmd.String("kubeconfig", "", "path to kube config")
controllerNamespace := cmd.String("controller-namespace", "linkerd", "namespace in which Linkerd is installed")
apiNamespace := cmd.String("api-namespace", "linkerd", "namespace in which Linkerd is installed")
tapPort := cmd.Uint("tap-port", 4190, "proxy tap port to connect to")
disableCommonNames := cmd.Bool("disable-common-names", false, "disable checks for Common Names (for development)")
trustDomain := cmd.String("identity-trust-domain", defaultDomain, "configures the name suffix used for identities")
traceCollector := flags.AddTraceFlags(cmd)
flags.ConfigureAndParse(cmd, args)
ctx := context.Background()
stop := make(chan os.Signal, 1)
signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
k8sAPI, err := k8s.InitializeAPI(
ctx,
*kubeConfigPath,
@ -56,28 +50,21 @@ func Main(args []string) {
if err != nil {
log.Fatalf("Failed to initialize K8s API: %s", err)
}
log.Infof("Using trust domain: %s", *trustDomain)
if *traceCollector != "" {
if err := trace.InitializeTracing("linkerd-tap", *traceCollector); err != nil {
log.Warnf("failed to initialize tracing: %s", err)
}
}
grpcTapServer := tap.NewGrpcTapServer(*tapPort, *controllerNamespace, *trustDomain, k8sAPI)
apiServer, err := tap.NewAPIServer(ctx, *apiServerAddr, k8sAPI, grpcTapServer, *disableCommonNames)
grpcTapServer := NewGrpcTapServer(*tapPort, *apiNamespace, *trustDomain, k8sAPI)
apiServer, err := NewServer(ctx, *apiServerAddr, k8sAPI, grpcTapServer, *disableCommonNames)
if err != nil {
log.Fatal(err.Error())
}
k8sAPI.Sync(nil) // blocks until caches are synced
k8sAPI.Sync(nil)
go apiServer.Start(ctx)
go admin.StartServer(*metricsAddr)
<-stop
log.Infof("shutting down APIServer on %s", *apiServerAddr)
apiServer.Shutdown(ctx)
}

View File

@ -1,4 +1,4 @@
package tap
package api
import (
"context"
@ -62,7 +62,7 @@ data:
t.Fatalf("NewFakeAPI returned an error: %s", err)
}
clientCAPem, allowedNames, usernameHeader, groupHeader, err := apiServerAuth(ctx, k8sAPI)
clientCAPem, allowedNames, usernameHeader, groupHeader, err := serverAuth(ctx, k8sAPI)
if !reflect.DeepEqual(err, exp.err) {
t.Errorf("apiServerAuth returned unexpected error: %s, expected: %s", err, exp.err)
}
@ -90,7 +90,7 @@ func TestValidate(t *testing.T) {
req := http.Request{TLS: &tls}
server := APIServer{}
server := Server{}
if err := server.validate(&req); err != nil {
t.Fatalf("No error expected for %q but encountered %q", cert.Subject.CommonName, err.Error())
}
@ -104,7 +104,7 @@ func TestValidate_ClientAllowed(t *testing.T) {
req := http.Request{TLS: &tls}
server := APIServer{allowedNames: []string{"name-trusted"}}
server := Server{allowedNames: []string{"name-trusted"}}
if err := server.validate(&req); err != nil {
t.Fatalf("No error expected for %q but encountered %q", cert.Subject.CommonName, err.Error())
}
@ -118,7 +118,7 @@ func TestValidate_ClientAllowedViaSAN(t *testing.T) {
req := http.Request{TLS: &tls}
server := APIServer{allowedNames: []string{"linkerd.io"}}
server := Server{allowedNames: []string{"linkerd.io"}}
if err := server.validate(&req); err != nil {
t.Fatalf("No error expected for %q but encountered %q", cert.Subject.CommonName, err.Error())
}
@ -132,7 +132,7 @@ func TestValidate_ClientNotAllowed(t *testing.T) {
req := http.Request{TLS: &tls}
server := APIServer{allowedNames: []string{"name-trusted"}}
server := Server{allowedNames: []string{"name-trusted"}}
if err := server.validate(&req); err == nil {
t.Fatalf("Expected request to be rejected for %q", cert.Subject.CommonName)
}

View File

@ -1,4 +1,4 @@
package tap
package api
import (
"context"
@ -12,18 +12,17 @@ import (
"github.com/julienschmidt/httprouter"
"github.com/linkerd/linkerd2/controller/k8s"
k8sutils "github.com/linkerd/linkerd2/pkg/k8s"
pkgk8s "github.com/linkerd/linkerd2/pkg/k8s"
"github.com/linkerd/linkerd2/pkg/prometheus"
pkgTls "github.com/linkerd/linkerd2/pkg/tls"
pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
pb "github.com/linkerd/linkerd2/viz/tap/gen/tap"
"github.com/prometheus/common/log"
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// APIServer holds the underlying http server and its config
type APIServer struct {
// Server holds the underlying http server and its config
type Server struct {
*http.Server
listener net.Listener
router *httprouter.Router
@ -32,14 +31,14 @@ type APIServer struct {
log *logrus.Entry
}
// NewAPIServer creates a new server that implements the Tap APIService.
func NewAPIServer(
// NewServer creates a new server that implements the Tap APIService.
func NewServer(
ctx context.Context,
addr string,
k8sAPI *k8s.API,
grpcTapServer pb.TapServer,
disableCommonNames bool,
) (*APIServer, error) {
) (*Server, error) {
updateEvent := make(chan struct{})
errEvent := make(chan error)
watcher := pkgTls.NewFsCredsWatcher(pkgk8s.MountPathTLSBase, updateEvent, errEvent).
@ -50,7 +49,7 @@ func NewAPIServer(
}
}()
clientCAPem, allowedNames, usernameHeader, groupHeader, err := apiServerAuth(ctx, k8sAPI)
clientCAPem, allowedNames, usernameHeader, groupHeader, err := serverAuth(ctx, k8sAPI)
if err != nil {
return nil, err
}
@ -61,7 +60,7 @@ func NewAPIServer(
}
log := logrus.WithFields(logrus.Fields{
"component": "apiserver",
"component": "tap",
"addr": addr,
})
@ -90,7 +89,7 @@ func NewAPIServer(
return nil, fmt.Errorf("net.Listen failed with: %s", err)
}
s := &APIServer{
s := &Server{
Server: httpServer,
listener: lis,
router: initRouter(h),
@ -111,8 +110,8 @@ func NewAPIServer(
}
// Start starts the https server
func (a *APIServer) Start(ctx context.Context) {
a.log.Infof("starting APIServer on %s", a.Server.Addr)
func (a *Server) Start(ctx context.Context) {
a.log.Infof("starting tap API server on %s", a.Server.Addr)
if err := a.ServeTLS(a.listener, "", ""); err != nil {
if err == http.ErrServerClosed {
return
@ -121,12 +120,12 @@ func (a *APIServer) Start(ctx context.Context) {
}
}
func (a *APIServer) getCertificate(_ *tls.ClientHelloInfo) (*tls.Certificate, error) {
func (a *Server) getCertificate(_ *tls.ClientHelloInfo) (*tls.Certificate, error) {
return a.certValue.Load().(*tls.Certificate), nil
}
// ServeHTTP handles all routes for the APIServer.
func (a *APIServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// ServeHTTP handles all routes for the Server.
func (a *Server) ServeHTTP(w http.ResponseWriter, req *http.Request) {
a.log.Debugf("ServeHTTP(): %+v", req)
if err := a.validate(req); err != nil {
a.log.Debug(err)
@ -137,7 +136,7 @@ func (a *APIServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
}
// validate ensures that the request should be honored returning an error otherwise.
func (a *APIServer) validate(req *http.Request) error {
func (a *Server) validate(req *http.Request) error {
// if `requestheader-allowed-names` was empty, allow any CN
if len(a.allowedNames) > 0 {
for _, cn := range a.allowedNames {
@ -158,21 +157,21 @@ func (a *APIServer) validate(req *http.Request) error {
return nil
}
// apiServerAuth parses the relevant data out of a ConfigMap to enable client
// TLS authentication.
// serverAuth parses the relevant data out of a ConfigMap to enable client TLS
// authentication.
// kubectl -n kube-system get cm/extension-apiserver-authentication
// accessible via the extension-apiserver-authentication-reader role
func apiServerAuth(ctx context.Context, k8sAPI *k8s.API) (string, []string, string, string, error) {
func serverAuth(ctx context.Context, k8sAPI *k8s.API) (string, []string, string, string, error) {
cm, err := k8sAPI.Client.CoreV1().
ConfigMaps(metav1.NamespaceSystem).
Get(ctx, k8sutils.ExtensionAPIServerAuthenticationConfigMapName, metav1.GetOptions{})
Get(ctx, pkgk8s.ExtensionAPIServerAuthenticationConfigMapName, metav1.GetOptions{})
if err != nil {
return "", nil, "", "", fmt.Errorf("failed to load [%s] config: %s", k8sutils.ExtensionAPIServerAuthenticationConfigMapName, err)
return "", nil, "", "", fmt.Errorf("failed to load [%s] config: %s", pkgk8s.ExtensionAPIServerAuthenticationConfigMapName, err)
}
clientCAPem, ok := cm.Data[k8sutils.ExtensionAPIServerAuthenticationRequestHeaderClientCAFileKey]
clientCAPem, ok := cm.Data[pkgk8s.ExtensionAPIServerAuthenticationRequestHeaderClientCAFileKey]
if !ok {
return "", nil, "", "", fmt.Errorf("no client CA cert available for apiextension-server")

22
viz/tap/cmd/main.go Normal file
View File

@ -0,0 +1,22 @@
package main
import (
"fmt"
"os"
"github.com/linkerd/linkerd2/viz/tap/api"
"github.com/linkerd/linkerd2/viz/tap/injector"
)
func main() {
if len(os.Args) < 2 {
fmt.Println("expected a subcommand")
os.Exit(1)
}
switch os.Args[1] {
case "api":
api.Main(os.Args[2:])
case "injector":
injector.Main(os.Args[2:])
}
}

File diff suppressed because it is too large Load Diff

View File

@ -1,4 +1,4 @@
package tapinjector
package injector
import (
"context"
@ -6,7 +6,6 @@ import (
"fmt"
"github.com/linkerd/linkerd2/controller/k8s"
tapinjector "github.com/linkerd/linkerd2/controller/tap-injector"
"github.com/linkerd/linkerd2/controller/webhook"
"github.com/linkerd/linkerd2/pkg/flags"
)
@ -22,7 +21,7 @@ func Main(args []string) {
webhook.Launch(
context.Background(),
[]k8s.APIResource{k8s.NS},
tapinjector.Mutate(*tapSvcName),
Mutate(*tapSvcName),
"tap-injector",
*metricsAddr,
*addr,

View File

@ -1,12 +1,6 @@
package tapinjector
package injector
import (
"fmt"
"github.com/linkerd/linkerd2/pkg/inject"
)
var tpl = fmt.Sprintf(`[
var tpl = `[
{
"op": "add",
"path": "/metadata/annotations/viz.linkerd.io~1tap-enabled",
@ -16,8 +10,8 @@ var tpl = fmt.Sprintf(`[
"op": "add",
"path": "/spec/containers/{{.ProxyIndex}}/env/-",
"value": {
"name": "%s",
"name": "LINKERD2_PROXY_TAP_SVC_NAME",
"value": "{{.ProxyTapSvcName}}"
}
}
]`, inject.TapSvcEnvKey)
]`

View File

@ -1,4 +1,4 @@
package tapinjector
package injector
import (
"bytes"

40
viz/tap/pkg/events.go Normal file
View File

@ -0,0 +1,40 @@
package pkg
import (
"encoding/binary"
netPb "github.com/linkerd/linkerd2/controller/gen/common/net"
tapPb "github.com/linkerd/linkerd2/viz/tap/gen/tap"
)
// CreateTapEvent generates tap events for use in tests
func CreateTapEvent(eventHTTP *tapPb.TapEvent_Http, dstMeta map[string]string, proxyDirection tapPb.TapEvent_ProxyDirection) *tapPb.TapEvent {
event := &tapPb.TapEvent{
ProxyDirection: proxyDirection,
Source: &netPb.TcpAddress{
Ip: &netPb.IPAddress{
Ip: &netPb.IPAddress_Ipv4{
Ipv4: uint32(1),
},
},
},
Destination: &netPb.TcpAddress{
Ip: &netPb.IPAddress{
Ip: &netPb.IPAddress_Ipv6{
Ipv6: &netPb.IPv6{
// All nodes address: https://www.iana.org/assignments/ipv6-multicast-addresses/ipv6-multicast-addresses.xhtml
First: binary.BigEndian.Uint64([]byte{0xff, 0x01, 0, 0, 0, 0, 0, 0}),
Last: binary.BigEndian.Uint64([]byte{0, 0, 0, 0, 0, 0, 0, 0x01}),
},
},
},
},
Event: &tapPb.TapEvent_Http_{
Http: eventHTTP,
},
DestinationMeta: &tapPb.TapEvent_EndpointMeta{
Labels: dstMeta,
},
}
return event
}

28
viz/tap/pkg/protohttp.go Normal file
View File

@ -0,0 +1,28 @@
package pkg
import (
"fmt"
"github.com/linkerd/linkerd2/pkg/k8s"
tapPb "github.com/linkerd/linkerd2/viz/tap/gen/tap"
)
// TapReqToURL converts a TapByResourceRequest protobuf object to a URL for use
// with the Kubernetes tap.linkerd.io APIService.
func TapReqToURL(req *tapPb.TapByResourceRequest) string {
res := req.GetTarget().GetResource()
// non-namespaced
if res.GetType() == k8s.Namespace {
return fmt.Sprintf(
"/apis/tap.linkerd.io/v1alpha1/watch/namespaces/%s/tap",
res.GetName(),
)
}
// namespaced
return fmt.Sprintf(
"/apis/tap.linkerd.io/v1alpha1/watch/namespaces/%s/%s/%s/tap",
res.GetNamespace(), res.GetType()+"s", res.GetName(),
)
}

View File

@ -0,0 +1,54 @@
package pkg
import (
"fmt"
"testing"
metricsPb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
tapPb "github.com/linkerd/linkerd2/viz/tap/gen/tap"
)
func TestTapReqToURL(t *testing.T) {
expectations := []struct {
req *tapPb.TapByResourceRequest
url string
}{
{
req: &tapPb.TapByResourceRequest{},
url: "/apis/tap.linkerd.io/v1alpha1/watch/namespaces//s//tap",
},
{
req: &tapPb.TapByResourceRequest{
Target: &metricsPb.ResourceSelection{
Resource: &metricsPb.Resource{
Type: "namespace",
Name: "test-name",
},
},
},
url: "/apis/tap.linkerd.io/v1alpha1/watch/namespaces/test-name/tap",
},
{
req: &tapPb.TapByResourceRequest{
Target: &metricsPb.ResourceSelection{
Resource: &metricsPb.Resource{
Namespace: "test-ns",
Type: "test-type",
Name: "test-name",
},
},
},
url: "/apis/tap.linkerd.io/v1alpha1/watch/namespaces/test-ns/test-types/test-name/tap",
},
}
for i, exp := range expectations {
exp := exp // pin
t.Run(fmt.Sprintf("%d constructs the expected URL from a TapRequest", i), func(t *testing.T) {
url := TapReqToURL(exp.req)
if url != exp.url {
t.Fatalf("Unexpected url: %s, Expected: %s", url, exp.url)
}
})
}
}

View File

@ -1,4 +1,4 @@
package tap
package pkg
import (
"bufio"
@ -12,7 +12,7 @@ import (
"github.com/golang/protobuf/proto"
"github.com/linkerd/linkerd2/pkg/k8s"
"github.com/linkerd/linkerd2/pkg/protohttp"
pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
pb "github.com/linkerd/linkerd2/viz/tap/gen/tap"
log "github.com/sirupsen/logrus"
)
@ -42,7 +42,7 @@ func Reader(ctx context.Context, k8sAPI *k8s.KubernetesAPI, req *pb.TapByResourc
if err != nil {
return nil, nil, err
}
url.Path = fmt.Sprintf("%s%s", url.Path, protohttp.TapReqToURL(req))
url.Path = fmt.Sprintf("%s%s", url.Path, TapReqToURL(req))
httpReq, err := http.NewRequest(
http.MethodPost,

150
viz/tap/pkg/requests.go Normal file
View File

@ -0,0 +1,150 @@
package pkg
import (
"fmt"
"github.com/linkerd/linkerd2/controller/api/util"
"github.com/linkerd/linkerd2/pkg/k8s"
metricsPb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
"github.com/linkerd/linkerd2/viz/pkg"
tapPb "github.com/linkerd/linkerd2/viz/tap/gen/tap"
)
// ValidTapDestinations specifies resource types allowed as a tap destination:
// - destination resource on an outbound 'to' query
var ValidTapDestinations = []string{
k8s.CronJob,
k8s.DaemonSet,
k8s.Deployment,
k8s.Job,
k8s.Namespace,
k8s.Pod,
k8s.ReplicaSet,
k8s.ReplicationController,
k8s.Service,
k8s.StatefulSet,
}
// TapRequestParams contains parameters that are used to build a
// TapByResourceRequest.
type TapRequestParams struct {
Resource string
Namespace string
ToResource string
ToNamespace string
MaxRps float32
Scheme string
Method string
Authority string
Path string
Extract bool
LabelSelector string
}
// BuildTapByResourceRequest builds a Public API TapByResourceRequest from a
// TapRequestParams.
func BuildTapByResourceRequest(params TapRequestParams) (*tapPb.TapByResourceRequest, error) {
target, err := util.BuildResource(params.Namespace, params.Resource)
if err != nil {
return nil, fmt.Errorf("target resource invalid: %s", err)
}
if !contains(pkg.ValidTargets, target.Type) {
return nil, fmt.Errorf("unsupported resource type [%s]", target.Type)
}
matches := []*tapPb.TapByResourceRequest_Match{}
if params.ToResource != "" {
destination, err := util.BuildResource(params.ToNamespace, params.ToResource)
if err != nil {
return nil, fmt.Errorf("destination resource invalid: %s", err)
}
if !contains(ValidTapDestinations, destination.Type) {
return nil, fmt.Errorf("unsupported resource type [%s]", destination.Type)
}
match := tapPb.TapByResourceRequest_Match{
Match: &tapPb.TapByResourceRequest_Match_Destinations{
Destinations: &metricsPb.ResourceSelection{
Resource: destination,
},
},
}
matches = append(matches, &match)
}
if params.Scheme != "" {
match := buildMatchHTTP(&tapPb.TapByResourceRequest_Match_Http{
Match: &tapPb.TapByResourceRequest_Match_Http_Scheme{Scheme: params.Scheme},
})
matches = append(matches, &match)
}
if params.Method != "" {
match := buildMatchHTTP(&tapPb.TapByResourceRequest_Match_Http{
Match: &tapPb.TapByResourceRequest_Match_Http_Method{Method: params.Method},
})
matches = append(matches, &match)
}
if params.Authority != "" {
match := buildMatchHTTP(&tapPb.TapByResourceRequest_Match_Http{
Match: &tapPb.TapByResourceRequest_Match_Http_Authority{Authority: params.Authority},
})
matches = append(matches, &match)
}
if params.Path != "" {
match := buildMatchHTTP(&tapPb.TapByResourceRequest_Match_Http{
Match: &tapPb.TapByResourceRequest_Match_Http_Path{Path: params.Path},
})
matches = append(matches, &match)
}
extract := &tapPb.TapByResourceRequest_Extract{}
if params.Extract {
extract = buildExtractHTTP(&tapPb.TapByResourceRequest_Extract_Http{
Extract: &tapPb.TapByResourceRequest_Extract_Http_Headers_{
Headers: &tapPb.TapByResourceRequest_Extract_Http_Headers{},
},
})
}
return &tapPb.TapByResourceRequest{
Target: &metricsPb.ResourceSelection{
Resource: target,
LabelSelector: params.LabelSelector,
},
MaxRps: params.MaxRps,
Match: &tapPb.TapByResourceRequest_Match{
Match: &tapPb.TapByResourceRequest_Match_All{
All: &tapPb.TapByResourceRequest_Match_Seq{
Matches: matches,
},
},
},
Extract: extract,
}, nil
}
func buildMatchHTTP(match *tapPb.TapByResourceRequest_Match_Http) tapPb.TapByResourceRequest_Match {
return tapPb.TapByResourceRequest_Match{
Match: &tapPb.TapByResourceRequest_Match_Http_{
Http: match,
},
}
}
func buildExtractHTTP(extract *tapPb.TapByResourceRequest_Extract_Http) *tapPb.TapByResourceRequest_Extract {
return &tapPb.TapByResourceRequest_Extract{
Extract: &tapPb.TapByResourceRequest_Extract_Http_{
Http: extract,
},
}
}
func contains(list []string, s string) bool {
for _, elem := range list {
if s == elem {
return true
}
}
return false
}

172
viz/tap/proto/viz_tap.proto Normal file
View File

@ -0,0 +1,172 @@
syntax = "proto3";
package linkerd2.tap;
import "google/protobuf/duration.proto";
import "common/net.proto";
import "viz.proto";
option go_package = "github.com/linkerd/linkerd2/viz/tap/gen/tap";
message TapRequest {
option deprecated = true;
oneof target {
string pod = 1;
string deployment = 2;
}
// validation of these fields happens on the server
float maxRps = 3;
uint32 toPort = 4;
string toIP = 5;
uint32 fromPort = 6;
string fromIP = 7;
string scheme = 8;
string method = 9;
string authority = 10;
string path = 11;
}
// A tap request over kubernetes resources.
//
// This is used only by the tap APIServer.
message TapByResourceRequest {
// Describes the kubernetes pods that should be tapped.
viz.ResourceSelection target = 1;
// Selects over events to be reported.
Match match = 2;
// Limits the number of events to be inspected.
float maxRps = 3;
message Match {
oneof match {
// If empty, matches all messages.
Seq all = 1;
// If empty, matches no messages.
Seq any = 2;
// Inverts the inner match.
Match not = 3;
// Matches events being sent to any of the selected destinations.
viz.ResourceSelection destinations = 4;
// Matches HTTP requests by their metadata.
Http http = 5;
}
message Seq {
repeated Match matches = 1;
}
message Http {
oneof match {
string scheme = 1;
string method = 2;
string authority = 3;
string path = 4;
}
}
}
// Conditionally extracts components from requests and responses to include
// in tap events
Extract extract = 4;
message Extract {
oneof extract {
Http http = 1;
}
message Http {
oneof extract {
Headers headers = 1;
}
message Headers {}
}
}
}
// This is used only by the tap APIServer.
message TapEvent {
linkerd2.common.net.TcpAddress source = 1;
EndpointMeta source_meta = 5;
linkerd2.common.net.TcpAddress destination = 2;
EndpointMeta destination_meta = 4;
RouteMeta route_meta = 7;
ProxyDirection proxy_direction = 6;
enum ProxyDirection {
UNKNOWN = 0;
INBOUND = 1;
OUTBOUND = 2;
}
oneof event {
Http http = 3;
}
message EndpointMeta {
map<string, string> labels = 1;
}
message RouteMeta {
map<string, string> labels = 1;
}
message Http {
oneof event {
RequestInit request_init = 1;
ResponseInit response_init = 2;
ResponseEnd response_end = 3;
}
message StreamId {
// A randomized base (stable across a process's runtime)
uint32 base = 1;
// A stream id unique within the lifetime of `base`.
uint64 stream = 2;
}
message RequestInit {
StreamId id = 1;
viz.HttpMethod method = 2;
viz.Scheme scheme = 3;
string authority = 4;
string path = 5;
viz.Headers headers = 6;
}
message ResponseInit {
StreamId id = 1;
google.protobuf.Duration since_request_init = 2;
uint32 http_status = 3;
viz.Headers headers = 4;
}
message ResponseEnd {
StreamId id = 1;
google.protobuf.Duration since_request_init = 2;
google.protobuf.Duration since_response_init = 3;
uint64 response_bytes = 4;
viz.Eos eos = 5;
viz.Headers trailers = 6;
}
}
}
service Tap {
rpc Tap(TapRequest) returns (stream TapEvent) { option deprecated = true; }
rpc TapByResource(TapByResourceRequest) returns (stream TapEvent) { option deprecated = true; }
}

View File

@ -40,6 +40,8 @@ COPY web/srv web/srv
COPY controller controller
COPY viz/metrics-api viz/metrics-api
COPY viz/pkg viz/pkg
COPY viz/tap/gen/tap viz/tap/gen/tap
COPY viz/tap/pkg viz/tap/pkg
COPY pkg pkg
ARG TARGETARCH
RUN CGO_ENABLED=0 GOOS=linux GOARCH=$TARGETARCH go build -mod=readonly -o web/web -ldflags "-s -w" ./web

View File

@ -15,14 +15,14 @@ import (
"github.com/golang/protobuf/proto"
"github.com/gorilla/websocket"
"github.com/julienschmidt/httprouter"
coreUtil "github.com/linkerd/linkerd2/controller/api/util"
publicPb "github.com/linkerd/linkerd2/controller/gen/public"
"github.com/linkerd/linkerd2/pkg/healthcheck"
"github.com/linkerd/linkerd2/pkg/k8s"
"github.com/linkerd/linkerd2/pkg/protohttp"
"github.com/linkerd/linkerd2/pkg/tap"
pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
metricsPb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
vizUtil "github.com/linkerd/linkerd2/viz/metrics-api/util"
tapPb "github.com/linkerd/linkerd2/viz/tap/gen/tap"
tappkg "github.com/linkerd/linkerd2/viz/tap/pkg"
log "github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/yaml"
@ -98,9 +98,9 @@ func (h *handler) handleAPIVersion(w http.ResponseWriter, req *http.Request, p h
}
func (h *handler) handleAPIPods(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
pods, err := h.apiClient.ListPods(req.Context(), &pb.ListPodsRequest{
Selector: &pb.ResourceSelection{
Resource: &pb.Resource{
pods, err := h.apiClient.ListPods(req.Context(), &metricsPb.ListPodsRequest{
Selector: &metricsPb.ResourceSelection{
Resource: &metricsPb.Resource{
Namespace: req.FormValue("namespace"),
},
},
@ -115,7 +115,7 @@ func (h *handler) handleAPIPods(w http.ResponseWriter, req *http.Request, p http
}
func (h *handler) handleAPIServices(w http.ResponseWriter, req *http.Request, p httprouter.Params) {
services, err := h.apiClient.ListServices(req.Context(), &pb.ListServicesRequest{
services, err := h.apiClient.ListServices(req.Context(), &metricsPb.ListServicesRequest{
Namespace: req.FormValue("namespace"),
})
@ -257,27 +257,27 @@ func (h *handler) handleAPITap(w http.ResponseWriter, req *http.Request, p httpr
return
}
var requestParams coreUtil.TapRequestParams
var requestParams tappkg.TapRequestParams
err = json.Unmarshal(message, &requestParams)
if err != nil {
websocketError(ws, websocket.CloseInternalServerErr, err)
return
}
tapReq, err := coreUtil.BuildTapByResourceRequest(requestParams)
tapReq, err := tappkg.BuildTapByResourceRequest(requestParams)
if err != nil {
websocketError(ws, websocket.CloseInternalServerErr, err)
return
}
go func() {
reader, body, err := tap.Reader(req.Context(), h.k8sAPI, tapReq)
reader, body, err := tappkg.Reader(req.Context(), h.k8sAPI, tapReq)
if err != nil {
// If there was a [403] error when initiating a tap, close the
// socket with `ClosePolicyViolation` status code so that the error
// renders without the error prefix in the banner
if httpErr, ok := err.(protohttp.HTTPError); ok && httpErr.Code == http.StatusForbidden {
err := fmt.Errorf("missing authorization, visit %s to remedy", tap.TapRbacURL)
err := fmt.Errorf("missing authorization, visit %s to remedy", tappkg.TapRbacURL)
websocketError(ws, websocket.ClosePolicyViolation, err)
return
}
@ -290,7 +290,7 @@ func (h *handler) handleAPITap(w http.ResponseWriter, req *http.Request, p httpr
defer body.Close()
for {
event := pb.TapEvent{}
event := tapPb.TapEvent{}
err := protohttp.FromByteStreamToProtocolBuffers(reader, &event)
if err == io.EOF {
break
@ -451,7 +451,7 @@ func (h *handler) handleAPIGateways(w http.ResponseWriter, req *http.Request, _
renderJSONError(w, err, http.StatusInternalServerError)
return
}
gatewayRequest := &pb.GatewaysRequest{
gatewayRequest := &metricsPb.GatewaysRequest{
TimeWindow: window,
GatewayNamespace: req.FormValue("gatewayNamespace"),
RemoteClusterName: req.FormValue("remoteClusterName"),