Have `linkerd endpoints` use `Destination.Get` (#2990)

* Have `linkerd endpoints` use `Destination.Get`

Fixes #2885

We're refactoring `linkerd endpoints` so it hits
directly the `Destination.Get` endpoint, instead of relying on the
Discovery service.

For that, I've created a new `client.go` for Destination and added it to
the `APIClient` interface.

I've also added a `destinationClient` struct that mimics `tapClient`,
and whose common logic has been moved into `stream_client.go`.

Analogously, I added a `destinationServer` struct that mimics
`tapServer`.

Signed-off-by: Alejandro Pedraza <alejandro@buoyant.io>
This commit is contained in:
Alejandro Pedraza 2019-07-03 09:11:03 -05:00 committed by GitHub
parent 944f58fb72
commit 53e589890d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 608 additions and 286 deletions

View File

@ -3,25 +3,37 @@ package cmd
import (
"bytes"
"context"
"encoding/binary"
"encoding/json"
"fmt"
"net"
"os"
"sort"
"strings"
"sync"
"text/tabwriter"
destinationPb "github.com/linkerd/linkerd2-proxy-api/go/destination"
netPb "github.com/linkerd/linkerd2-proxy-api/go/net"
"github.com/linkerd/linkerd2/controller/api/public"
pb "github.com/linkerd/linkerd2/controller/gen/controller/discovery"
"github.com/linkerd/linkerd2/pkg/addr"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
)
type endpointsOptions struct {
namespace string
outputFormat string
}
type (
// map[ServiceID]map[Port][]podData
endpointsInfo map[string]map[uint32][]podData
podData struct {
name string
address string
ip string
}
)
const (
podHeader = "POD"
)
@ -38,7 +50,6 @@ func (o *endpointsOptions) validate() error {
func newEndpointsOptions() *endpointsOptions {
return &endpointsOptions{
namespace: "",
outputFormat: tableOutput,
}
}
@ -46,37 +57,37 @@ func newEndpointsOptions() *endpointsOptions {
func newCmdEndpoints() *cobra.Command {
options := newEndpointsOptions()
example := ` # get all endpoints
linkerd endpoints
example := ` # get all endpoints for the authorities emoji-svc.emojivoto.svc.cluster.local:8080 and web-svc.emojivoto.svc.cluster.local:80
linkerd endpoints emoji-svc.emojivoto.svc.cluster.local:8080 web-svc.emojivoto.svc.cluster.local:80
# get endpoints in the emojivoto namespace
linkerd endpoints -n emojivoto
# get that same information in json format
linkerd endpoints -o json emoji-svc.emojivoto.svc.cluster.local:8080 web-svc.emojivoto.svc.cluster.local:80
# get all endpoints in json
linkerd endpoints -o json`
# get the endpoints for authorities in Linkerd's control-plane itself
linkerd endpoints linkerd-controller-api.linkerd.svc.cluster.local:8085
linkerd endpoints linkerd-web.linkerd.svc.cluster.local:8084`
cmd := &cobra.Command{
Use: "endpoints [flags]",
Use: "endpoints [flags] authorities",
Aliases: []string{"ep"},
Short: "Introspect Linkerd's service discovery state",
Long: `Introspect Linkerd's service discovery state.
This command provides debug information about the internal state of the
control-plane's destination container. Note that this cache of service discovery
information is populated on-demand via linkerd-proxy requests. This command
will return "No endpoints found." until a linkerd-proxy begins routing
requests.`,
control-plane's destination container. It queries the same Destination service
endpoint as the linkerd-proxy's, and returns the addresses associated with that
destination.`,
Example: example,
Args: cobra.NoArgs,
Args: cobra.MinimumNArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
err := options.validate()
if err != nil {
return err
}
endpoints, err := requestEndpointsFromAPI(checkPublicAPIClientOrExit())
endpoints, err := requestEndpointsFromAPI(checkPublicAPIClientOrExit(), args)
if err != nil {
return fmt.Errorf("Endpoints API error: %s", err)
return fmt.Errorf("Destination API error: %s", err)
}
output := renderEndpoints(endpoints, options)
@ -86,17 +97,88 @@ requests.`,
},
}
cmd.PersistentFlags().StringVarP(&options.namespace, "namespace", "n", options.namespace, "Namespace of the specified endpoints (default: all namespaces)")
cmd.PersistentFlags().StringVarP(&options.outputFormat, "output", "o", options.outputFormat, fmt.Sprintf("Output format; one of: \"%s\" or \"%s\"", tableOutput, jsonOutput))
return cmd
}
func requestEndpointsFromAPI(client public.APIClient) (*pb.EndpointsResponse, error) {
return client.Endpoints(context.Background(), &pb.EndpointsParams{})
func requestEndpointsFromAPI(client public.APIClient, authorities []string) (endpointsInfo, error) {
info := make(endpointsInfo)
// buffered channels to avoid blocking
events := make(chan *destinationPb.Update, len(authorities))
errors := make(chan error, len(authorities))
var wg sync.WaitGroup
for _, authority := range authorities {
wg.Add(1)
go func(authority string) {
defer wg.Done()
if len(errors) == 0 {
dest := &destinationPb.GetDestination{
Scheme: "http:",
Path: authority,
}
rsp, err := client.Get(context.Background(), dest)
if err != nil {
errors <- err
return
}
event, err := rsp.Recv()
if err != nil {
errors <- err
return
}
events <- event
}
}(authority)
}
// Block till all goroutines above are done
wg.Wait()
for i := 0; i < len(authorities); i++ {
select {
case err := <-errors:
// we only care about the first error
return nil, err
case event := <-events:
addressSet := event.GetAdd()
labels := addressSet.GetMetricLabels()
serviceID := labels["service"] + "." + labels["namespace"]
if _, ok := info[serviceID]; !ok {
info[serviceID] = make(map[uint32][]podData)
}
for _, addr := range addressSet.GetAddrs() {
tcpAddr := addr.GetAddr()
port := tcpAddr.GetPort()
if info[serviceID][port] == nil {
info[serviceID][port] = make([]podData, 0)
}
labels := addr.GetMetricLabels()
info[serviceID][port] = append(info[serviceID][port], podData{
name: labels["pod"],
address: tcpAddr.String(),
ip: getIP(tcpAddr),
})
}
}
}
return info, nil
}
func renderEndpoints(endpoints *pb.EndpointsResponse, options *endpointsOptions) string {
func getIP(tcpAddr *netPb.TcpAddress) string {
ip := tcpAddr.GetIp().GetIpv4()
b := make([]byte, 4)
binary.BigEndian.PutUint32(b, ip)
return net.IP(b).String()
}
func renderEndpoints(endpoints endpointsInfo, options *endpointsOptions) string {
var buffer bytes.Buffer
w := tabwriter.NewWriter(&buffer, 0, 0, padding, ' ', 0)
writeEndpointsToBuffer(endpoints, w, options)
@ -110,40 +192,31 @@ type rowEndpoint struct {
IP string `json:"ip"`
Port uint32 `json:"port"`
Pod string `json:"pod"`
Version string `json:"version"`
Service string `json:"service"`
}
func writeEndpointsToBuffer(endpoints *pb.EndpointsResponse, w *tabwriter.Writer, options *endpointsOptions) {
func writeEndpointsToBuffer(endpoints endpointsInfo, w *tabwriter.Writer, options *endpointsOptions) {
maxPodLength := len(podHeader)
maxNamespaceLength := len(namespaceHeader)
endpointsTables := map[string][]rowEndpoint{}
for serviceID, servicePort := range endpoints.GetServicePorts() {
for serviceID, servicePort := range endpoints {
namespace := ""
parts := strings.SplitN(serviceID, ".", 2)
if len(parts) == 2 {
namespace = parts[1]
}
namespace = parts[1]
if options.namespace != "" && options.namespace != namespace {
continue
}
for port, podAddrs := range servicePort.GetPortEndpoints() {
for _, podAddr := range podAddrs.GetPodAddresses() {
pod := podAddr.GetPod()
name := pod.GetName()
for port, podAddrs := range servicePort {
for _, pod := range podAddrs {
name := pod.name
parts := strings.SplitN(name, "/", 2)
if len(parts) == 2 {
name = parts[1]
}
row := rowEndpoint{
Namespace: namespace,
IP: addr.PublicIPToString(podAddr.GetAddr().GetIp()),
IP: pod.ip,
Port: port,
Pod: name,
Version: pod.GetResourceVersion(),
Service: serviceID,
}
@ -169,13 +242,13 @@ func writeEndpointsToBuffer(endpoints *pb.EndpointsResponse, w *tabwriter.Writer
fmt.Fprintln(os.Stderr, "No endpoints found.")
os.Exit(0)
}
printEndpointsTables(endpointsTables, w, options, maxPodLength, maxNamespaceLength)
printEndpointsTables(endpointsTables, w, maxPodLength, maxNamespaceLength)
case jsonOutput:
printEndpointsJSON(endpointsTables, w)
}
}
func printEndpointsTables(endpointsTables map[string][]rowEndpoint, w *tabwriter.Writer, options *endpointsOptions, maxPodLength int, maxNamespaceLength int) {
func printEndpointsTables(endpointsTables map[string][]rowEndpoint, w *tabwriter.Writer, maxPodLength int, maxNamespaceLength int) {
firstTable := true // don't print a newline before the first table
for _, ns := range sortNamespaceKeys(endpointsTables) {
@ -183,42 +256,33 @@ func printEndpointsTables(endpointsTables map[string][]rowEndpoint, w *tabwriter
fmt.Fprint(w, "\n")
}
firstTable = false
printEndpointsTable(ns, endpointsTables[ns], w, options, maxPodLength, maxNamespaceLength)
printEndpointsTable(ns, endpointsTables[ns], w, maxPodLength, maxNamespaceLength)
}
}
func printEndpointsTable(namespace string, rows []rowEndpoint, w *tabwriter.Writer, options *endpointsOptions, maxPodLength int, maxNamespaceLength int) {
func printEndpointsTable(namespace string, rows []rowEndpoint, w *tabwriter.Writer, maxPodLength int, maxNamespaceLength int) {
headers := make([]string, 0)
templateString := "%s\t%d\t%s\t%s\t%s\n"
templateString := "%s\t%d\t%s\t%s\n"
if options.namespace == "" {
headers = append(headers, namespaceHeader+strings.Repeat(" ", maxNamespaceLength-len(namespaceHeader)))
templateString = "%s\t" + templateString
}
headers = append(headers, namespaceHeader+strings.Repeat(" ", maxNamespaceLength-len(namespaceHeader)))
templateString = "%s\t" + templateString
headers = append(headers, []string{
"IP",
"PORT",
podHeader + strings.Repeat(" ", maxPodLength-len(podHeader)),
"VERSION",
"SERVICE",
}...)
fmt.Fprintln(w, strings.Join(headers, "\t"))
for _, row := range rows {
values := make([]interface{}, 0)
if options.namespace == "" {
values = append(values,
namespace+strings.Repeat(" ", maxNamespaceLength-len(namespace)))
}
values = append(values, []interface{}{
values := []interface{}{
namespace + strings.Repeat(" ", maxNamespaceLength-len(namespace)),
row.IP,
row.Port,
row.Pod,
row.Version,
row.Service,
}...)
}
fmt.Fprintf(w, templateString, values...)
}

View File

@ -3,63 +3,132 @@ package cmd
import (
"testing"
"github.com/linkerd/linkerd2/controller/api/discovery"
pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
"github.com/linkerd/linkerd2/controller/api/public"
)
type endpointsExp struct {
options *endpointsOptions
identities []string
file string
options *endpointsOptions
authorities []string
endpoints []public.AuthorityEndpoints
file string
}
func TestEndpoints(t *testing.T) {
options := newEndpointsOptions()
options.namespace = "emojivoto"
t.Run("Returns endpoints", func(t *testing.T) {
t.Run("Returns endpoints same namespace", func(t *testing.T) {
testEndpointsCall(endpointsExp{
options: options,
identities: []string{"emoji-svc.emojivoto", "voting-svc.emojivoto", "authors.books"},
file: "endpoints_one_output.golden",
options: options,
authorities: []string{"emoji-svc.emojivoto.svc.cluster.local:8080", "voting-svc.emojivoto.svc.cluster.local:8080"},
endpoints: []public.AuthorityEndpoints{
{
Namespace: "emojivoto",
ServiceID: "emoji-svc",
Pods: []public.PodDetails{
{
Name: "emoji-6bf9f47bd5-jjcrl",
IP: 16909060,
Port: 8080,
},
},
},
{
Namespace: "emojivoto",
ServiceID: "voting-svc",
Pods: []public.PodDetails{
{
Name: "voting-7bf9f47bd5-jjdrl",
IP: 84281096,
Port: 8080,
},
},
},
},
file: "endpoints_one_output.golden",
}, t)
})
t.Run("Returns endpoints different namespace", func(t *testing.T) {
testEndpointsCall(endpointsExp{
options: options,
authorities: []string{"emoji-svc.emojivoto.svc.cluster.local:8080", "voting-svc.emojivoto2.svc.cluster.local:8080"},
endpoints: []public.AuthorityEndpoints{
{
Namespace: "emojivoto",
ServiceID: "emoji-svc",
Pods: []public.PodDetails{
{
Name: "emoji-6bf9f47bd5-jjcrl",
IP: 16909060,
Port: 8080,
},
},
},
{
Namespace: "emojivoto2",
ServiceID: "voting-svc",
Pods: []public.PodDetails{
{
Name: "voting-7bf9f47bd5-jjdrl",
IP: 84281096,
Port: 8080,
},
},
},
},
file: "endpoints_two_outputs.golden",
}, t)
})
options.outputFormat = jsonOutput
t.Run("Returns endpoints (json)", func(t *testing.T) {
t.Run("Returns endpoints same namespace (json)", func(t *testing.T) {
testEndpointsCall(endpointsExp{
options: options,
identities: []string{"emoji-svc.emojivoto", "voting-svc.emojivoto", "authors.books"},
file: "endpoints_one_output_json.golden",
}, t)
})
options = newEndpointsOptions()
t.Run("Returns all namespace endpoints", func(t *testing.T) {
testEndpointsCall(endpointsExp{
options: options,
identities: []string{"emoji-svc.emojivoto", "voting-svc.emojivoto", "authors.books"},
file: "endpoints_all_output.golden",
}, t)
})
options.outputFormat = jsonOutput
t.Run("Returns all namespace endpoints (json)", func(t *testing.T) {
testEndpointsCall(endpointsExp{
options: options,
identities: []string{"emoji-svc.emojivoto", "voting-svc.emojivoto", "authors.books"},
file: "endpoints_all_output_json.golden",
options: options,
authorities: []string{"emoji-svc.emojivoto.svc.cluster.local:8080", "voting-svc.emojivoto.svc.cluster.local:8080"},
endpoints: []public.AuthorityEndpoints{
{
Namespace: "emojivoto",
ServiceID: "emoji-svc",
Pods: []public.PodDetails{
{
Name: "emoji-6bf9f47bd5-jjcrl",
IP: 16909060,
Port: 8080,
},
},
},
{
Namespace: "emojivoto",
ServiceID: "voting-svc",
Pods: []public.PodDetails{
{
Name: "voting-7bf9f47bd5-jjdrl",
IP: 84281096,
Port: 8080,
},
},
},
},
file: "endpoints_one_output_json.golden",
}, t)
})
}
func testEndpointsCall(exp endpointsExp, t *testing.T) {
updates := make([]pb.Update, 0)
for _, endpoint := range exp.endpoints {
addrSet := public.BuildAddrSet(endpoint)
updates = append(updates, pb.Update{Update: &pb.Update_Add{Add: addrSet}})
}
mockClient := &public.MockAPIClient{
MockDiscoveryClient: &discovery.MockDiscoveryClient{
EndpointsResponseToReturn: discovery.GenEndpointsResponse(exp.identities),
DestinationGetClientToReturn: &public.MockDestinationGetClient{
UpdatesToReturn: updates,
},
}
endpoints, err := requestEndpointsFromAPI(mockClient)
endpoints, err := requestEndpointsFromAPI(mockClient, exp.authorities)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

View File

@ -1,6 +0,0 @@
NAMESPACE IP PORT POD VERSION SERVICE
books 1.2.3.4 8080 authors 1234 authors.books
NAMESPACE IP PORT POD VERSION SERVICE
emojivoto 1.2.3.4 8080 emoji-svc 1234 emoji-svc.emojivoto
emojivoto 1.2.3.4 8080 voting-svc 1234 voting-svc.emojivoto

View File

@ -1,26 +0,0 @@
[
{
"namespace": "books",
"ip": "1.2.3.4",
"port": 8080,
"pod": "authors",
"version": "1234",
"service": "authors.books"
},
{
"namespace": "emojivoto",
"ip": "1.2.3.4",
"port": 8080,
"pod": "emoji-svc",
"version": "1234",
"service": "emoji-svc.emojivoto"
},
{
"namespace": "emojivoto",
"ip": "1.2.3.4",
"port": 8080,
"pod": "voting-svc",
"version": "1234",
"service": "voting-svc.emojivoto"
}
]

View File

@ -1,3 +1,3 @@
IP PORT POD VERSION SERVICE
1.2.3.4 8080 emoji-svc 1234 emoji-svc.emojivoto
1.2.3.4 8080 voting-svc 1234 voting-svc.emojivoto
NAMESPACE IP PORT POD SERVICE
emojivoto 1.2.3.4 8080 emoji-6bf9f47bd5-jjcrl emoji-svc.emojivoto
emojivoto 5.6.7.8 8080 voting-7bf9f47bd5-jjdrl voting-svc.emojivoto

View File

@ -3,16 +3,14 @@
"namespace": "emojivoto",
"ip": "1.2.3.4",
"port": 8080,
"pod": "emoji-svc",
"version": "1234",
"pod": "emoji-6bf9f47bd5-jjcrl",
"service": "emoji-svc.emojivoto"
},
{
"namespace": "emojivoto",
"ip": "1.2.3.4",
"ip": "5.6.7.8",
"port": 8080,
"pod": "voting-svc",
"version": "1234",
"pod": "voting-7bf9f47bd5-jjdrl",
"service": "voting-svc.emojivoto"
}
]

View File

@ -0,0 +1,5 @@
NAMESPACE IP PORT POD SERVICE
emojivoto 1.2.3.4 8080 emoji-6bf9f47bd5-jjcrl emoji-svc.emojivoto
NAMESPACE IP PORT POD SERVICE
emojivoto2 5.6.7.8 8080 voting-7bf9f47bd5-jjdrl voting-svc.emojivoto2

View File

@ -0,0 +1,17 @@
package destination
import (
pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
"google.golang.org/grpc"
)
// NewClient creates a client for the control plane Destination API that
// implements the Destination service.
func NewClient(addr string) (pb.DestinationClient, *grpc.ClientConn, error) {
conn, err := grpc.Dial(addr, grpc.WithInsecure())
if err != nil {
return nil, nil, err
}
return pb.NewDestinationClient(conn), conn, nil
}

View File

@ -4,11 +4,13 @@ import (
"bufio"
"bytes"
"context"
"errors"
"fmt"
"net/http"
"net/url"
"github.com/golang/protobuf/proto"
destinationPb "github.com/linkerd/linkerd2-proxy-api/go/destination"
healthcheckPb "github.com/linkerd/linkerd2/controller/gen/common/healthcheck"
configPb "github.com/linkerd/linkerd2/controller/gen/config"
discoveryPb "github.com/linkerd/linkerd2/controller/gen/controller/discovery"
@ -17,7 +19,6 @@ import (
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
)
@ -37,6 +38,7 @@ const (
type APIClient interface {
pb.ApiClient
discoveryPb.DiscoveryClient
destinationPb.DestinationClient
}
type grpcOverHTTPClient struct {
@ -104,18 +106,32 @@ func (c *grpcOverHTTPClient) TapByResource(ctx context.Context, req *pb.TapByRes
return nil, err
}
if err := checkIfResponseHasError(httpRsp); err != nil {
httpRsp.Body.Close()
client, err := getStreamClient(ctx, httpRsp)
if err != nil {
return nil, err
}
go func() {
<-ctx.Done()
log.Debug("Closing response body after context marked as done")
httpRsp.Body.Close()
}()
return &tapClient{client}, nil
}
return &tapClient{ctx: ctx, reader: bufio.NewReader(httpRsp.Body)}, nil
func (c *grpcOverHTTPClient) Get(ctx context.Context, req *destinationPb.GetDestination, _ ...grpc.CallOption) (destinationPb.Destination_GetClient, error) {
url := c.endpointNameToPublicAPIURL("DestinationGet")
httpRsp, err := c.post(ctx, url, req)
if err != nil {
return nil, err
}
client, err := getStreamClient(ctx, httpRsp)
if err != nil {
return nil, err
}
return &destinationClient{client}, nil
}
func (c *grpcOverHTTPClient) GetProfile(ctx context.Context, _ *destinationPb.GetDestination, _ ...grpc.CallOption) (destinationPb.Destination_GetProfileClient, error) {
// Not implemented through this client. The proxies use the gRPC server directly instead.
return nil, errors.New("Not implemented")
}
func (c *grpcOverHTTPClient) Endpoints(ctx context.Context, req *discoveryPb.EndpointsParams, _ ...grpc.CallOption) (*discoveryPb.EndpointsResponse, error) {
@ -173,8 +189,7 @@ func (c *grpcOverHTTPClient) endpointNameToPublicAPIURL(endpoint string) *url.UR
}
type tapClient struct {
ctx context.Context
reader *bufio.Reader
streamClient
}
func (c tapClient) Recv() (*pb.TapEvent, error) {
@ -183,13 +198,15 @@ func (c tapClient) Recv() (*pb.TapEvent, error) {
return &msg, err
}
// satisfy the pb.Api_TapClient interface
func (c tapClient) Header() (metadata.MD, error) { return nil, nil }
func (c tapClient) Trailer() metadata.MD { return nil }
func (c tapClient) CloseSend() error { return nil }
func (c tapClient) Context() context.Context { return c.ctx }
func (c tapClient) SendMsg(interface{}) error { return nil }
func (c tapClient) RecvMsg(interface{}) error { return nil }
type destinationClient struct {
streamClient
}
func (c destinationClient) Recv() (*destinationPb.Update, error) {
var msg destinationPb.Update
err := fromByteStreamToProtocolBuffers(c.reader, &msg)
return &msg, err
}
func fromByteStreamToProtocolBuffers(byteStreamContainingMessage *bufio.Reader, out proto.Message) error {
messageAsBytes, err := deserializePayloadFromReader(byteStreamContainingMessage)

View File

@ -8,6 +8,7 @@ import (
"time"
"github.com/golang/protobuf/ptypes/duration"
destinationPb "github.com/linkerd/linkerd2-proxy-api/go/destination"
"github.com/linkerd/linkerd2/controller/api/util"
healthcheckPb "github.com/linkerd/linkerd2/controller/gen/common/healthcheck"
configPb "github.com/linkerd/linkerd2/controller/gen/config"
@ -31,12 +32,14 @@ import (
type APIServer interface {
pb.ApiServer
discoveryPb.DiscoveryServer
destinationPb.DestinationServer
}
type grpcServer struct {
prometheusAPI promv1.API
tapClient tapPb.TapClient
discoveryClient discoveryPb.DiscoveryClient
destinationClient destinationPb.DestinationClient
k8sAPI *k8s.API
controllerNamespace string
ignoredNamespaces []string
@ -61,6 +64,7 @@ func newGrpcServer(
promAPI promv1.API,
tapClient tapPb.TapClient,
discoveryClient discoveryPb.DiscoveryClient,
destinationClient destinationPb.DestinationClient,
k8sAPI *k8s.API,
controllerNamespace string,
ignoredNamespaces []string,
@ -70,6 +74,7 @@ func newGrpcServer(
prometheusAPI: promAPI,
tapClient: tapClient,
discoveryClient: discoveryClient,
destinationClient: destinationClient,
k8sAPI: k8sAPI,
controllerNamespace: controllerNamespace,
ignoredNamespaces: ignoredNamespaces,
@ -264,6 +269,33 @@ func (s *grpcServer) TapByResource(req *pb.TapByResourceRequest, stream pb.Api_T
}
}
// Pass through to Destination service
func (s *grpcServer) Get(req *destinationPb.GetDestination, stream destinationPb.Destination_GetServer) error {
destinationStream := stream.(destinationServer)
destinationClient, err := s.destinationClient.Get(destinationStream.Context(), req)
if err != nil {
log.Errorf("Unexpected error on Destination.Get [%v]: %v", req, err)
return err
}
for {
select {
case <-destinationStream.Context().Done():
return nil
default:
event, err := destinationClient.Recv()
if err != nil {
return err
}
destinationStream.Send(event)
}
}
}
func (s *grpcServer) GetProfile(_ *destinationPb.GetDestination, _ destinationPb.Destination_GetProfileServer) error {
// Not implemented in the Public API. Instead, the proxies should reach the Destination gRPC server directly.
return errors.New("Not implemented")
}
func (s *grpcServer) shouldIgnore(pod *corev1.Pod) bool {
for _, namespace := range s.ignoredNamespaces {
if pod.Namespace == namespace {

View File

@ -8,10 +8,7 @@ import (
"strings"
"testing"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/ptypes/duration"
"github.com/linkerd/linkerd2/controller/api/discovery"
discoveryPb "github.com/linkerd/linkerd2/controller/gen/controller/discovery"
pb "github.com/linkerd/linkerd2/controller/gen/public"
"github.com/linkerd/linkerd2/controller/k8s"
pkgK8s "github.com/linkerd/linkerd2/pkg/k8s"
@ -403,6 +400,7 @@ status:
&mProm,
nil,
nil,
nil,
k8sAPI,
"linkerd",
[]string{},
@ -505,6 +503,7 @@ metadata:
&mockProm{},
nil,
nil,
nil,
k8sAPI,
"linkerd",
[]string{},
@ -524,54 +523,6 @@ metadata:
})
}
type endpointsExpected struct {
err error
req *discoveryPb.EndpointsParams
res *discoveryPb.EndpointsResponse
}
func TestEndpoints(t *testing.T) {
t.Run("Queries to the Endpoints endpoint", func(t *testing.T) {
expectations := []endpointsExpected{
{
err: nil,
req: &discoveryPb.EndpointsParams{},
res: &discoveryPb.EndpointsResponse{},
},
}
for _, exp := range expectations {
k8sAPI, err := k8s.NewFakeAPI()
if err != nil {
t.Fatalf("NewFakeAPI returned an error: %s", err)
}
k8sAPI.Sync()
discoveryClient := &discovery.MockDiscoveryClient{
EndpointsResponseToReturn: exp.res,
}
fakeGrpcServer := newGrpcServer(
&mockProm{},
nil,
discoveryClient,
k8sAPI,
"linkerd",
[]string{},
)
rsp, err := fakeGrpcServer.Endpoints(context.TODO(), exp.req)
if !reflect.DeepEqual(err, exp.err) {
t.Fatalf("Expected error: %s, Got: %s", exp.err, err)
}
if !proto.Equal(exp.res, rsp) {
t.Fatalf("Unexpected response: [%+v] != [%+v]", exp.res, rsp)
}
}
})
}
func TestConfig(t *testing.T) {
t.Run("Configs are parsed and returned", func(t *testing.T) {
k8sAPI, err := k8s.NewFakeAPI()
@ -583,6 +534,7 @@ func TestConfig(t *testing.T) {
&mockProm{},
nil,
nil,
nil,
k8sAPI,
"linkerd",
[]string{},

View File

@ -5,6 +5,8 @@ import (
"fmt"
"net/http"
"github.com/golang/protobuf/proto"
destinationPb "github.com/linkerd/linkerd2-proxy-api/go/destination"
healthcheckPb "github.com/linkerd/linkerd2/controller/gen/common/healthcheck"
discoveryPb "github.com/linkerd/linkerd2/controller/gen/controller/discovery"
tapPb "github.com/linkerd/linkerd2/controller/gen/controller/tap"
@ -27,6 +29,7 @@ var (
selfCheckPath = fullURLPathFor("SelfCheck")
endpointsPath = fullURLPathFor("Endpoints")
edgesPath = fullURLPathFor("Edges")
destGetPath = fullURLPathFor("DestinationGet")
configPath = fullURLPathFor("Config")
)
@ -64,6 +67,8 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
h.handleEndpoints(w, req)
case edgesPath:
h.handleEdges(w, req)
case destGetPath:
h.handleDestGet(w, req)
case configPath:
h.handleConfig(w, req)
default:
@ -234,7 +239,7 @@ func (h *handler) handleTapByResource(w http.ResponseWriter, req *http.Request)
return
}
server := tapServer{w: flushableWriter, req: req}
server := tapServer{streamServer{w: flushableWriter, req: req}}
err = h.grpcServer.TapByResource(&protoRequest, server)
if err != nil {
writeErrorToHTTPResponse(w, err)
@ -242,6 +247,28 @@ func (h *handler) handleTapByResource(w http.ResponseWriter, req *http.Request)
}
}
func (h *handler) handleDestGet(w http.ResponseWriter, req *http.Request) {
flushableWriter, err := newStreamingWriter(w)
if err != nil {
writeErrorToHTTPResponse(w, err)
return
}
var protoRequest destinationPb.GetDestination
err = httpRequestToProto(req, &protoRequest)
if err != nil {
writeErrorToHTTPResponse(w, err)
return
}
server := destinationServer{streamServer{w: flushableWriter, req: req}}
err = h.grpcServer.Get(&protoRequest, server)
if err != nil {
writeErrorToHTTPResponse(w, err)
return
}
}
func (h *handler) handleConfig(w http.ResponseWriter, req *http.Request) {
var protoRequest pb.Empty
err := httpRequestToProto(req, &protoRequest)
@ -263,12 +290,20 @@ func (h *handler) handleConfig(w http.ResponseWriter, req *http.Request) {
}
}
type tapServer struct {
type streamServer struct {
w flushableResponseWriter
req *http.Request
}
func (s tapServer) Send(msg *pb.TapEvent) error {
// satisfy the ServerStream interface
func (s streamServer) SetHeader(metadata.MD) error { return nil }
func (s streamServer) SendHeader(metadata.MD) error { return nil }
func (s streamServer) SetTrailer(metadata.MD) {}
func (s streamServer) Context() context.Context { return s.req.Context() }
func (s streamServer) SendMsg(interface{}) error { return nil }
func (s streamServer) RecvMsg(interface{}) error { return nil }
func (s streamServer) Send(msg proto.Message) error {
err := writeProtoToHTTPResponse(s.w, msg)
if err != nil {
writeErrorToHTTPResponse(s.w, err)
@ -279,13 +314,21 @@ func (s tapServer) Send(msg *pb.TapEvent) error {
return nil
}
// satisfy the pb.Api_TapServer interface
func (s tapServer) SetHeader(metadata.MD) error { return nil }
func (s tapServer) SendHeader(metadata.MD) error { return nil }
func (s tapServer) SetTrailer(metadata.MD) {}
func (s tapServer) Context() context.Context { return s.req.Context() }
func (s tapServer) SendMsg(interface{}) error { return nil }
func (s tapServer) RecvMsg(interface{}) error { return nil }
type tapServer struct {
streamServer
}
func (s tapServer) Send(msg *pb.TapEvent) error {
return s.streamServer.Send(msg)
}
type destinationServer struct {
streamServer
}
func (s destinationServer) Send(msg *destinationPb.Update) error {
return s.streamServer.Send(msg)
}
func fullURLPathFor(method string) string {
return apiRoot + apiPrefix + method
@ -310,6 +353,7 @@ func NewServer(
prometheusClient promApi.Client,
tapClient tapPb.TapClient,
discoveryClient discoveryPb.DiscoveryClient,
destinationClient destinationPb.DestinationClient,
k8sAPI *k8s.API,
controllerNamespace string,
ignoredNamespaces []string,
@ -319,6 +363,7 @@ func NewServer(
promv1.NewAPI(prometheusClient),
tapClient,
discoveryClient,
destinationClient,
k8sAPI,
controllerNamespace,
ignoredNamespaces,

View File

@ -8,6 +8,7 @@ import (
"testing"
"github.com/golang/protobuf/proto"
destinationPb "github.com/linkerd/linkerd2-proxy-api/go/destination"
healcheckPb "github.com/linkerd/linkerd2/controller/gen/common/healthcheck"
configPb "github.com/linkerd/linkerd2/controller/gen/config"
discoveryPb "github.com/linkerd/linkerd2/controller/gen/controller/discovery"
@ -15,15 +16,17 @@ import (
)
type mockServer struct {
LastRequestReceived proto.Message
ResponseToReturn proto.Message
TapStreamsToReturn []*pb.TapEvent
ErrorToReturn error
LastRequestReceived proto.Message
ResponseToReturn proto.Message
TapStreamsToReturn []*pb.TapEvent
DestinationStreamsToReturn []*destinationPb.Update
ErrorToReturn error
}
type mockGrpcServer struct {
mockServer
TapStreamsToReturn []*pb.TapEvent
TapStreamsToReturn []*pb.TapEvent
DestinationStreamsToReturn []*destinationPb.Update
}
func (m *mockGrpcServer) StatSummary(ctx context.Context, req *pb.StatSummaryRequest) (*pb.StatSummaryResponse, error) {
@ -88,6 +91,22 @@ func (m *mockGrpcServer) TapByResource(req *pb.TapByResourceRequest, tapServer p
return m.ErrorToReturn
}
func (m *mockGrpcServer) Get(req *destinationPb.GetDestination, destinationServer destinationPb.Destination_GetServer) error {
m.LastRequestReceived = req
if m.ErrorToReturn == nil {
for _, msg := range m.DestinationStreamsToReturn {
destinationServer.Send(msg)
}
}
return m.ErrorToReturn
}
func (m *mockGrpcServer) GetProfile(_ *destinationPb.GetDestination, _ destinationPb.Destination_GetProfileServer) error {
// Not implemented in the Public API. Instead, the proxies should reach the Destination gRPC server directly.
return errors.New("Not implemented")
}
func (m *mockGrpcServer) Endpoints(ctx context.Context, req *discoveryPb.EndpointsParams) (*discoveryPb.EndpointsResponse, error) {
m.LastRequestReceived = req
return m.ResponseToReturn.(*discoveryPb.EndpointsResponse), m.ErrorToReturn
@ -101,27 +120,7 @@ type grpcCallTestCase struct {
func TestServer(t *testing.T) {
t.Run("Delegates all non-streaming RPC messages to the underlying grpc server", func(t *testing.T) {
mockGrpcServer := &mockGrpcServer{}
listener, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Could not start listener: %v", err)
}
go func() {
handler := &handler{
grpcServer: mockGrpcServer,
}
err := http.Serve(listener, handler)
if err != nil {
t.Fatalf("Could not start server: %v", err)
}
}()
client, err := NewInternalClient("linkerd", listener.Addr().String())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
mockGrpcServer, client := getServerClient(t)
listPodsReq := &pb.ListPodsRequest{}
testListPods := grpcCallTestCase{
@ -166,27 +165,7 @@ func TestServer(t *testing.T) {
})
t.Run("Delegates all streaming tap RPC messages to the underlying grpc server", func(t *testing.T) {
mockGrpcServer := &mockGrpcServer{}
listener, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Could not start listener: %v", err)
}
go func() {
handler := &handler{
grpcServer: mockGrpcServer,
}
err := http.Serve(listener, handler)
if err != nil {
t.Fatalf("Could not start server: %v", err)
}
}()
client, err := NewInternalClient("linkerd", listener.Addr().String())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
mockGrpcServer, client := getServerClient(t)
expectedTapResponses := []*pb.TapEvent{
{
@ -225,38 +204,103 @@ func TestServer(t *testing.T) {
}
})
t.Run("Handles errors before opening keep-alive response", func(t *testing.T) {
mockGrpcServer := &mockGrpcServer{}
t.Run("Delegates all streaming Destination RPC messages to the underlying grpc server", func(t *testing.T) {
mockGrpcServer, client := getServerClient(t)
listener, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Could not start listener: %v", err)
expectedDestinationGetResponses := []*destinationPb.Update{
{
Update: &destinationPb.Update_Add{
Add: BuildAddrSet(
AuthorityEndpoints{
Namespace: "emojivoto",
ServiceID: "emoji-svc",
Pods: []PodDetails{
{
Name: "emoji-6bf9f47bd5-jjcrl",
IP: 16909060,
Port: 8080,
},
},
},
),
},
},
{
Update: &destinationPb.Update_Add{
Add: BuildAddrSet(
AuthorityEndpoints{
Namespace: "emojivoto",
ServiceID: "voting-svc",
Pods: []PodDetails{
{
Name: "voting-7bf9f47bd5-jjdrl",
IP: 84281096,
Port: 8080,
},
},
},
),
},
},
}
mockGrpcServer.DestinationStreamsToReturn = expectedDestinationGetResponses
mockGrpcServer.ErrorToReturn = nil
go func() {
handler := &handler{
grpcServer: mockGrpcServer,
}
err := http.Serve(listener, handler)
if err != nil {
t.Fatalf("Could not start server: %v", err)
}
}()
client, err := NewInternalClient("linkerd", listener.Addr().String())
destinationGetClient, err := client.Get(context.TODO(), &destinationPb.GetDestination{})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
for _, expectedDestinationGetEvent := range expectedDestinationGetResponses {
actualDestinationGetEvent, err := destinationGetClient.Recv()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
if !proto.Equal(actualDestinationGetEvent, expectedDestinationGetEvent) {
t.Fatalf("Expecting destination.get event to be [%v], but was [%v]", expectedDestinationGetEvent, actualDestinationGetEvent)
}
}
})
t.Run("Handles errors before opening keep-alive response", func(t *testing.T) {
mockGrpcServer, client := getServerClient(t)
mockGrpcServer.ErrorToReturn = errors.New("expected error")
_, err = client.Tap(context.TODO(), &pb.TapRequest{})
_, err := client.Tap(context.TODO(), &pb.TapRequest{})
if err == nil {
t.Fatalf("Expecting error, got nothing")
}
})
}
func getServerClient(t *testing.T) (*mockGrpcServer, APIClient) {
mockGrpcServer := &mockGrpcServer{}
listener, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Could not start listener: %v", err)
}
go func() {
handler := &handler{
grpcServer: mockGrpcServer,
}
err := http.Serve(listener, handler)
if err != nil {
t.Fatalf("Could not start server: %v", err)
}
}()
client, err := NewInternalClient("linkerd", listener.Addr().String())
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
return mockGrpcServer, client
}
func assertCallWasForwarded(t *testing.T, mockServer *mockServer, expectedRequest proto.Message, expectedResponse proto.Message, functionCall func() (proto.Message, error)) {
mockServer.ErrorToReturn = nil
mockServer.ResponseToReturn = expectedResponse

View File

@ -1090,6 +1090,7 @@ status:
&mockProm{Res: exp.mockPromResponse},
nil,
nil,
nil,
k8sAPI,
"linkerd",
[]string{},
@ -1115,6 +1116,7 @@ status:
&mockProm{Res: model.Vector{}},
nil,
nil,
nil,
k8sAPI,
"linkerd",
[]string{},

View File

@ -0,0 +1,38 @@
package public
import (
"bufio"
"context"
"net/http"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc/metadata"
)
type streamClient struct {
ctx context.Context
reader *bufio.Reader
}
// satisfy the ClientStream interface
func (c streamClient) Header() (metadata.MD, error) { return nil, nil }
func (c streamClient) Trailer() metadata.MD { return nil }
func (c streamClient) CloseSend() error { return nil }
func (c streamClient) Context() context.Context { return c.ctx }
func (c streamClient) SendMsg(interface{}) error { return nil }
func (c streamClient) RecvMsg(interface{}) error { return nil }
func getStreamClient(ctx context.Context, httpRsp *http.Response) (streamClient, error) {
if err := checkIfResponseHasError(httpRsp); err != nil {
httpRsp.Body.Close()
return streamClient{}, err
}
go func() {
<-ctx.Done()
log.Debug("Closing response body after context marked as done")
httpRsp.Body.Close()
}()
return streamClient{ctx: ctx, reader: bufio.NewReader(httpRsp.Body)}, nil
}

View File

@ -2,6 +2,7 @@ package public
import (
"context"
"errors"
"fmt"
"io"
"reflect"
@ -9,6 +10,8 @@ import (
"sync"
"time"
destinationPb "github.com/linkerd/linkerd2-proxy-api/go/destination"
"github.com/linkerd/linkerd2-proxy-api/go/net"
"github.com/linkerd/linkerd2/controller/api/discovery"
healthcheckPb "github.com/linkerd/linkerd2/controller/gen/common/healthcheck"
configPb "github.com/linkerd/linkerd2/controller/gen/config"
@ -32,6 +35,7 @@ type MockAPIClient struct {
ConfigResponseToReturn *configPb.All
APITapClientToReturn pb.Api_TapClient
APITapByResourceClientToReturn pb.Api_TapByResourceClient
DestinationGetClientToReturn destinationPb.Destination_GetClient
*discovery.MockDiscoveryClient
}
@ -75,6 +79,17 @@ func (c *MockAPIClient) TapByResource(ctx context.Context, in *pb.TapByResourceR
return c.APITapByResourceClientToReturn, c.ErrorToReturn
}
// Get provides a mock of a Public API method.
func (c *MockAPIClient) Get(ctx context.Context, in *destinationPb.GetDestination, opts ...grpc.CallOption) (destinationPb.Destination_GetClient, error) {
return c.DestinationGetClientToReturn, c.ErrorToReturn
}
// GetProfile provides a mock of a Public API method
func (c *MockAPIClient) GetProfile(ctx context.Context, _ *destinationPb.GetDestination, _ ...grpc.CallOption) (destinationPb.Destination_GetProfileClient, error) {
// Not implemented through this client. The proxies use the gRPC server directly instead.
return nil, errors.New("Not implemented")
}
// SelfCheck provides a mock of a Public API method.
func (c *MockAPIClient) SelfCheck(ctx context.Context, in *healthcheckPb.SelfCheckRequest, _ ...grpc.CallOption) (*healthcheckPb.SelfCheckResponse, error) {
return c.SelfCheckResponseToReturn, c.ErrorToReturn
@ -109,6 +124,63 @@ func (a *MockAPITapByResourceClient) Recv() (*pb.TapEvent, error) {
return &eventPopped, errorPopped
}
// MockDestinationGetClient satisfies the Destination_GetClient gRPC interface.
type MockDestinationGetClient struct {
UpdatesToReturn []destinationPb.Update
ErrorsToReturn []error
grpc.ClientStream
sync.Mutex
}
// Recv satisfies the Destination_GetClient.Recv() gRPC method.
func (a *MockDestinationGetClient) Recv() (*destinationPb.Update, error) {
a.Lock()
defer a.Unlock()
var updatePopped destinationPb.Update
var errorPopped error
if len(a.UpdatesToReturn) == 0 && len(a.ErrorsToReturn) == 0 {
return nil, io.EOF
}
if len(a.UpdatesToReturn) != 0 {
updatePopped, a.UpdatesToReturn = a.UpdatesToReturn[0], a.UpdatesToReturn[1:]
}
if len(a.ErrorsToReturn) != 0 {
errorPopped, a.ErrorsToReturn = a.ErrorsToReturn[0], a.ErrorsToReturn[1:]
}
return &updatePopped, errorPopped
}
// AuthorityEndpoints holds the details for the Endpoints associated to an authority
type AuthorityEndpoints struct {
Namespace string
ServiceID string
Pods []PodDetails
}
// PodDetails holds the details for pod associated to an Endpoint
type PodDetails struct {
Name string
IP uint32
Port uint32
}
// BuildAddrSet converts AuthorityEndpoints into its protobuf representation
func BuildAddrSet(endpoint AuthorityEndpoints) *destinationPb.WeightedAddrSet {
addrs := make([]*destinationPb.WeightedAddr, 0)
for _, pod := range endpoint.Pods {
addr := &net.TcpAddress{
Ip: &net.IPAddress{Ip: &net.IPAddress_Ipv4{Ipv4: pod.IP}},
Port: pod.Port,
}
labels := map[string]string{"pod": pod.Name}
weightedAddr := &destinationPb.WeightedAddr{Addr: addr, MetricLabels: labels}
addrs = append(addrs, weightedAddr)
}
labels := map[string]string{"namespace": endpoint.Namespace, "service": endpoint.ServiceID}
return &destinationPb.WeightedAddrSet{Addrs: addrs, MetricLabels: labels}
}
//
// Prometheus client
//
@ -331,6 +403,7 @@ func newMockGrpcServer(exp expectedStatRPC) (*mockProm, *grpcServer, error) {
mockProm,
nil,
nil,
nil,
k8sAPI,
"linkerd",
[]string{},

View File

@ -8,6 +8,7 @@ import (
"strings"
"syscall"
"github.com/linkerd/linkerd2/controller/api/destination"
"github.com/linkerd/linkerd2/controller/api/discovery"
"github.com/linkerd/linkerd2/controller/api/public"
"github.com/linkerd/linkerd2/controller/k8s"
@ -44,6 +45,12 @@ func main() {
}
defer discoveryConn.Close()
destinationClient, destinationConn, err := destination.NewClient(*destinationAPIAddr)
if err != nil {
log.Fatal(err.Error())
}
defer destinationConn.Close()
k8sAPI, err := k8s.InitializeAPI(
*kubeConfigPath,
k8s.DS, k8s.Deploy, k8s.Job, k8s.NS, k8s.Pod, k8s.RC, k8s.RS, k8s.Svc, k8s.SS, k8s.SP,
@ -62,6 +69,7 @@ func main() {
prometheusClient,
tapClient,
discoveryClient,
destinationClient,
k8sAPI,
*controllerNamespace,
strings.Split(*ignoredNamespaces, ","),

View File

@ -6,9 +6,9 @@ import (
"io"
pb "github.com/linkerd/linkerd2-proxy-api/go/destination"
"github.com/linkerd/linkerd2/controller/api/destination"
addrUtil "github.com/linkerd/linkerd2/pkg/addr"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
)
// This is a throwaway script for testing the destination service
@ -19,7 +19,7 @@ func main() {
method := flag.String("method", "get", "which gRPC method to invoke")
flag.Parse()
client, conn, err := newClient(*addr)
client, conn, err := destination.NewClient(*addr)
if err != nil {
log.Fatal(err.Error())
}
@ -40,16 +40,6 @@ func main() {
}
}
// newClient creates a new gRPC client to the Destination service.
func newClient(addr string) (pb.DestinationClient, *grpc.ClientConn, error) {
conn, err := grpc.Dial(addr, grpc.WithInsecure())
if err != nil {
return nil, nil, err
}
return pb.NewDestinationClient(conn), conn, nil
}
func get(client pb.DestinationClient, req *pb.GetDestination) {
rsp, err := client.Get(context.Background(), req)
if err != nil {