mirror of https://github.com/linkerd/linkerd2.git
viz: add viz profile command (#5621)
## What this changes This adds a `viz profile` command that outputs a service profile based off tap data. It is identical—but fixes—the current `profile --tap` command. Additionally, it removes the `--tap` flag from the `profile` command since this depends on the Viz extension being installed in order to tap a service. ## Why The `profile --tap` command is currently broken since it depends on the Viz extension being installed, but the `profile` command is part of the core install. Closes #5613 Unblocks #5545 Signed-off-by: Kevin Leimkuhler <kevin@kleimkuhler.com>
This commit is contained in:
parent
590d152566
commit
df0ce24b12
|
@ -4,7 +4,6 @@ import (
|
|||
"errors"
|
||||
"fmt"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
pkgcmd "github.com/linkerd/linkerd2/pkg/cmd"
|
||||
"github.com/linkerd/linkerd2/pkg/healthcheck"
|
||||
|
@ -20,10 +19,7 @@ type profileOptions struct {
|
|||
template bool
|
||||
openAPI string
|
||||
proto string
|
||||
tap string
|
||||
ignoreCluster bool
|
||||
tapDuration time.Duration
|
||||
tapRouteLimit uint
|
||||
}
|
||||
|
||||
func newProfileOptions() *profileOptions {
|
||||
|
@ -32,10 +28,7 @@ func newProfileOptions() *profileOptions {
|
|||
template: false,
|
||||
openAPI: "",
|
||||
proto: "",
|
||||
tap: "",
|
||||
ignoreCluster: false,
|
||||
tapDuration: 5 * time.Second,
|
||||
tapRouteLimit: 20,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -50,17 +43,10 @@ func (options *profileOptions) validate() error {
|
|||
if options.proto != "" {
|
||||
outputs++
|
||||
}
|
||||
if options.tap != "" {
|
||||
outputs++
|
||||
}
|
||||
if outputs != 1 {
|
||||
return errors.New("You must specify exactly one of --template or --open-api or --proto or --tap")
|
||||
return errors.New("You must specify exactly one of --template or --open-api or --proto")
|
||||
}
|
||||
|
||||
// service profile generation based on tap data requires access to k8s cluster
|
||||
if options.ignoreCluster && options.tap != "" {
|
||||
return errors.New("--ignore-cluster and --tap flags are mutually exclusive; SP generation based on tap data requires access-check to k8s cluster")
|
||||
}
|
||||
// a DNS-1035 label must consist of lower case alphanumeric characters or '-',
|
||||
// start with an alphabetic character, and end with an alphanumeric character
|
||||
if errs := validation.IsDNS1035Label(options.name); len(errs) != 0 {
|
||||
|
@ -82,7 +68,7 @@ func newCmdProfile() *cobra.Command {
|
|||
options := newProfileOptions()
|
||||
|
||||
cmd := &cobra.Command{
|
||||
Use: "profile [flags] (--template | --open-api file | --proto file | --tap resource) (SERVICE)",
|
||||
Use: "profile [flags] (--template | --open-api file | --proto file) (SERVICE)",
|
||||
Short: "Output service profile config for Kubernetes",
|
||||
Long: "Output service profile config for Kubernetes.",
|
||||
Example: ` # Output a basic template to apply after modification.
|
||||
|
@ -93,9 +79,6 @@ func newCmdProfile() *cobra.Command {
|
|||
|
||||
# Generate a profile from a protobuf definition.
|
||||
linkerd profile -n emojivoto --proto Voting.proto vote-svc
|
||||
|
||||
# Generate a profile by watching live traffic based off tap data.
|
||||
linkerd profile -n emojivoto web-svc --tap deploy/web --tap-duration 10s --tap-route-limit 5
|
||||
`,
|
||||
Args: cobra.ExactArgs(1),
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
|
@ -104,7 +87,6 @@ func newCmdProfile() *cobra.Command {
|
|||
}
|
||||
options.name = args[0]
|
||||
clusterDomain := defaultClusterDomain
|
||||
var k8sAPI *k8s.KubernetesAPI
|
||||
|
||||
err := options.validate()
|
||||
if err != nil {
|
||||
|
@ -112,10 +94,9 @@ func newCmdProfile() *cobra.Command {
|
|||
}
|
||||
// performs an online profile generation and access-check to k8s cluster to extract
|
||||
// clusterDomain from linkerd configuration
|
||||
// profile generation based on tap data requires access to k8s cluster
|
||||
if !options.ignoreCluster {
|
||||
var err error
|
||||
k8sAPI, err = k8s.NewAPI(kubeconfigPath, kubeContext, impersonate, impersonateGroup, 0)
|
||||
k8sAPI, err := k8s.NewAPI(kubeconfigPath, kubeContext, impersonate, impersonateGroup, 0)
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -135,8 +116,6 @@ func newCmdProfile() *cobra.Command {
|
|||
return profiles.RenderProfileTemplate(options.namespace, options.name, clusterDomain, os.Stdout)
|
||||
} else if options.openAPI != "" {
|
||||
return profiles.RenderOpenAPI(options.openAPI, options.namespace, options.name, clusterDomain, os.Stdout)
|
||||
} else if options.tap != "" {
|
||||
return profiles.RenderTapOutputProfile(cmd.Context(), k8sAPI, options.tap, options.namespace, options.name, clusterDomain, options.tapDuration, int(options.tapRouteLimit), os.Stdout)
|
||||
} else if options.proto != "" {
|
||||
return profiles.RenderProto(options.proto, options.namespace, options.name, clusterDomain, os.Stdout)
|
||||
}
|
||||
|
@ -148,9 +127,6 @@ func newCmdProfile() *cobra.Command {
|
|||
|
||||
cmd.PersistentFlags().BoolVar(&options.template, "template", options.template, "Output a service profile template")
|
||||
cmd.PersistentFlags().StringVar(&options.openAPI, "open-api", options.openAPI, "Output a service profile based on the given OpenAPI spec file")
|
||||
cmd.PersistentFlags().StringVar(&options.tap, "tap", options.tap, "Output a service profile based on tap data for the given target resource")
|
||||
cmd.PersistentFlags().DurationVar(&options.tapDuration, "tap-duration", options.tapDuration, "Duration over which tap data is collected (for example: \"10s\", \"1m\", \"10m\")")
|
||||
cmd.PersistentFlags().UintVar(&options.tapRouteLimit, "tap-route-limit", options.tapRouteLimit, "Max number of routes to add to the profile")
|
||||
cmd.PersistentFlags().StringVarP(&options.namespace, "namespace", "n", options.namespace, "Namespace of the service")
|
||||
cmd.PersistentFlags().StringVar(&options.proto, "proto", options.proto, "Output a service profile based on the given Protobuf spec file")
|
||||
cmd.PersistentFlags().BoolVar(&options.ignoreCluster, "ignore-cluster", options.ignoreCluster, "Output a service profile through offline generation")
|
||||
|
|
|
@ -35,7 +35,7 @@ func TestParseProfile(t *testing.T) {
|
|||
|
||||
func TestValidateOptions(t *testing.T) {
|
||||
options := newProfileOptions()
|
||||
exp := errors.New("You must specify exactly one of --template or --open-api or --proto or --tap")
|
||||
exp := errors.New("You must specify exactly one of --template or --open-api or --proto")
|
||||
err := options.validate()
|
||||
if err == nil || err.Error() != exp.Error() {
|
||||
t.Fatalf("validateOptions returned unexpected error: %s (expected: %s) for options: %+v", err, exp, options)
|
||||
|
@ -44,7 +44,7 @@ func TestValidateOptions(t *testing.T) {
|
|||
options = newProfileOptions()
|
||||
options.template = true
|
||||
options.openAPI = "openAPI"
|
||||
exp = errors.New("You must specify exactly one of --template or --open-api or --proto or --tap")
|
||||
exp = errors.New("You must specify exactly one of --template or --open-api or --proto")
|
||||
err = options.validate()
|
||||
if err == nil || err.Error() != exp.Error() {
|
||||
t.Fatalf("validateOptions returned unexpected error: %s (expected: %s) for options: %+v", err, exp, options)
|
||||
|
|
|
@ -6,7 +6,6 @@ import (
|
|||
"io/ioutil"
|
||||
"net/http"
|
||||
"path"
|
||||
"regexp"
|
||||
"sort"
|
||||
|
||||
"github.com/go-openapi/spec"
|
||||
|
@ -20,8 +19,6 @@ const (
|
|||
xLinkerdTimeout = "x-linkerd-timeout"
|
||||
)
|
||||
|
||||
var pathParamRegex = regexp.MustCompile(`\\{[^\}]*\\}`)
|
||||
|
||||
// RenderOpenAPI reads an OpenAPI spec file and renders the corresponding
|
||||
// ServiceProfile to a buffer, given a namespace, service, and control plane
|
||||
// namespace.
|
||||
|
@ -58,7 +55,7 @@ func swaggerToServiceProfile(swagger spec.Swagger, namespace, name, clusterDomai
|
|||
Name: fmt.Sprintf("%s.%s.svc.%s", name, namespace, clusterDomain),
|
||||
Namespace: namespace,
|
||||
},
|
||||
TypeMeta: serviceProfileMeta,
|
||||
TypeMeta: ServiceProfileMeta,
|
||||
}
|
||||
|
||||
routes := make([]*sp.RouteSpec, 0)
|
||||
|
@ -74,33 +71,33 @@ func swaggerToServiceProfile(swagger spec.Swagger, namespace, name, clusterDomai
|
|||
for _, relPath := range paths {
|
||||
item := swagger.Paths.Paths[relPath]
|
||||
path := path.Join(swagger.BasePath, relPath)
|
||||
pathRegex := pathToRegex(path)
|
||||
pathRegex := PathToRegex(path)
|
||||
if item.Delete != nil {
|
||||
spec := mkRouteSpec(path, pathRegex, http.MethodDelete, item.Delete)
|
||||
spec := MkRouteSpec(path, pathRegex, http.MethodDelete, item.Delete)
|
||||
routes = append(routes, spec)
|
||||
}
|
||||
if item.Get != nil {
|
||||
spec := mkRouteSpec(path, pathRegex, http.MethodGet, item.Get)
|
||||
spec := MkRouteSpec(path, pathRegex, http.MethodGet, item.Get)
|
||||
routes = append(routes, spec)
|
||||
}
|
||||
if item.Head != nil {
|
||||
spec := mkRouteSpec(path, pathRegex, http.MethodHead, item.Head)
|
||||
spec := MkRouteSpec(path, pathRegex, http.MethodHead, item.Head)
|
||||
routes = append(routes, spec)
|
||||
}
|
||||
if item.Options != nil {
|
||||
spec := mkRouteSpec(path, pathRegex, http.MethodOptions, item.Options)
|
||||
spec := MkRouteSpec(path, pathRegex, http.MethodOptions, item.Options)
|
||||
routes = append(routes, spec)
|
||||
}
|
||||
if item.Patch != nil {
|
||||
spec := mkRouteSpec(path, pathRegex, http.MethodPatch, item.Patch)
|
||||
spec := MkRouteSpec(path, pathRegex, http.MethodPatch, item.Patch)
|
||||
routes = append(routes, spec)
|
||||
}
|
||||
if item.Post != nil {
|
||||
spec := mkRouteSpec(path, pathRegex, http.MethodPost, item.Post)
|
||||
spec := MkRouteSpec(path, pathRegex, http.MethodPost, item.Post)
|
||||
routes = append(routes, spec)
|
||||
}
|
||||
if item.Put != nil {
|
||||
spec := mkRouteSpec(path, pathRegex, http.MethodPut, item.Put)
|
||||
spec := MkRouteSpec(path, pathRegex, http.MethodPut, item.Put)
|
||||
routes = append(routes, spec)
|
||||
}
|
||||
}
|
||||
|
@ -109,7 +106,8 @@ func swaggerToServiceProfile(swagger spec.Swagger, namespace, name, clusterDomai
|
|||
return profile
|
||||
}
|
||||
|
||||
func mkRouteSpec(path, pathRegex string, method string, operation *spec.Operation) *sp.RouteSpec {
|
||||
// MkRouteSpec makes a service profile route from an OpenAPI operation.
|
||||
func MkRouteSpec(path, pathRegex string, method string, operation *spec.Operation) *sp.RouteSpec {
|
||||
retryable := false
|
||||
timeout := ""
|
||||
var responses *spec.Responses
|
||||
|
@ -127,11 +125,6 @@ func mkRouteSpec(path, pathRegex string, method string, operation *spec.Operatio
|
|||
}
|
||||
}
|
||||
|
||||
func pathToRegex(path string) string {
|
||||
escaped := regexp.QuoteMeta(path)
|
||||
return pathParamRegex.ReplaceAllLiteralString(escaped, "[^/]*")
|
||||
}
|
||||
|
||||
func toReqMatch(path string, method string) *sp.RequestMatch {
|
||||
return &sp.RequestMatch{
|
||||
PathRegex: path,
|
||||
|
|
|
@ -41,7 +41,7 @@ func TestSwaggerToServiceProfile(t *testing.T) {
|
|||
}
|
||||
|
||||
expectedServiceProfile := sp.ServiceProfile{
|
||||
TypeMeta: serviceProfileMeta,
|
||||
TypeMeta: ServiceProfileMeta,
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: name + "." + namespace + ".svc." + clusterDomain,
|
||||
Namespace: namespace,
|
||||
|
|
|
@ -6,6 +6,7 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"regexp"
|
||||
"text/template"
|
||||
"time"
|
||||
|
||||
|
@ -16,6 +17,8 @@ import (
|
|||
"sigs.k8s.io/yaml"
|
||||
)
|
||||
|
||||
var pathParamRegex = regexp.MustCompile(`\\{[^\}]*\\}`)
|
||||
|
||||
type profileTemplateConfig struct {
|
||||
ServiceNamespace string
|
||||
ServiceName string
|
||||
|
@ -23,8 +26,8 @@ type profileTemplateConfig struct {
|
|||
}
|
||||
|
||||
var (
|
||||
// serviceProfileMeta is the TypeMeta for the ServiceProfile custom resource.
|
||||
serviceProfileMeta = metav1.TypeMeta{
|
||||
// ServiceProfileMeta is the TypeMeta for the ServiceProfile custom resource.
|
||||
ServiceProfileMeta = metav1.TypeMeta{
|
||||
APIVersion: k8s.ServiceProfileAPIVersion,
|
||||
Kind: k8s.ServiceProfileKind,
|
||||
}
|
||||
|
@ -241,3 +244,9 @@ func writeProfile(profile sp.ServiceProfile, w io.Writer) error {
|
|||
_, err = w.Write(output)
|
||||
return err
|
||||
}
|
||||
|
||||
// PathToRegex converts a path into a regex.
|
||||
func PathToRegex(path string) string {
|
||||
escaped := regexp.QuoteMeta(path)
|
||||
return pathParamRegex.ReplaceAllLiteralString(escaped, "[^/]*")
|
||||
}
|
||||
|
|
|
@ -71,7 +71,7 @@ func protoToServiceProfile(parser *proto.Parser, namespace, name, clusterDomain
|
|||
Name: fmt.Sprintf("%s.%s.svc.%s", name, namespace, clusterDomain),
|
||||
Namespace: namespace,
|
||||
},
|
||||
TypeMeta: serviceProfileMeta,
|
||||
TypeMeta: ServiceProfileMeta,
|
||||
Spec: sp.ServiceProfileSpec{
|
||||
Routes: routes,
|
||||
},
|
||||
|
|
|
@ -33,7 +33,7 @@ service VotingService {
|
|||
parser := proto.NewParser(strings.NewReader(protobuf))
|
||||
|
||||
expectedServiceProfile := sp.ServiceProfile{
|
||||
TypeMeta: serviceProfileMeta,
|
||||
TypeMeta: ServiceProfileMeta,
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: name + "." + namespace + ".svc." + clusterDomain,
|
||||
Namespace: namespace,
|
||||
|
|
|
@ -1,143 +0,0 @@
|
|||
package profiles
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"os"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ghodss/yaml"
|
||||
"github.com/linkerd/linkerd2/controller/api/util"
|
||||
sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2"
|
||||
"github.com/linkerd/linkerd2/pkg/k8s"
|
||||
"github.com/linkerd/linkerd2/pkg/protohttp"
|
||||
"github.com/linkerd/linkerd2/pkg/tap"
|
||||
pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
|
||||
log "github.com/sirupsen/logrus"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
)
|
||||
|
||||
// RenderTapOutputProfile performs a tap on the desired resource and generates
|
||||
// a service profile with routes pre-populated from the tap data
|
||||
// Only inbound tap traffic is considered.
|
||||
func RenderTapOutputProfile(ctx context.Context, k8sAPI *k8s.KubernetesAPI, tapResource, namespace, name, clusterDomain string, tapDuration time.Duration, routeLimit int, w io.Writer) error {
|
||||
requestParams := util.TapRequestParams{
|
||||
Resource: tapResource,
|
||||
Namespace: namespace,
|
||||
}
|
||||
log.Debugf("Running `linkerd tap %s --namespace %s`", tapResource, namespace)
|
||||
|
||||
req, err := util.BuildTapByResourceRequest(requestParams)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
profile, err := tapToServiceProfile(ctx, k8sAPI, req, namespace, name, clusterDomain, tapDuration, routeLimit)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
output, err := yaml.Marshal(profile)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error writing Service Profile: %s", err)
|
||||
}
|
||||
w.Write(output)
|
||||
return nil
|
||||
}
|
||||
|
||||
func tapToServiceProfile(ctx context.Context, k8sAPI *k8s.KubernetesAPI, tapReq *pb.TapByResourceRequest, namespace, name, clusterDomain string, tapDuration time.Duration, routeLimit int) (sp.ServiceProfile, error) {
|
||||
profile := sp.ServiceProfile{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: fmt.Sprintf("%s.%s.svc.%s", name, namespace, clusterDomain),
|
||||
Namespace: namespace,
|
||||
},
|
||||
TypeMeta: serviceProfileMeta,
|
||||
}
|
||||
|
||||
ctxWithTime, cancel := context.WithTimeout(ctx, tapDuration)
|
||||
defer cancel()
|
||||
reader, body, err := tap.Reader(ctxWithTime, k8sAPI, tapReq)
|
||||
if err != nil {
|
||||
return profile, err
|
||||
}
|
||||
defer body.Close()
|
||||
|
||||
routes := routeSpecFromTap(reader, routeLimit)
|
||||
|
||||
profile.Spec.Routes = routes
|
||||
|
||||
return profile, nil
|
||||
}
|
||||
|
||||
func routeSpecFromTap(tapByteStream *bufio.Reader, routeLimit int) []*sp.RouteSpec {
|
||||
routes := make([]*sp.RouteSpec, 0)
|
||||
routesMap := make(map[string]*sp.RouteSpec)
|
||||
|
||||
for {
|
||||
log.Debug("Waiting for data...")
|
||||
event := pb.TapEvent{}
|
||||
err := protohttp.FromByteStreamToProtocolBuffers(tapByteStream, &event)
|
||||
if err != nil {
|
||||
// expected errors when hitting the tapDuration deadline
|
||||
var e net.Error
|
||||
if err != io.EOF &&
|
||||
!(errors.As(err, &e) && e.Timeout()) &&
|
||||
!errors.Is(err, context.DeadlineExceeded) &&
|
||||
!strings.HasSuffix(err.Error(), tap.ErrClosedResponseBody) {
|
||||
fmt.Fprintln(os.Stderr, err)
|
||||
}
|
||||
break
|
||||
}
|
||||
|
||||
routeSpec := getPathDataFromTap(&event)
|
||||
log.Debugf("Created route spec: %v", routeSpec)
|
||||
|
||||
if routeSpec != nil {
|
||||
routesMap[routeSpec.Name] = routeSpec
|
||||
if len(routesMap) >= routeLimit {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
for _, path := range sortMapKeys(routesMap) {
|
||||
routes = append(routes, routesMap[path])
|
||||
}
|
||||
return routes
|
||||
}
|
||||
|
||||
func sortMapKeys(m map[string]*sp.RouteSpec) (keys []string) {
|
||||
for key := range m {
|
||||
keys = append(keys, key)
|
||||
}
|
||||
sort.Strings(keys)
|
||||
return
|
||||
}
|
||||
|
||||
func getPathDataFromTap(event *pb.TapEvent) *sp.RouteSpec {
|
||||
if event.GetProxyDirection() != pb.TapEvent_INBOUND {
|
||||
return nil
|
||||
}
|
||||
|
||||
switch ev := event.GetHttp().GetEvent().(type) {
|
||||
case *pb.TapEvent_Http_RequestInit_:
|
||||
path := ev.RequestInit.GetPath()
|
||||
if path == "/" {
|
||||
return nil
|
||||
}
|
||||
|
||||
return mkRouteSpec(
|
||||
path,
|
||||
pathToRegex(path), // for now, no path consolidation
|
||||
ev.RequestInit.GetMethod().GetRegistered().String(),
|
||||
nil)
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
|
@ -12,7 +12,7 @@ import (
|
|||
// GenServiceProfile generates a mock ServiceProfile.
|
||||
func GenServiceProfile(service, namespace, clusterDomain string) v1alpha2.ServiceProfile {
|
||||
return v1alpha2.ServiceProfile{
|
||||
TypeMeta: serviceProfileMeta,
|
||||
TypeMeta: ServiceProfileMeta,
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: service + "." + namespace + ".svc." + clusterDomain,
|
||||
Namespace: namespace,
|
||||
|
|
|
@ -101,6 +101,7 @@ func testProfiles(t *testing.T) {
|
|||
sourceFlag := fmt.Sprintf("--%s", tc.sourceName)
|
||||
cmd := []string{"profile", "--namespace", tc.namespace, tc.spName, sourceFlag}
|
||||
if tc.sourceName == "tap" {
|
||||
cmd = append([]string{"viz"}, cmd...)
|
||||
tc.args = []string{
|
||||
tc.deployName,
|
||||
"--tap-route-limit",
|
||||
|
|
|
@ -0,0 +1,222 @@
|
|||
package cmd
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"os"
|
||||
"sort"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/ghodss/yaml"
|
||||
"github.com/linkerd/linkerd2/controller/api/util"
|
||||
sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2"
|
||||
pkgcmd "github.com/linkerd/linkerd2/pkg/cmd"
|
||||
"github.com/linkerd/linkerd2/pkg/healthcheck"
|
||||
"github.com/linkerd/linkerd2/pkg/k8s"
|
||||
"github.com/linkerd/linkerd2/pkg/profiles"
|
||||
"github.com/linkerd/linkerd2/pkg/protohttp"
|
||||
"github.com/linkerd/linkerd2/pkg/tap"
|
||||
pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
|
||||
"github.com/linkerd/linkerd2/viz/pkg/api"
|
||||
log "github.com/sirupsen/logrus"
|
||||
"github.com/spf13/cobra"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/validation"
|
||||
)
|
||||
|
||||
type profileOptions struct {
|
||||
name string
|
||||
namespace string
|
||||
tap string
|
||||
tapDuration time.Duration
|
||||
tapRouteLimit uint
|
||||
}
|
||||
|
||||
func newProfileOptions() *profileOptions {
|
||||
return &profileOptions{
|
||||
tapDuration: 5 * time.Second,
|
||||
tapRouteLimit: 20,
|
||||
}
|
||||
}
|
||||
|
||||
func (options *profileOptions) validate() error {
|
||||
if options.tap == "" {
|
||||
return errors.New("The --tap flag must be specified")
|
||||
}
|
||||
// a DNS-1035 label must consist of lower case alphanumeric characters or '-',
|
||||
// start with an alphabetic character, and end with an alphanumeric character
|
||||
if errs := validation.IsDNS1035Label(options.name); len(errs) != 0 {
|
||||
return fmt.Errorf("invalid service %q: %v", options.name, errs)
|
||||
}
|
||||
// a DNS-1123 label must consist of lower case alphanumeric characters or '-',
|
||||
// and must start and end with an alphanumeric character
|
||||
if errs := validation.IsDNS1123Label(options.namespace); len(errs) != 0 {
|
||||
return fmt.Errorf("invalid namespace %q: %v", options.namespace, errs)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// newCmdProfile creates a new cobra command for the Profile subcommand which
|
||||
// generates Linkerd service profile based off tap data.
|
||||
func newCmdProfile() *cobra.Command {
|
||||
options := newProfileOptions()
|
||||
|
||||
cmd := &cobra.Command{
|
||||
Use: "profile [flags] --tap resource (SERVICE)",
|
||||
Short: "Output service profile config for Kubernetes based off tap data",
|
||||
Long: "Output service profile config for Kubernetes based off tap data.",
|
||||
Example: ` # Generate a profile by watching live traffic.
|
||||
linkerd viz profile -n emojivoto web-svc --tap deploy/web --tap-duration 10s --tap-route-limit 5
|
||||
`,
|
||||
Args: cobra.ExactArgs(1),
|
||||
RunE: func(cmd *cobra.Command, args []string) error {
|
||||
api.CheckClientOrExit(healthcheck.Options{
|
||||
ControlPlaneNamespace: controlPlaneNamespace,
|
||||
KubeConfig: kubeconfigPath,
|
||||
Impersonate: impersonate,
|
||||
ImpersonateGroup: impersonateGroup,
|
||||
KubeContext: kubeContext,
|
||||
APIAddr: apiAddr,
|
||||
})
|
||||
if options.namespace == "" {
|
||||
options.namespace = pkgcmd.GetDefaultNamespace(kubeconfigPath, kubeContext)
|
||||
}
|
||||
options.name = args[0]
|
||||
clusterDomain := "cluster.local"
|
||||
var k8sAPI *k8s.KubernetesAPI
|
||||
err := options.validate()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
k8sAPI, err = k8s.NewAPI(kubeconfigPath, kubeContext, impersonate, impersonateGroup, 0)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
_, values, err := healthcheck.FetchCurrentConfiguration(cmd.Context(), k8sAPI, controlPlaneNamespace)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if cd := values.GetGlobal().ClusterDomain; cd != "" {
|
||||
clusterDomain = cd
|
||||
}
|
||||
return renderTapOutputProfile(cmd.Context(), k8sAPI, options.tap, options.namespace, options.name, clusterDomain, options.tapDuration, int(options.tapRouteLimit), os.Stdout)
|
||||
},
|
||||
}
|
||||
cmd.PersistentFlags().StringVar(&options.tap, "tap", options.tap, "Output a service profile based on tap data for the given target resource")
|
||||
cmd.PersistentFlags().DurationVar(&options.tapDuration, "tap-duration", options.tapDuration, "Duration over which tap data is collected (for example: \"10s\", \"1m\", \"10m\")")
|
||||
cmd.PersistentFlags().UintVar(&options.tapRouteLimit, "tap-route-limit", options.tapRouteLimit, "Max number of routes to add to the profile")
|
||||
cmd.PersistentFlags().StringVarP(&options.namespace, "namespace", "n", options.namespace, "Namespace of the service")
|
||||
return cmd
|
||||
}
|
||||
|
||||
// renderTapOutputProfile performs a tap on the desired resource and generates
|
||||
// a service profile with routes pre-populated from the tap data
|
||||
// Only inbound tap traffic is considered.
|
||||
func renderTapOutputProfile(ctx context.Context, k8sAPI *k8s.KubernetesAPI, tapResource, namespace, name, clusterDomain string, tapDuration time.Duration, routeLimit int, w io.Writer) error {
|
||||
requestParams := util.TapRequestParams{
|
||||
Resource: tapResource,
|
||||
Namespace: namespace,
|
||||
}
|
||||
log.Debugf("Running `linkerd tap %s --namespace %s`", tapResource, namespace)
|
||||
req, err := util.BuildTapByResourceRequest(requestParams)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
profile, err := tapToServiceProfile(ctx, k8sAPI, req, namespace, name, clusterDomain, tapDuration, routeLimit)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
output, err := yaml.Marshal(profile)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Error writing Service Profile: %s", err)
|
||||
}
|
||||
w.Write(output)
|
||||
return nil
|
||||
}
|
||||
|
||||
func tapToServiceProfile(ctx context.Context, k8sAPI *k8s.KubernetesAPI, tapReq *pb.TapByResourceRequest, namespace, name, clusterDomain string, tapDuration time.Duration, routeLimit int) (sp.ServiceProfile, error) {
|
||||
profile := sp.ServiceProfile{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: fmt.Sprintf("%s.%s.svc.%s", name, namespace, clusterDomain),
|
||||
Namespace: namespace,
|
||||
},
|
||||
TypeMeta: profiles.ServiceProfileMeta,
|
||||
}
|
||||
ctxWithTime, cancel := context.WithTimeout(ctx, tapDuration)
|
||||
defer cancel()
|
||||
reader, body, err := tap.Reader(ctxWithTime, k8sAPI, tapReq)
|
||||
if err != nil {
|
||||
return profile, err
|
||||
}
|
||||
defer body.Close()
|
||||
routes := routeSpecFromTap(reader, routeLimit)
|
||||
profile.Spec.Routes = routes
|
||||
return profile, nil
|
||||
}
|
||||
|
||||
func routeSpecFromTap(tapByteStream *bufio.Reader, routeLimit int) []*sp.RouteSpec {
|
||||
routes := make([]*sp.RouteSpec, 0)
|
||||
routesMap := make(map[string]*sp.RouteSpec)
|
||||
for {
|
||||
log.Debug("Waiting for data...")
|
||||
event := pb.TapEvent{}
|
||||
err := protohttp.FromByteStreamToProtocolBuffers(tapByteStream, &event)
|
||||
if err != nil {
|
||||
// expected errors when hitting the tapDuration deadline
|
||||
var e net.Error
|
||||
if err != io.EOF &&
|
||||
!(errors.As(err, &e) && e.Timeout()) &&
|
||||
!errors.Is(err, context.DeadlineExceeded) &&
|
||||
!strings.HasSuffix(err.Error(), tap.ErrClosedResponseBody) {
|
||||
fmt.Fprintln(os.Stderr, err)
|
||||
}
|
||||
break
|
||||
}
|
||||
routeSpec := getPathDataFromTap(&event)
|
||||
log.Debugf("Created route spec: %v", routeSpec)
|
||||
if routeSpec != nil {
|
||||
routesMap[routeSpec.Name] = routeSpec
|
||||
if len(routesMap) >= routeLimit {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
for _, path := range sortMapKeys(routesMap) {
|
||||
routes = append(routes, routesMap[path])
|
||||
}
|
||||
return routes
|
||||
}
|
||||
|
||||
func sortMapKeys(m map[string]*sp.RouteSpec) (keys []string) {
|
||||
for key := range m {
|
||||
keys = append(keys, key)
|
||||
}
|
||||
sort.Strings(keys)
|
||||
return
|
||||
}
|
||||
|
||||
func getPathDataFromTap(event *pb.TapEvent) *sp.RouteSpec {
|
||||
if event.GetProxyDirection() != pb.TapEvent_INBOUND {
|
||||
return nil
|
||||
}
|
||||
switch ev := event.GetHttp().GetEvent().(type) {
|
||||
case *pb.TapEvent_Http_RequestInit_:
|
||||
path := ev.RequestInit.GetPath()
|
||||
if path == "/" {
|
||||
return nil
|
||||
}
|
||||
|
||||
return profiles.MkRouteSpec(
|
||||
path,
|
||||
profiles.PathToRegex(path), // for now, no path consolidation
|
||||
ev.RequestInit.GetMethod().GetRegistered().String(),
|
||||
nil)
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
package profiles
|
||||
package cmd
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
@ -10,6 +10,7 @@ import (
|
|||
"github.com/linkerd/linkerd2/controller/api/util"
|
||||
sp "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2"
|
||||
"github.com/linkerd/linkerd2/pkg/k8s"
|
||||
"github.com/linkerd/linkerd2/pkg/profiles"
|
||||
"github.com/linkerd/linkerd2/pkg/protohttp"
|
||||
pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
|
@ -95,7 +96,7 @@ func TestTapToServiceProfile(t *testing.T) {
|
|||
kubeAPI.Config.Host = ts.URL
|
||||
|
||||
expectedServiceProfile := sp.ServiceProfile{
|
||||
TypeMeta: serviceProfileMeta,
|
||||
TypeMeta: profiles.ServiceProfileMeta,
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: name + "." + namespace + ".svc." + clusterDomain,
|
||||
Namespace: namespace,
|
||||
|
@ -125,7 +126,7 @@ func TestTapToServiceProfile(t *testing.T) {
|
|||
t.Fatalf("Failed to create ServiceProfile: %v", err)
|
||||
}
|
||||
|
||||
err = ServiceProfileYamlEquals(actualServiceProfile, expectedServiceProfile)
|
||||
err = profiles.ServiceProfileYamlEquals(actualServiceProfile, expectedServiceProfile)
|
||||
if err != nil {
|
||||
t.Fatalf("ServiceProfiles are not equal: %v", err)
|
||||
}
|
|
@ -79,6 +79,7 @@ func NewCmdViz() *cobra.Command {
|
|||
vizCmd.AddCommand(NewCmdDashboard())
|
||||
vizCmd.AddCommand(newCmdUninstall())
|
||||
vizCmd.AddCommand(newCmdCheck())
|
||||
vizCmd.AddCommand(newCmdProfile())
|
||||
|
||||
return vizCmd
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue