From 19001f8d386e6a734bb477b28a6fcc737721f39d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Phil=20Cal=C3=A7ado?= Date: Mon, 2 Apr 2018 21:36:57 -0400 Subject: [PATCH] Add pod-based metric_labels to destinations response (#429) (#654) * Extracted logic from destination server * Make tests follow style used elsewhere in the code * Extract single interface for resolvers * Add tests for k8s and ipv4 resolvers * Fix small usability issues * Update dep * Act on feedback * Add pod-based metric_labels to destinations response * Add documentation on running control plane to BUILD.md Signed-off-by: Phil Calcado * Fix mock controller in proxy tests (#656) Signed-off-by: Eliza Weisman * Address review feedback * Rename files in the destination package Signed-off-by: Kevin Lingerfelt --- BUILD.md | 20 +++ .../destination/destination_resolver.go | 16 -- controller/destination/k8s_resolver.go | 1 + controller/destination/listener.go | 111 +++++++++++++ controller/destination/listener_test.go | 157 ++++++++++++++++++ .../{echo_ip_resolver.go => resolver.go} | 6 + ...o_ip_resolver_test.go => resolver_test.go} | 0 controller/destination/server.go | 83 ++------- controller/destination/server_test.go | 127 ++++---------- controller/destination/test_helper.go | 1 + .../gen/proxy/destination/destination.pb.go | 72 +++++--- controller/k8s/pods.go | 36 +++- controller/k8s/test_helper.go | 27 +++ controller/script/destination-client/main.go | 5 +- controller/tap/server.go | 4 +- controller/telemetry/server.go | 21 +-- proto/proxy/destination/destination.proto | 2 + proxy/tests/support/controller.rs | 4 + 18 files changed, 456 insertions(+), 237 deletions(-) delete mode 100644 controller/destination/destination_resolver.go create mode 100644 controller/destination/listener.go create mode 100644 controller/destination/listener_test.go rename controller/destination/{echo_ip_resolver.go => resolver.go} (78%) rename controller/destination/{echo_ip_resolver_test.go => resolver_test.go} (100%) diff --git a/BUILD.md b/BUILD.md index 4d5d4a5b3..f1f0ec304 100644 --- a/BUILD.md +++ b/BUILD.md @@ -206,6 +206,26 @@ In development you can run: bin/go-run cli check ``` +### Running the control plane for development + +Conduit's control plane is composed of several Go microservices. You can run +these components in a Kubernetes (or Minikube) cluster, or even locally. + +To run an individual component locally, you can use the `go-run` command, and +pass in valid Kubernetes credentials via the `-kubeconfig` flag. For instance, +to run the destination service locally, run: + +```bash +bin/go-run controller/cmd/destination -kubeconfig ~/.kube/config -log-level debug +``` + +You can send test requests to the destination service using the +`destination-client` in the `controller/script` directory. For instance: + +```bash +bin/go-run controller/script/destination-client -path hello.default.svc.cluster.local:80 +``` + ## Web This is a React app fronting a Go process. It uses webpack to bundle assets, and diff --git a/controller/destination/destination_resolver.go b/controller/destination/destination_resolver.go deleted file mode 100644 index aaa34d74e..000000000 --- a/controller/destination/destination_resolver.go +++ /dev/null @@ -1,16 +0,0 @@ -package destination - -import ( - common "github.com/runconduit/conduit/controller/gen/common" -) - -type streamingDestinationResolver interface { - canResolve(host string, port int) (bool, error) - streamResolution(host string, port int, listener updateListener) error -} - -type updateListener interface { - Update(add []common.TcpAddress, remove []common.TcpAddress) - Done() <-chan struct{} - NoEndpoints(exists bool) -} diff --git a/controller/destination/k8s_resolver.go b/controller/destination/k8s_resolver.go index db3891cae..4a6f57888 100644 --- a/controller/destination/k8s_resolver.go +++ b/controller/destination/k8s_resolver.go @@ -15,6 +15,7 @@ import ( var dnsCharactersRegexp = regexp.MustCompile("^[a-zA-Z0-9_-]{0,63}$") var containsAlphaRegexp = regexp.MustCompile("[a-zA-Z]") +// implements the streamingDestinationResolver interface type k8sResolver struct { k8sDNSZoneLabels []string endpointsWatcher k8s.EndpointsWatcher diff --git a/controller/destination/listener.go b/controller/destination/listener.go new file mode 100644 index 000000000..6d97648f6 --- /dev/null +++ b/controller/destination/listener.go @@ -0,0 +1,111 @@ +package destination + +import ( + common "github.com/runconduit/conduit/controller/gen/common" + pb "github.com/runconduit/conduit/controller/gen/proxy/destination" + "github.com/runconduit/conduit/controller/k8s" + "github.com/runconduit/conduit/controller/util" + log "github.com/sirupsen/logrus" +) + +type updateListener interface { + Update(add []common.TcpAddress, remove []common.TcpAddress) + Done() <-chan struct{} + NoEndpoints(exists bool) +} + +// implements the updateListener interface +type endpointListener struct { + serviceName string + stream pb.Destination_GetServer + podsByIp k8s.PodIndex +} + +func (l *endpointListener) Done() <-chan struct{} { + return l.stream.Context().Done() +} + +func (l *endpointListener) Update(add []common.TcpAddress, remove []common.TcpAddress) { + if len(add) > 0 { + update := &pb.Update{ + Update: &pb.Update_Add{ + Add: l.toWeightedAddrSet(add), + }, + } + err := l.stream.Send(update) + if err != nil { + log.Error(err) + } + } + if len(remove) > 0 { + update := &pb.Update{ + Update: &pb.Update_Remove{ + Remove: l.toAddrSet(remove), + }, + } + err := l.stream.Send(update) + if err != nil { + log.Error(err) + } + } +} + +func (l *endpointListener) NoEndpoints(exists bool) { + update := &pb.Update{ + Update: &pb.Update_NoEndpoints{ + NoEndpoints: &pb.NoEndpoints{ + Exists: exists, + }, + }, + } + l.stream.Send(update) +} + +func (l *endpointListener) toWeightedAddrSet(endpoints []common.TcpAddress) *pb.WeightedAddrSet { + var namespace string + addrs := make([]*pb.WeightedAddr, 0) + for i, address := range endpoints { + metricLabelsForPod := map[string]string{} + + ipAsString := util.IPToString(address.Ip) + resultingPods, err := l.podsByIp.GetPodsByIndex(ipAsString) + if err != nil { + log.Errorf("Error while finding pod for IP [%s], this IP will be sent with no metric labels: %v", ipAsString, err) + } else { + if len(resultingPods) == 0 || resultingPods[0] == nil { + log.Errorf("Could not find pod for IP [%s], this IP will be sent with no metric labels.", ipAsString) + } else { + pod := resultingPods[0] + metricLabelsForPod = map[string]string{ + "k8s_pod": pod.Name, + } + + namespace = pod.Namespace + } + } + + addrs = append(addrs, &pb.WeightedAddr{ + Addr: &endpoints[i], + Weight: 1, + MetricLabels: metricLabelsForPod, + }) + } + + globalMetricLabels := map[string]string{ + "k8s_service": l.serviceName, + "k8s_namespace": namespace, + } + + return &pb.WeightedAddrSet{ + Addrs: addrs, + MetricLabels: globalMetricLabels, + } +} + +func (l *endpointListener) toAddrSet(endpoints []common.TcpAddress) *pb.AddrSet { + addrs := make([]*common.TcpAddress, 0) + for i := range endpoints { + addrs = append(addrs, &endpoints[i]) + } + return &pb.AddrSet{Addrs: addrs} +} diff --git a/controller/destination/listener_test.go b/controller/destination/listener_test.go new file mode 100644 index 000000000..364200006 --- /dev/null +++ b/controller/destination/listener_test.go @@ -0,0 +1,157 @@ +package destination + +import ( + "context" + "reflect" + "testing" + + common "github.com/runconduit/conduit/controller/gen/common" + pb "github.com/runconduit/conduit/controller/gen/proxy/destination" + "github.com/runconduit/conduit/controller/k8s" + "github.com/runconduit/conduit/controller/util" + "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" +) + +func TestEndpointListener(t *testing.T) { + t.Run("Sends one update for add and another for remove", func(t *testing.T) { + mockGetServer := &mockDestination_GetServer{updatesReceived: []*pb.Update{}} + + listener := &endpointListener{stream: mockGetServer, podsByIp: k8s.NewEmptyPodIndex()} + + addedAddress1 := common.TcpAddress{Ip: &common.IPAddress{Ip: &common.IPAddress_Ipv4{Ipv4: 1}}, Port: 1} + addedAddress2 := common.TcpAddress{Ip: &common.IPAddress{Ip: &common.IPAddress_Ipv4{Ipv4: 2}}, Port: 2} + removedAddress1 := common.TcpAddress{Ip: &common.IPAddress{Ip: &common.IPAddress_Ipv4{Ipv4: 100}}, Port: 100} + + listener.Update([]common.TcpAddress{addedAddress1, addedAddress2}, []common.TcpAddress{removedAddress1}) + + expectedNumUpdates := 2 + actualNumUpdates := len(mockGetServer.updatesReceived) + if actualNumUpdates != expectedNumUpdates { + t.Fatalf("Expecting [%d] updates, got [%d]. Updates: %v", expectedNumUpdates, actualNumUpdates, mockGetServer.updatesReceived) + } + }) + + t.Run("Sends addresses as removed or added", func(t *testing.T) { + mockGetServer := &mockDestination_GetServer{updatesReceived: []*pb.Update{}} + + listener := &endpointListener{stream: mockGetServer, podsByIp: k8s.NewEmptyPodIndex()} + + addedAddress1 := common.TcpAddress{Ip: &common.IPAddress{Ip: &common.IPAddress_Ipv4{Ipv4: 1}}, Port: 1} + addedAddress2 := common.TcpAddress{Ip: &common.IPAddress{Ip: &common.IPAddress_Ipv4{Ipv4: 2}}, Port: 2} + removedAddress1 := common.TcpAddress{Ip: &common.IPAddress{Ip: &common.IPAddress_Ipv4{Ipv4: 100}}, Port: 100} + + listener.Update([]common.TcpAddress{addedAddress1, addedAddress2}, []common.TcpAddress{removedAddress1}) + + addressesAdded := mockGetServer.updatesReceived[0].GetAdd().Addrs + actualNumberOfAdded := len(addressesAdded) + expectedNumberOfAdded := 2 + if actualNumberOfAdded != expectedNumberOfAdded { + t.Fatalf("Expecting [%d] addresses to be added, got [%d]: %v", expectedNumberOfAdded, actualNumberOfAdded, addressesAdded) + } + + addressesRemoved := mockGetServer.updatesReceived[1].GetRemove().Addrs + actualNumberOfRemoved := len(addressesRemoved) + expectedNumberOfRemoved := 1 + if actualNumberOfRemoved != expectedNumberOfRemoved { + t.Fatalf("Expecting [%d] addresses to be removed, got [%d]: %v", expectedNumberOfRemoved, actualNumberOfRemoved, addressesRemoved) + } + + checkAddress(t, addressesAdded[0], &addedAddress1) + checkAddress(t, addressesAdded[1], &addedAddress2) + + actualAddressRemoved := addressesRemoved[0] + expectedAddressRemoved := &removedAddress1 + if !reflect.DeepEqual(actualAddressRemoved, expectedAddressRemoved) { + t.Fatalf("Expected remove address to be [%s], but it was [%s]", expectedAddressRemoved, actualAddressRemoved) + } + }) + + t.Run("It returns when the underlying context is done", func(t *testing.T) { + context, cancelFn := context.WithCancel(context.Background()) + mockGetServer := &mockDestination_GetServer{updatesReceived: []*pb.Update{}, contextToReturn: context} + listener := &endpointListener{stream: mockGetServer, podsByIp: k8s.NewEmptyPodIndex()} + + completed := make(chan bool) + go func() { + <-listener.Done() + completed <- true + }() + + cancelFn() + + c := <-completed + + if !c { + t.Fatalf("Expected function to be completed after the cancel()") + } + }) + + t.Run("Sends metric labels with added addresses", func(t *testing.T) { + expectedServiceName := "service-name" + expectedPodName := "pod1" + expectedNamespace := "this-namespace" + + addedAddress1 := common.TcpAddress{Ip: &common.IPAddress{Ip: &common.IPAddress_Ipv4{Ipv4: 666}}, Port: 1} + ipForAddr1 := util.IPToString(addedAddress1.Ip) + podForAddedAddress1 := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: expectedPodName, + Namespace: expectedNamespace, + }, + } + addedAddress2 := common.TcpAddress{Ip: &common.IPAddress{Ip: &common.IPAddress_Ipv4{Ipv4: 222}}, Port: 22} + podIndex := &k8s.InMemoryPodIndex{BackingMap: map[string]*v1.Pod{ipForAddr1: podForAddedAddress1}} + + mockGetServer := &mockDestination_GetServer{updatesReceived: []*pb.Update{}} + listener := &endpointListener{ + podsByIp: podIndex, + serviceName: expectedServiceName, + stream: mockGetServer, + } + + listener.Update([]common.TcpAddress{addedAddress1, addedAddress2}, nil) + + actualGlobalMetricLabels := mockGetServer.updatesReceived[0].GetAdd().MetricLabels + expectedGlobalMetricLabels := map[string]string{"k8s_namespace": expectedNamespace, "k8s_service": expectedServiceName} + if !reflect.DeepEqual(actualGlobalMetricLabels, expectedGlobalMetricLabels) { + t.Fatalf("Expected global metric labels sent to be [%v] but was [%v]", expectedGlobalMetricLabels, actualGlobalMetricLabels) + } + + actualAddedAddress1MetricLabels := mockGetServer.updatesReceived[0].GetAdd().Addrs[0].MetricLabels + expectedAddedAddress1MetricLabels := map[string]string{"k8s_pod": expectedPodName} + if !reflect.DeepEqual(actualAddedAddress1MetricLabels, expectedAddedAddress1MetricLabels) { + t.Fatalf("Expected global metric labels sent to be [%v] but was [%v]", expectedAddedAddress1MetricLabels, actualAddedAddress1MetricLabels) + } + }) + + t.Run("It returns when the underlying context is done", func(t *testing.T) { + context, cancelFn := context.WithCancel(context.Background()) + mockGetServer := &mockDestination_GetServer{updatesReceived: []*pb.Update{}, contextToReturn: context} + listener := &endpointListener{stream: mockGetServer, podsByIp: k8s.NewEmptyPodIndex()} + + completed := make(chan bool) + go func() { + <-listener.Done() + completed <- true + }() + + cancelFn() + + c := <-completed + + if !c { + t.Fatalf("Expected function to be completed after the cancel()") + } + }) +} + +func checkAddress(t *testing.T, addr *pb.WeightedAddr, expectedAddress *common.TcpAddress) { + actualAddress := addr.Addr + actualWeight := addr.Weight + expectedWeight := uint32(1) + + if !reflect.DeepEqual(actualAddress, expectedAddress) || actualWeight != expectedWeight { + t.Fatalf("Expected added address to be [%+v] and weight to be [%d], but it was [%+v] and [%d]", expectedAddress, expectedWeight, actualAddress, actualWeight) + } +} diff --git a/controller/destination/echo_ip_resolver.go b/controller/destination/resolver.go similarity index 78% rename from controller/destination/echo_ip_resolver.go rename to controller/destination/resolver.go index ca8f81701..ed8202384 100644 --- a/controller/destination/echo_ip_resolver.go +++ b/controller/destination/resolver.go @@ -7,6 +7,12 @@ import ( "github.com/runconduit/conduit/controller/util" ) +type streamingDestinationResolver interface { + canResolve(host string, port int) (bool, error) + streamResolution(host string, port int, listener updateListener) error +} + +// implements the streamingDestinationResolver interface type echoIpV4Resolver struct{} func (i *echoIpV4Resolver) canResolve(host string, port int) (bool, error) { diff --git a/controller/destination/echo_ip_resolver_test.go b/controller/destination/resolver_test.go similarity index 100% rename from controller/destination/echo_ip_resolver_test.go rename to controller/destination/resolver_test.go diff --git a/controller/destination/server.go b/controller/destination/server.go index a72a041fb..14ff272eb 100644 --- a/controller/destination/server.go +++ b/controller/destination/server.go @@ -15,6 +15,7 @@ import ( ) type server struct { + podsByIp k8s.PodIndex resolvers []streamingDestinationResolver } @@ -34,6 +35,15 @@ func NewServer(addr, kubeconfig string, k8sDNSZone string, done chan struct{}) ( return nil, nil, err } + podsByIp, err := k8s.NewPodsByIp(clientSet) + if err != nil { + return nil, nil, err + } + err = podsByIp.Run() + if err != nil { + return nil, nil, err + } + endpointsWatcher := k8s.NewEndpointsWatcher(clientSet) err = endpointsWatcher.Run() if err != nil { @@ -48,6 +58,7 @@ func NewServer(addr, kubeconfig string, k8sDNSZone string, done chan struct{}) ( } srv := server{ + podsByIp: podsByIp, resolvers: resolvers, } @@ -92,13 +103,14 @@ func (s *server) Get(dest *common.Destination, stream pb.Destination_GetServer) } } - return streamResolutionUsingCorrectResolverFor(s.resolvers, host, port, stream) + return s.streamResolutionUsingCorrectResolverFor(host, port, stream) } -func streamResolutionUsingCorrectResolverFor(resolvers []streamingDestinationResolver, host string, port int, stream pb.Destination_GetServer) error { - listener := &endpointListener{stream: stream} +func (s *server) streamResolutionUsingCorrectResolverFor(host string, port int, stream pb.Destination_GetServer) error { + serviceName := fmt.Sprintf("%s:%d", host, port) + listener := &endpointListener{serviceName: serviceName, stream: stream, podsByIp: s.podsByIp} - for _, resolver := range resolvers { + for _, resolver := range s.resolvers { resolverCanResolve, err := resolver.canResolve(host, port) if err != nil { return fmt.Errorf("resolver [%+v] found error resolving host [%s] port[%d]: %v", resolver, host, port, err) @@ -134,66 +146,3 @@ func buildResolversList(k8sDNSZone string, endpointsWatcher k8s.EndpointsWatcher return []streamingDestinationResolver{ipResolver, k8sResolver}, nil } - -type endpointListener struct { - stream pb.Destination_GetServer -} - -func (listener *endpointListener) Done() <-chan struct{} { - return listener.stream.Context().Done() -} - -func (listener *endpointListener) Update(add []common.TcpAddress, remove []common.TcpAddress) { - if len(add) > 0 { - update := &pb.Update{ - Update: &pb.Update_Add{ - Add: toWeightedAddrSet(add), - }, - } - err := listener.stream.Send(update) - if err != nil { - log.Error(err) - } - } - if len(remove) > 0 { - update := &pb.Update{ - Update: &pb.Update_Remove{ - Remove: toAddrSet(remove), - }, - } - err := listener.stream.Send(update) - if err != nil { - log.Error(err) - } - } -} - -func (listener *endpointListener) NoEndpoints(exists bool) { - update := &pb.Update{ - Update: &pb.Update_NoEndpoints{ - NoEndpoints: &pb.NoEndpoints{ - Exists: exists, - }, - }, - } - listener.stream.Send(update) -} - -func toWeightedAddrSet(endpoints []common.TcpAddress) *pb.WeightedAddrSet { - addrs := make([]*pb.WeightedAddr, 0) - for i := range endpoints { - addrs = append(addrs, &pb.WeightedAddr{ - Addr: &endpoints[i], - Weight: 1, - }) - } - return &pb.WeightedAddrSet{Addrs: addrs} -} - -func toAddrSet(endpoints []common.TcpAddress) *pb.AddrSet { - addrs := make([]*common.TcpAddress, 0) - for i := range endpoints { - addrs = append(addrs, &endpoints[i]) - } - return &pb.AddrSet{Addrs: addrs} -} diff --git a/controller/destination/server_test.go b/controller/destination/server_test.go index 0cb512574..fc55d6f8f 100644 --- a/controller/destination/server_test.go +++ b/controller/destination/server_test.go @@ -3,10 +3,8 @@ package destination import ( "context" "errors" - "reflect" "testing" - common "github.com/runconduit/conduit/controller/gen/common" pb "github.com/runconduit/conduit/controller/gen/proxy/destination" "github.com/runconduit/conduit/controller/k8s" "google.golang.org/grpc/metadata" @@ -66,82 +64,7 @@ func TestBuildResolversList(t *testing.T) { }) } -func TestEndpointListener(t *testing.T) { - - t.Run("Sends one update for add and another for remove", func(t *testing.T) { - mockGetServer := &mockDestination_GetServer{updatesReceived: []*pb.Update{}} - - listener := &endpointListener{stream: mockGetServer} - - addedAddress1 := common.TcpAddress{Ip: &common.IPAddress{Ip: &common.IPAddress_Ipv4{Ipv4: 1}}, Port: 1} - addedAddress2 := common.TcpAddress{Ip: &common.IPAddress{Ip: &common.IPAddress_Ipv4{Ipv4: 2}}, Port: 2} - removedAddress1 := common.TcpAddress{Ip: &common.IPAddress{Ip: &common.IPAddress_Ipv4{Ipv4: 100}}, Port: 100} - - listener.Update([]common.TcpAddress{addedAddress1, addedAddress2}, []common.TcpAddress{removedAddress1}) - - expectedNumUpdates := 2 - actualNumUpdates := len(mockGetServer.updatesReceived) - if actualNumUpdates != expectedNumUpdates { - t.Fatalf("Expecting [%d] updates, got [%d]. Updates: %v", expectedNumUpdates, actualNumUpdates, mockGetServer.updatesReceived) - } - }) - - t.Run("Sends addresses as removed or added", func(t *testing.T) { - mockGetServer := &mockDestination_GetServer{updatesReceived: []*pb.Update{}} - - listener := &endpointListener{stream: mockGetServer} - - addedAddress1 := common.TcpAddress{Ip: &common.IPAddress{Ip: &common.IPAddress_Ipv4{Ipv4: 1}}, Port: 1} - addedAddress2 := common.TcpAddress{Ip: &common.IPAddress{Ip: &common.IPAddress_Ipv4{Ipv4: 2}}, Port: 2} - removedAddress1 := common.TcpAddress{Ip: &common.IPAddress{Ip: &common.IPAddress_Ipv4{Ipv4: 100}}, Port: 100} - - listener.Update([]common.TcpAddress{addedAddress1, addedAddress2}, []common.TcpAddress{removedAddress1}) - - addressesAdded := mockGetServer.updatesReceived[0].GetAdd().Addrs - actualNumberOfAdded := len(addressesAdded) - expectedNumberOfAdded := 2 - if actualNumberOfAdded != expectedNumberOfAdded { - t.Fatalf("Expecting [%d] addresses to be added, got [%d]: %v", expectedNumberOfAdded, actualNumberOfAdded, addressesAdded) - } - - addressesRemoved := mockGetServer.updatesReceived[1].GetRemove().Addrs - actualNumberOfRemoved := len(addressesRemoved) - expectedNumberOfRemoved := 1 - if actualNumberOfRemoved != expectedNumberOfRemoved { - t.Fatalf("Expecting [%d] addresses to be removed, got [%d]: %v", expectedNumberOfRemoved, actualNumberOfRemoved, addressesRemoved) - } - - checkAddress(t, addressesAdded[0], &addedAddress1) - checkAddress(t, addressesAdded[1], &addedAddress2) - - actualAddressRemoved := addressesRemoved[0] - expectedAddressRemoved := &removedAddress1 - if !reflect.DeepEqual(actualAddressRemoved, expectedAddressRemoved) { - t.Fatalf("Expected remove address to be [%s], but it was [%s]", expectedAddressRemoved, actualAddressRemoved) - } - }) - - t.Run("It returns when the underlying context is done", func(t *testing.T) { - context, cancelFn := context.WithCancel(context.Background()) - mockGetServer := &mockDestination_GetServer{updatesReceived: []*pb.Update{}, contextToReturn: context} - listener := &endpointListener{stream: mockGetServer} - - completed := make(chan bool) - go func() { - <-listener.Done() - completed <- true - }() - - cancelFn() - - c := <-completed - - if !c { - t.Fatalf("Expected function to be completed after the cancel()") - } - }) -} - +// implements the streamingDestinationResolver interface type mockStreamingDestinationResolver struct { hostReceived string portReceived int @@ -172,9 +95,12 @@ func TestStreamResolutionUsingCorrectResolverFor(t *testing.T) { yes := &mockStreamingDestinationResolver{canResolveToReturn: true} otherYes := &mockStreamingDestinationResolver{canResolveToReturn: true} - resolvers := []streamingDestinationResolver{no, no, yes, no, no, otherYes} + server := server{ + podsByIp: k8s.NewEmptyPodIndex(), + resolvers: []streamingDestinationResolver{no, no, yes, no, no, otherYes}, + } - err := streamResolutionUsingCorrectResolverFor(resolvers, host, port, stream) + err := server.streamResolutionUsingCorrectResolverFor(host, port, stream) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -195,37 +121,42 @@ func TestStreamResolutionUsingCorrectResolverFor(t *testing.T) { t.Run("Returns error if no resolver can resolve", func(t *testing.T) { no := &mockStreamingDestinationResolver{canResolveToReturn: false} - resolvers := []streamingDestinationResolver{no, no, no, no} - - err := streamResolutionUsingCorrectResolverFor(resolvers, host, port, stream) + server := server{ + podsByIp: k8s.NewEmptyPodIndex(), + resolvers: []streamingDestinationResolver{no, no, no, no}, + } + err := server.streamResolutionUsingCorrectResolverFor(host, port, stream) if err == nil { t.Fatalf("Expecting error, got nothing") } }) - t.Run("Returns error if the resolver returned an error", func(t *testing.T) { - errorOnCanResolve := &mockStreamingDestinationResolver{canResolveToReturn: true, errToReturnForCanResolve: errors.New("expected for can resolve")} - errorOnResolving := &mockStreamingDestinationResolver{canResolveToReturn: true, errToReturnForResolution: errors.New("expected for resolving")} + t.Run("Returns error if the resolver returned an error on canResolve", func(t *testing.T) { + resolver := &mockStreamingDestinationResolver{canResolveToReturn: true, errToReturnForCanResolve: errors.New("expected for can resolve")} - err := streamResolutionUsingCorrectResolverFor([]streamingDestinationResolver{errorOnCanResolve}, host, port, stream) + server := server{ + podsByIp: k8s.NewEmptyPodIndex(), + resolvers: []streamingDestinationResolver{resolver}, + } + + err := server.streamResolutionUsingCorrectResolverFor(host, port, stream) if err == nil { t.Fatalf("Expecting error, got nothing") } + }) - err = streamResolutionUsingCorrectResolverFor([]streamingDestinationResolver{errorOnResolving}, host, port, stream) + t.Run("Returns error if the resolver returned an error on streamResolution", func(t *testing.T) { + resolver := &mockStreamingDestinationResolver{canResolveToReturn: true, errToReturnForResolution: errors.New("expected for resolving")} + + server := server{ + podsByIp: k8s.NewEmptyPodIndex(), + resolvers: []streamingDestinationResolver{resolver}, + } + + err := server.streamResolutionUsingCorrectResolverFor(host, port, stream) if err == nil { t.Fatalf("Expecting error, got nothing") } }) } - -func checkAddress(t *testing.T, addr *pb.WeightedAddr, expectedAddress *common.TcpAddress) { - actualAddress := addr.Addr - actualWeight := addr.Weight - expectedWeight := uint32(1) - - if !reflect.DeepEqual(actualAddress, expectedAddress) || actualWeight != expectedWeight { - t.Fatalf("Expected added address to be [%+v] and weight to be [%d], but it was [%+v] and [%d]", expectedAddress, expectedWeight, actualAddress, actualWeight) - } -} diff --git a/controller/destination/test_helper.go b/controller/destination/test_helper.go index 21b10a24c..43b8824cc 100644 --- a/controller/destination/test_helper.go +++ b/controller/destination/test_helper.go @@ -6,6 +6,7 @@ import ( common "github.com/runconduit/conduit/controller/gen/common" ) +// implements the updateListener interface type collectUpdateListener struct { added []common.TcpAddress removed []common.TcpAddress diff --git a/controller/gen/proxy/destination/destination.pb.go b/controller/gen/proxy/destination/destination.pb.go index 20033f7ad..3f1c6576c 100644 --- a/controller/gen/proxy/destination/destination.pb.go +++ b/controller/gen/proxy/destination/destination.pb.go @@ -204,7 +204,8 @@ func (m *AddrSet) GetAddrs() []*conduit_common.TcpAddress { } type WeightedAddrSet struct { - Addrs []*WeightedAddr `protobuf:"bytes,1,rep,name=addrs" json:"addrs,omitempty"` + Addrs []*WeightedAddr `protobuf:"bytes,1,rep,name=addrs" json:"addrs,omitempty"` + MetricLabels map[string]string `protobuf:"bytes,2,rep,name=metric_labels,json=metricLabels" json:"metric_labels,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` } func (m *WeightedAddrSet) Reset() { *m = WeightedAddrSet{} } @@ -219,9 +220,17 @@ func (m *WeightedAddrSet) GetAddrs() []*WeightedAddr { return nil } +func (m *WeightedAddrSet) GetMetricLabels() map[string]string { + if m != nil { + return m.MetricLabels + } + return nil +} + type WeightedAddr struct { - Addr *conduit_common.TcpAddress `protobuf:"bytes,1,opt,name=addr" json:"addr,omitempty"` - Weight uint32 `protobuf:"varint,3,opt,name=weight" json:"weight,omitempty"` + Addr *conduit_common.TcpAddress `protobuf:"bytes,1,opt,name=addr" json:"addr,omitempty"` + Weight uint32 `protobuf:"varint,3,opt,name=weight" json:"weight,omitempty"` + MetricLabels map[string]string `protobuf:"bytes,4,rep,name=metric_labels,json=metricLabels" json:"metric_labels,omitempty" protobuf_key:"bytes,1,opt,name=key" protobuf_val:"bytes,2,opt,name=value"` } func (m *WeightedAddr) Reset() { *m = WeightedAddr{} } @@ -243,6 +252,13 @@ func (m *WeightedAddr) GetWeight() uint32 { return 0 } +func (m *WeightedAddr) GetMetricLabels() map[string]string { + if m != nil { + return m.MetricLabels + } + return nil +} + type NoEndpoints struct { Exists bool `protobuf:"varint,1,opt,name=exists" json:"exists,omitempty"` } @@ -373,26 +389,32 @@ var _Destination_serviceDesc = grpc.ServiceDesc{ func init() { proto.RegisterFile("proxy/destination/destination.proto", fileDescriptor0) } var fileDescriptor0 = []byte{ - // 327 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x8c, 0x92, 0x5d, 0x4b, 0xf3, 0x30, - 0x14, 0xc7, 0xd7, 0xa7, 0x8f, 0x75, 0x9c, 0x4e, 0x84, 0x08, 0x52, 0xeb, 0xcd, 0x8c, 0xa8, 0xc3, - 0x8b, 0x6c, 0xd4, 0x4b, 0x5f, 0x40, 0x51, 0x9c, 0x08, 0x22, 0xf5, 0xf5, 0x4e, 0x6a, 0x13, 0x34, - 0x17, 0x4d, 0x4a, 0x93, 0xe9, 0xfc, 0xb0, 0x7e, 0x17, 0x69, 0x9a, 0xd9, 0x3a, 0xb0, 0x78, 0xd5, - 0x9e, 0xf6, 0x77, 0x7e, 0xc9, 0xf9, 0x73, 0x60, 0x33, 0x2f, 0xe4, 0xf4, 0x63, 0x48, 0x99, 0xd2, - 0x5c, 0x24, 0x9a, 0x4b, 0xd1, 0x7c, 0x27, 0x79, 0x21, 0xb5, 0x44, 0x6b, 0xa9, 0x14, 0x74, 0xc2, - 0x35, 0x31, 0x30, 0x69, 0x00, 0xe1, 0x4a, 0x2a, 0xb3, 0x4c, 0x8a, 0x61, 0xf5, 0xa8, 0x78, 0xfc, - 0xe9, 0x80, 0x77, 0x97, 0xd3, 0x44, 0x33, 0x74, 0x04, 0x6e, 0x42, 0x69, 0xe0, 0xf4, 0x9d, 0x81, - 0x1f, 0xed, 0x92, 0x5f, 0x45, 0xe4, 0x81, 0xf1, 0x97, 0x57, 0xcd, 0xe8, 0x31, 0xa5, 0xc5, 0x0d, - 0xd3, 0xe3, 0x4e, 0x5c, 0x36, 0xa2, 0x03, 0xf0, 0x0a, 0x96, 0xc9, 0x37, 0x16, 0xfc, 0x33, 0x0a, - 0xdc, 0xa2, 0xa8, 0x5b, 0x6d, 0x0f, 0xba, 0x84, 0x9e, 0x90, 0x4f, 0x4c, 0xd0, 0x5c, 0x72, 0xa1, - 0x55, 0xe0, 0x1a, 0xc7, 0x76, 0x8b, 0xe3, 0x4a, 0x9e, 0xcd, 0xe8, 0x71, 0x27, 0xf6, 0x45, 0x5d, - 0x9e, 0x74, 0xc1, 0x9b, 0x98, 0xa1, 0xf0, 0x3e, 0x2c, 0xda, 0xb3, 0xd0, 0x08, 0x16, 0x12, 0x4a, - 0x0b, 0x15, 0x38, 0x7d, 0x77, 0xe0, 0x47, 0xe1, 0xb7, 0xda, 0x06, 0x72, 0x9b, 0xe6, 0x25, 0xca, - 0x94, 0x8a, 0x2b, 0x10, 0x5f, 0xc3, 0xf2, 0xdc, 0xac, 0xe8, 0xf0, 0xa7, 0x64, 0xe7, 0x8f, 0x31, - 0xcd, 0x8c, 0xf7, 0xd0, 0x6b, 0x7e, 0x46, 0x04, 0xfe, 0x97, 0x3f, 0x6c, 0xe8, 0x6d, 0x57, 0x32, - 0x1c, 0x5a, 0x05, 0xef, 0xdd, 0xf4, 0x9b, 0x7c, 0x96, 0x62, 0x5b, 0xe1, 0x2d, 0xf0, 0x1b, 0x71, - 0x94, 0x18, 0x9b, 0x72, 0xa5, 0x95, 0x11, 0x77, 0x63, 0x5b, 0x45, 0x8f, 0xe0, 0x9f, 0xd6, 0x37, - 0x44, 0x17, 0xe0, 0x9e, 0x33, 0x8d, 0xd6, 0xe7, 0x8f, 0x6d, 0x30, 0xe1, 0x46, 0xcb, 0x84, 0xd5, - 0xe2, 0xe0, 0xce, 0xc8, 0x79, 0xf6, 0xcc, 0x3a, 0xed, 0x7d, 0x05, 0x00, 0x00, 0xff, 0xff, 0x8d, - 0x6e, 0xee, 0x2b, 0xa5, 0x02, 0x00, 0x00, + // 422 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xb4, 0x93, 0x4f, 0x6f, 0xd3, 0x30, + 0x18, 0xc6, 0xeb, 0xa6, 0x0d, 0xed, 0x9b, 0x56, 0x80, 0x41, 0x28, 0x84, 0x4b, 0x09, 0x02, 0x2a, + 0x0e, 0x69, 0x55, 0x2e, 0xfc, 0x29, 0x20, 0x10, 0x15, 0x45, 0xfc, 0x39, 0x78, 0x9b, 0xb6, 0xd3, + 0xaa, 0x34, 0xb6, 0xb6, 0x68, 0x8d, 0x1d, 0xc5, 0x6e, 0xd7, 0x7e, 0xd8, 0x7d, 0x8b, 0x49, 0xbb, + 0x4e, 0x71, 0xb2, 0x35, 0xeb, 0xb4, 0xa8, 0x97, 0x9d, 0xe2, 0x37, 0x7a, 0x9e, 0x9f, 0xfd, 0xbc, + 0xaf, 0x0d, 0xaf, 0xe2, 0x44, 0x2c, 0x57, 0x3d, 0xca, 0xa4, 0x0a, 0xb9, 0xaf, 0x42, 0xc1, 0x8b, + 0x6b, 0x2f, 0x4e, 0x84, 0x12, 0xf8, 0x79, 0x20, 0x38, 0x9d, 0x87, 0xca, 0xd3, 0x62, 0xaf, 0x20, + 0x70, 0x9e, 0x04, 0x22, 0x8a, 0x04, 0xef, 0x65, 0x9f, 0x4c, 0xef, 0x9e, 0x21, 0x30, 0xf7, 0x62, + 0xea, 0x2b, 0x86, 0xbf, 0x82, 0xe1, 0x53, 0x6a, 0xa3, 0x0e, 0xea, 0x5a, 0x83, 0x77, 0xde, 0x9d, + 0x20, 0x6f, 0x9f, 0x85, 0x47, 0xc7, 0x8a, 0xd1, 0xef, 0x94, 0x26, 0x3b, 0x4c, 0x8d, 0x2b, 0x24, + 0x35, 0xe2, 0x21, 0x98, 0x09, 0x8b, 0xc4, 0x82, 0xd9, 0x55, 0x8d, 0x70, 0x4b, 0x10, 0x6b, 0x6b, + 0xee, 0xc1, 0x7f, 0xa0, 0xc5, 0xc5, 0x84, 0x71, 0x1a, 0x8b, 0x90, 0x2b, 0x69, 0x1b, 0x9a, 0xf1, + 0xa6, 0x84, 0xf1, 0x5f, 0x8c, 0xae, 0xd4, 0xe3, 0x0a, 0xb1, 0xf8, 0xba, 0xfc, 0xd1, 0x00, 0x73, + 0xae, 0x43, 0xb9, 0x9f, 0xe1, 0x41, 0xbe, 0x17, 0xee, 0x43, 0xdd, 0xa7, 0x34, 0x91, 0x36, 0xea, + 0x18, 0x5d, 0x6b, 0xe0, 0x5c, 0xa3, 0xf3, 0x86, 0xec, 0x06, 0x71, 0x2a, 0x65, 0x52, 0x92, 0x4c, + 0xe8, 0x9e, 0x23, 0x78, 0xb8, 0x11, 0x16, 0x7f, 0xb9, 0x49, 0x79, 0xbb, 0x65, 0x9f, 0x72, 0x24, + 0xf6, 0xa1, 0x1d, 0x31, 0x95, 0x84, 0xc1, 0x64, 0xe6, 0x4f, 0xd9, 0x4c, 0xda, 0x55, 0x8d, 0x19, + 0x6e, 0xdf, 0x6e, 0xef, 0x9f, 0xf6, 0xff, 0xd5, 0xf6, 0x11, 0x57, 0xc9, 0x8a, 0xb4, 0xa2, 0xc2, + 0x2f, 0xe7, 0x1b, 0x3c, 0xbe, 0x25, 0xc1, 0x8f, 0xc0, 0x38, 0x61, 0x2b, 0x3d, 0xdc, 0x26, 0x49, + 0x97, 0xf8, 0x29, 0xd4, 0x17, 0xfe, 0x6c, 0x9e, 0x4d, 0xab, 0x49, 0xb2, 0xe2, 0x53, 0xf5, 0x03, + 0x72, 0x2f, 0x10, 0xb4, 0x8a, 0x9b, 0x62, 0x0f, 0x6a, 0xe9, 0xe9, 0xf3, 0xab, 0x51, 0xd6, 0x38, + 0xad, 0xc3, 0xcf, 0xc0, 0x3c, 0xd5, 0x7e, 0x3d, 0xc5, 0x36, 0xc9, 0x2b, 0x7c, 0xb8, 0x19, 0xbe, + 0xa6, 0xc3, 0x7f, 0xdc, 0x32, 0xfc, 0xfd, 0x27, 0x7f, 0x0d, 0x56, 0xe1, 0x56, 0xa5, 0x39, 0xd8, + 0x32, 0x94, 0x4a, 0x6a, 0x77, 0x83, 0xe4, 0xd5, 0xe0, 0x00, 0xac, 0x9f, 0xeb, 0x33, 0xe2, 0xdf, + 0x60, 0xfc, 0x62, 0x0a, 0xbf, 0xd8, 0xec, 0x4b, 0x41, 0xe3, 0xbc, 0x2c, 0xc9, 0x98, 0xbd, 0x3f, + 0xb7, 0xd2, 0x47, 0x53, 0x53, 0xbf, 0xca, 0xf7, 0x97, 0x01, 0x00, 0x00, 0xff, 0xff, 0xbe, 0x93, + 0xce, 0xeb, 0xec, 0x03, 0x00, 0x00, } diff --git a/controller/k8s/pods.go b/controller/k8s/pods.go index d628c11fc..d2cffbc79 100644 --- a/controller/k8s/pods.go +++ b/controller/k8s/pods.go @@ -12,13 +12,21 @@ import ( const podResource = "pods" -type PodIndex struct { +type PodIndex interface { + GetPod(key string) (*v1.Pod, error) + GetPodsByIndex(key string) ([]*v1.Pod, error) + List() ([]*v1.Pod, error) + Run() error + Stop() +} + +type podIndex struct { indexer *cache.Indexer reflector *cache.Reflector stopCh chan struct{} } -func NewPodIndex(clientset *kubernetes.Clientset, index cache.IndexFunc) (*PodIndex, error) { +func NewPodIndex(clientset *kubernetes.Clientset, index cache.IndexFunc) (PodIndex, error) { indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"index": index}) podListWatcher := cache.NewListWatchFromClient( @@ -37,22 +45,22 @@ func NewPodIndex(clientset *kubernetes.Clientset, index cache.IndexFunc) (*PodIn stopCh := make(chan struct{}) - return &PodIndex{ + return &podIndex{ indexer: &indexer, reflector: reflector, stopCh: stopCh, }, nil } -func (p *PodIndex) Run() error { +func (p *podIndex) Run() error { return newWatcher(p.reflector, podResource, p.reflector.ListAndWatch, p.stopCh).run() } -func (p *PodIndex) Stop() { +func (p *podIndex) Stop() { p.stopCh <- struct{}{} } -func (p *PodIndex) GetPod(key string) (*v1.Pod, error) { +func (p *podIndex) GetPod(key string) (*v1.Pod, error) { item, exists, err := (*p.indexer).GetByKey(key) if err != nil { return nil, err @@ -67,7 +75,7 @@ func (p *PodIndex) GetPod(key string) (*v1.Pod, error) { return pod, nil } -func (p *PodIndex) GetPodsByIndex(key string) ([]*v1.Pod, error) { +func (p *podIndex) GetPodsByIndex(key string) ([]*v1.Pod, error) { items, err := (*p.indexer).ByIndex("index", key) if err != nil { return nil, err @@ -83,7 +91,7 @@ func (p *PodIndex) GetPodsByIndex(key string) ([]*v1.Pod, error) { return pods, nil } -func (p *PodIndex) List() ([]*v1.Pod, error) { +func (p *podIndex) List() ([]*v1.Pod, error) { pods := make([]*v1.Pod, 0) items := (*p.indexer).List() @@ -97,3 +105,15 @@ func (p *PodIndex) List() ([]*v1.Pod, error) { return pods, nil } + +func podIPKeyFunc(obj interface{}) ([]string, error) { + if pod, ok := obj.(*v1.Pod); ok { + return []string{pod.Status.PodIP}, nil + } + return nil, fmt.Errorf("Object is not a Pod") +} + +// NewPodsByIp returns a PodIndex with the Pod's IP as its key. +func NewPodsByIp(clientSet *kubernetes.Clientset) (PodIndex, error) { + return NewPodIndex(clientSet, podIPKeyFunc) +} diff --git a/controller/k8s/test_helper.go b/controller/k8s/test_helper.go index 138daa00a..9204ceb8d 100644 --- a/controller/k8s/test_helper.go +++ b/controller/k8s/test_helper.go @@ -28,3 +28,30 @@ func (m *MockEndpointsWatcher) Run() error { } func (m *MockEndpointsWatcher) Stop() {} + +type InMemoryPodIndex struct { + BackingMap map[string]*v1.Pod +} + +func (i *InMemoryPodIndex) GetPod(key string) (*v1.Pod, error) { + return i.BackingMap[key], nil +} + +func (i *InMemoryPodIndex) GetPodsByIndex(key string) ([]*v1.Pod, error) { + return []*v1.Pod{i.BackingMap[key]}, nil +} + +func (i *InMemoryPodIndex) List() ([]*v1.Pod, error) { + var pods []*v1.Pod + for _, value := range i.BackingMap { + pods = append(pods, value) + } + + return pods, nil +} +func (i *InMemoryPodIndex) Run() error { return nil } +func (i *InMemoryPodIndex) Stop() {} + +func NewEmptyPodIndex() PodIndex { + return &InMemoryPodIndex{BackingMap: map[string]*v1.Pod{}} +} diff --git a/controller/script/destination-client/main.go b/controller/script/destination-client/main.go index 0e2734cb8..d69270712 100644 --- a/controller/script/destination-client/main.go +++ b/controller/script/destination-client/main.go @@ -19,7 +19,7 @@ import ( func main() { rand.Seed(time.Now().UnixNano()) - addr := flag.String("addr", ":8089", "address of proxy api") + addr := flag.String("addr", ":8089", "address of destination service") path := flag.String("path", "strest-server.default.svc.cluster.local:8888", "destination path") flag.Parse() @@ -51,8 +51,9 @@ func main() { switch updateType := update.Update.(type) { case *pb.Update_Add: log.Println("Add:") + log.Printf("metric_labels: %v", updateType.Add.MetricLabels) for _, addr := range updateType.Add.Addrs { - log.Printf("- %s:%d", util.IPToString(addr.Addr.GetIp()), addr.Addr.Port) + log.Printf("- %s:%d - %v", util.IPToString(addr.Addr.GetIp()), addr.Addr.Port, addr.MetricLabels) } log.Println() case *pb.Update_Remove: diff --git a/controller/tap/server.go b/controller/tap/server.go index 39280b8bf..7ba793c45 100644 --- a/controller/tap/server.go +++ b/controller/tap/server.go @@ -28,7 +28,7 @@ type ( tapPort uint // We use the Kubernetes API to find the IP addresses of pods to tap replicaSets *k8s.ReplicaSetStore - pods *k8s.PodIndex + pods k8s.PodIndex } ) @@ -49,7 +49,7 @@ func (s *server) Tap(req *public.TapRequest, stream pb.Tap_TapServer) error { case *public.TapRequest_Deployment: targetName = target.Deployment var err error - pods, err = (*s.pods).GetPodsByIndex(target.Deployment) + pods, err = s.pods.GetPodsByIndex(target.Deployment) if err != nil { return err } diff --git a/controller/telemetry/server.go b/controller/telemetry/server.go index ffc57af9f..ac40bda59 100644 --- a/controller/telemetry/server.go +++ b/controller/telemetry/server.go @@ -87,19 +87,12 @@ func init() { type ( server struct { prometheusAPI v1.API - pods *k8s.PodIndex + pods k8s.PodIndex replicaSets *k8s.ReplicaSetStore ignoredNamespaces []string } ) -func podIPKeyFunc(obj interface{}) ([]string, error) { - if pod, ok := obj.(*k8sV1.Pod); ok { - return []string{pod.Status.PodIP}, nil - } - return nil, fmt.Errorf("Object is not a Pod") -} - func NewServer(addr, prometheusUrl string, ignoredNamespaces []string, kubeconfig string) (*grpc.Server, net.Listener, error) { prometheusClient, err := api.NewClient(api.Config{Address: prometheusUrl}) if err != nil { @@ -111,7 +104,7 @@ func NewServer(addr, prometheusUrl string, ignoredNamespaces []string, kubeconfi return nil, nil, err } - pods, err := k8s.NewPodIndex(clientSet, podIPKeyFunc) + pods, err := k8s.NewPodsByIp(clientSet) if err != nil { return nil, nil, err } @@ -394,16 +387,6 @@ func (s *server) getDeployment(ip *common.IPAddress) string { return deployment } -func methodString(method *common.HttpMethod) string { - switch method.Type.(type) { - case *common.HttpMethod_Registered_: - return method.GetRegistered().String() - case *common.HttpMethod_Unregistered: - return method.GetUnregistered() - } - return "" -} - func metricToMap(metric model.Metric) map[string]string { labels := make(map[string]string) for k, v := range metric { diff --git a/proto/proxy/destination/destination.proto b/proto/proxy/destination/destination.proto index ae2da111b..36df1a5a3 100644 --- a/proto/proxy/destination/destination.proto +++ b/proto/proxy/destination/destination.proto @@ -75,11 +75,13 @@ message AddrSet { message WeightedAddrSet { repeated WeightedAddr addrs = 1; + map metric_labels = 2; } message WeightedAddr { common.TcpAddress addr = 1; uint32 weight = 3; + map metric_labels = 4; } message NoEndpoints { diff --git a/proxy/tests/support/controller.rs b/proxy/tests/support/controller.rs index 687d5de7a..896a8c235 100644 --- a/proxy/tests/support/controller.rs +++ b/proxy/tests/support/controller.rs @@ -281,8 +281,10 @@ pub fn destination_update(addr: SocketAddr) -> pb::destination::Update { port: u32::from(addr.port()), }), weight: 0, + ..Default::default() }, ], + ..Default::default() }, )), } @@ -293,6 +295,7 @@ pub fn destination_add_none() -> pb::destination::Update { update: Some(pb::destination::update::Update::Add( pb::destination::WeightedAddrSet { addrs: Vec::new(), + ..Default::default() }, )), } @@ -303,6 +306,7 @@ pub fn destination_remove_none() -> pb::destination::Update { update: Some(pb::destination::update::Update::Remove( pb::destination::AddrSet { addrs: Vec::new(), + ..Default::default() }, )), }