mirror of https://github.com/linkerd/linkerd2.git
Add install flag for sending tls identity info to proxies (#1055)
Signed-off-by: Kevin Lingerfelt <kl@buoyant.io>
This commit is contained in:
parent
6ef1204ceb
commit
eebc612d52
|
@ -31,6 +31,7 @@ type installConfig struct {
|
|||
ControllerComponentLabel string
|
||||
CreatedByAnnotation string
|
||||
ProxyAPIPort uint
|
||||
EnableTLS bool
|
||||
}
|
||||
|
||||
type installOptions struct {
|
||||
|
@ -39,6 +40,7 @@ type installOptions struct {
|
|||
webReplicas uint
|
||||
prometheusReplicas uint
|
||||
controllerLogLevel string
|
||||
enableTLS bool
|
||||
*proxyConfigOptions
|
||||
}
|
||||
|
||||
|
@ -49,6 +51,7 @@ func newInstallOptions() *installOptions {
|
|||
webReplicas: 1,
|
||||
prometheusReplicas: 1,
|
||||
controllerLogLevel: "info",
|
||||
enableTLS: false,
|
||||
proxyConfigOptions: newProxyConfigOptions(),
|
||||
}
|
||||
}
|
||||
|
@ -75,6 +78,8 @@ func newCmdInstall() *cobra.Command {
|
|||
cmd.PersistentFlags().UintVar(&options.webReplicas, "web-replicas", options.webReplicas, "Replicas of the web server to deploy")
|
||||
cmd.PersistentFlags().UintVar(&options.prometheusReplicas, "prometheus-replicas", options.prometheusReplicas, "Replicas of prometheus to deploy")
|
||||
cmd.PersistentFlags().StringVar(&options.controllerLogLevel, "controller-log-level", options.controllerLogLevel, "Log level for the controller and web components")
|
||||
cmd.PersistentFlags().BoolVar(&options.enableTLS, "enable-tls", options.enableTLS, "Enable TLS connections among pods in the service mesh")
|
||||
cmd.PersistentFlags().MarkHidden("enable-tls")
|
||||
|
||||
return cmd
|
||||
}
|
||||
|
@ -99,6 +104,7 @@ func validateAndBuildConfig(options *installOptions) (*installConfig, error) {
|
|||
ControllerComponentLabel: k8s.ControllerComponentLabel,
|
||||
CreatedByAnnotation: k8s.CreatedByAnnotation,
|
||||
ProxyAPIPort: options.proxyAPIPort,
|
||||
EnableTLS: options.enableTLS,
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
|
|
@ -36,6 +36,7 @@ func TestRender(t *testing.T) {
|
|||
ControllerComponentLabel: "ControllerComponentLabel",
|
||||
CreatedByAnnotation: "CreatedByAnnotation",
|
||||
ProxyAPIPort: 123,
|
||||
EnableTLS: true,
|
||||
}
|
||||
|
||||
testCases := []struct {
|
||||
|
|
|
@ -157,6 +157,7 @@ spec:
|
|||
resources: {}
|
||||
- args:
|
||||
- destination
|
||||
- -enable-tls=false
|
||||
- -log-level=info
|
||||
- -logtostderr=true
|
||||
image: gcr.io/runconduit/controller:undefined
|
||||
|
|
|
@ -158,6 +158,7 @@ spec:
|
|||
resources: {}
|
||||
- args:
|
||||
- destination
|
||||
- -enable-tls=true
|
||||
- -log-level=ControllerLogLevel
|
||||
- -logtostderr=true
|
||||
image: ControllerImage
|
||||
|
|
|
@ -162,6 +162,7 @@ spec:
|
|||
imagePullPolicy: {{.ImagePullPolicy}}
|
||||
args:
|
||||
- "destination"
|
||||
- "-enable-tls={{.EnableTLS}}"
|
||||
- "-log-level={{.ControllerLogLevel}}"
|
||||
- "-logtostderr=true"
|
||||
- name: proxy-api
|
||||
|
|
|
@ -18,6 +18,7 @@ func main() {
|
|||
kubeConfigPath := flag.String("kubeconfig", "", "path to kube config")
|
||||
k8sDNSZone := flag.String("kubernetes-dns-zone", "", "The DNS suffix for the local Kubernetes zone.")
|
||||
logLevel := flag.String("log-level", log.InfoLevel.String(), "log level, must be one of: panic, fatal, error, warn, info, debug")
|
||||
enableTLS := flag.Bool("enable-tls", false, "Enable TLS connections among pods in the service mesh")
|
||||
printVersion := version.VersionFlag()
|
||||
flag.Parse()
|
||||
|
||||
|
@ -35,7 +36,7 @@ func main() {
|
|||
|
||||
done := make(chan struct{})
|
||||
|
||||
server, lis, err := destination.NewServer(*addr, *kubeConfigPath, *k8sDNSZone, done)
|
||||
server, lis, err := destination.NewServer(*addr, *kubeConfigPath, *k8sDNSZone, *enableTLS, done)
|
||||
if err != nil {
|
||||
log.Fatal(err)
|
||||
}
|
||||
|
|
|
@ -19,9 +19,10 @@ type updateListener interface {
|
|||
|
||||
// implements the updateListener interface
|
||||
type endpointListener struct {
|
||||
stream pb.Destination_GetServer
|
||||
podsByIp k8s.PodIndex
|
||||
labels map[string]string
|
||||
stream pb.Destination_GetServer
|
||||
podsByIp k8s.PodIndex
|
||||
labels map[string]string
|
||||
enableTLS bool
|
||||
}
|
||||
|
||||
func (l *endpointListener) Done() <-chan struct{} {
|
||||
|
@ -86,6 +87,7 @@ func (l *endpointListener) toWeightedAddrSet(endpoints []common.TcpAddress) *pb.
|
|||
}
|
||||
|
||||
func (l *endpointListener) toWeightedAddr(address common.TcpAddress) *pb.WeightedAddr {
|
||||
var tlsIdentity *pb.TlsIdentity
|
||||
metricLabelsForPod := map[string]string{}
|
||||
ipAsString := util.IPToString(address.Ip)
|
||||
|
||||
|
@ -99,6 +101,7 @@ func (l *endpointListener) toWeightedAddr(address common.TcpAddress) *pb.Weighte
|
|||
podFound = true
|
||||
metricLabelsForPod = pkgK8s.GetOwnerLabels(pod.ObjectMeta)
|
||||
metricLabelsForPod["pod"] = pod.Name
|
||||
tlsIdentity = l.toTlsIdentity(pod)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
@ -111,6 +114,7 @@ func (l *endpointListener) toWeightedAddr(address common.TcpAddress) *pb.Weighte
|
|||
Addr: &address,
|
||||
Weight: 1,
|
||||
MetricLabels: metricLabelsForPod,
|
||||
TlsIdentity: tlsIdentity,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -121,3 +125,19 @@ func (l *endpointListener) toAddrSet(endpoints []common.TcpAddress) *pb.AddrSet
|
|||
}
|
||||
return &pb.AddrSet{Addrs: addrs}
|
||||
}
|
||||
|
||||
func (l *endpointListener) toTlsIdentity(pod *coreV1.Pod) *pb.TlsIdentity {
|
||||
if !l.enableTLS {
|
||||
return nil
|
||||
}
|
||||
|
||||
return &pb.TlsIdentity{
|
||||
Strategy: &pb.TlsIdentity_K8SPodNamespace_{
|
||||
K8SPodNamespace: &pb.TlsIdentity_K8SPodNamespace{
|
||||
ControllerNs: pkgK8s.GetControllerNs(pod.ObjectMeta),
|
||||
PodNs: pod.Namespace,
|
||||
PodName: pod.Name,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
|
@ -153,6 +153,93 @@ func TestEndpointListener(t *testing.T) {
|
|||
}
|
||||
})
|
||||
|
||||
t.Run("Sends TlsIdentity when enabled", func(t *testing.T) {
|
||||
expectedPodName := "pod1"
|
||||
expectedPodNamespace := "this-namespace"
|
||||
expectedConduitNamespace := "conduit-namespace"
|
||||
expectedTlsIdentity := &pb.TlsIdentity_K8SPodNamespace{
|
||||
PodName: expectedPodName,
|
||||
PodNs: expectedPodNamespace,
|
||||
ControllerNs: expectedConduitNamespace,
|
||||
}
|
||||
|
||||
addedAddress := common.TcpAddress{Ip: &common.IPAddress{Ip: &common.IPAddress_Ipv4{Ipv4: 666}}, Port: 1}
|
||||
ipForAddr := util.IPToString(addedAddress.Ip)
|
||||
podForAddedAddress := &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: expectedPodName,
|
||||
Namespace: expectedPodNamespace,
|
||||
Labels: map[string]string{
|
||||
pkgK8s.ControllerNSLabel: expectedConduitNamespace,
|
||||
},
|
||||
},
|
||||
Status: v1.PodStatus{
|
||||
Phase: v1.PodRunning,
|
||||
},
|
||||
}
|
||||
|
||||
podIndex := &k8s.InMemoryPodIndex{BackingMap: map[string][]*v1.Pod{ipForAddr: []*v1.Pod{podForAddedAddress}}}
|
||||
|
||||
mockGetServer := &mockDestination_GetServer{updatesReceived: []*pb.Update{}}
|
||||
listener := &endpointListener{
|
||||
podsByIp: podIndex,
|
||||
stream: mockGetServer,
|
||||
enableTLS: true,
|
||||
}
|
||||
|
||||
listener.Update([]common.TcpAddress{addedAddress}, nil)
|
||||
|
||||
addrs := mockGetServer.updatesReceived[0].GetAdd().GetAddrs()
|
||||
if len(addrs) != 1 {
|
||||
t.Fatalf("Expected [1] address returned, got %v", addrs)
|
||||
}
|
||||
|
||||
actualTlsIdentity := addrs[0].GetTlsIdentity().GetK8SPodNamespace()
|
||||
if !reflect.DeepEqual(actualTlsIdentity, expectedTlsIdentity) {
|
||||
t.Fatalf("Expected TlsIdentity to be [%v] but was [%v]", expectedTlsIdentity, actualTlsIdentity)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("Does not sent TlsIdentity when not enabled", func(t *testing.T) {
|
||||
expectedPodName := "pod1"
|
||||
expectedPodNamespace := "this-namespace"
|
||||
expectedConduitNamespace := "conduit-namespace"
|
||||
|
||||
addedAddress := common.TcpAddress{Ip: &common.IPAddress{Ip: &common.IPAddress_Ipv4{Ipv4: 666}}, Port: 1}
|
||||
ipForAddr := util.IPToString(addedAddress.Ip)
|
||||
podForAddedAddress := &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: expectedPodName,
|
||||
Namespace: expectedPodNamespace,
|
||||
Labels: map[string]string{
|
||||
pkgK8s.ControllerNSLabel: expectedConduitNamespace,
|
||||
},
|
||||
},
|
||||
Status: v1.PodStatus{
|
||||
Phase: v1.PodRunning,
|
||||
},
|
||||
}
|
||||
|
||||
podIndex := &k8s.InMemoryPodIndex{BackingMap: map[string][]*v1.Pod{ipForAddr: []*v1.Pod{podForAddedAddress}}}
|
||||
|
||||
mockGetServer := &mockDestination_GetServer{updatesReceived: []*pb.Update{}}
|
||||
listener := &endpointListener{
|
||||
podsByIp: podIndex,
|
||||
stream: mockGetServer,
|
||||
}
|
||||
|
||||
listener.Update([]common.TcpAddress{addedAddress}, nil)
|
||||
|
||||
addrs := mockGetServer.updatesReceived[0].GetAdd().GetAddrs()
|
||||
if len(addrs) != 1 {
|
||||
t.Fatalf("Expected [1] address returned, got %v", addrs)
|
||||
}
|
||||
|
||||
if addrs[0].TlsIdentity != nil {
|
||||
t.Fatalf("Expected no TlsIdentity to be sent, but got [%v]", addrs[0].TlsIdentity)
|
||||
}
|
||||
})
|
||||
|
||||
t.Run("It only returns pods in a running state", func(t *testing.T) {
|
||||
expectations := []listenerExpected{
|
||||
listenerExpected{
|
||||
|
|
|
@ -17,6 +17,7 @@ import (
|
|||
type server struct {
|
||||
podsByIp k8s.PodIndex
|
||||
resolvers []streamingDestinationResolver
|
||||
enableTLS bool
|
||||
}
|
||||
|
||||
// The Destination service serves service discovery information to the proxy.
|
||||
|
@ -29,7 +30,7 @@ type server struct {
|
|||
//
|
||||
// Addresses for the given destination are fetched from the Kubernetes Endpoints
|
||||
// API.
|
||||
func NewServer(addr, kubeconfig string, k8sDNSZone string, done chan struct{}) (*grpc.Server, net.Listener, error) {
|
||||
func NewServer(addr, kubeconfig, k8sDNSZone string, enableTLS bool, done chan struct{}) (*grpc.Server, net.Listener, error) {
|
||||
clientSet, err := k8s.NewClientSet(kubeconfig)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
|
@ -58,6 +59,7 @@ func NewServer(addr, kubeconfig string, k8sDNSZone string, done chan struct{}) (
|
|||
srv := server{
|
||||
podsByIp: podsByIp,
|
||||
resolvers: resolvers,
|
||||
enableTLS: enableTLS,
|
||||
}
|
||||
|
||||
lis, err := net.Listen("tcp", addr)
|
||||
|
@ -105,7 +107,7 @@ func (s *server) Get(dest *common.Destination, stream pb.Destination_GetServer)
|
|||
}
|
||||
|
||||
func (s *server) streamResolutionUsingCorrectResolverFor(host string, port int, stream pb.Destination_GetServer) error {
|
||||
listener := &endpointListener{stream: stream, podsByIp: s.podsByIp}
|
||||
listener := &endpointListener{stream: stream, podsByIp: s.podsByIp, enableTLS: s.enableTLS}
|
||||
|
||||
for _, resolver := range s.resolvers {
|
||||
resolverCanResolve, err := resolver.canResolve(host, port)
|
||||
|
|
|
@ -51,9 +51,14 @@ func main() {
|
|||
switch updateType := update.Update.(type) {
|
||||
case *pb.Update_Add:
|
||||
log.Println("Add:")
|
||||
log.Printf("metric_labels: %v", updateType.Add.MetricLabels)
|
||||
log.Printf("labels: %v", updateType.Add.MetricLabels)
|
||||
for _, addr := range updateType.Add.Addrs {
|
||||
log.Printf("- %s:%d - %v", util.IPToString(addr.Addr.GetIp()), addr.Addr.Port, addr.MetricLabels)
|
||||
log.Printf("- %s:%d", util.IPToString(addr.Addr.GetIp()), addr.Addr.Port)
|
||||
log.Printf(" - labels: %v", addr.MetricLabels)
|
||||
switch identityType := addr.GetTlsIdentity().GetStrategy().(type) {
|
||||
case *pb.TlsIdentity_K8SPodNamespace_:
|
||||
log.Printf(" - tls: %v", identityType.K8SPodNamespace)
|
||||
}
|
||||
}
|
||||
log.Println()
|
||||
case *pb.Update_Remove:
|
||||
|
|
|
@ -94,6 +94,10 @@ func GetOwnerLabels(objectMeta meta.ObjectMeta) map[string]string {
|
|||
return labels
|
||||
}
|
||||
|
||||
func GetControllerNs(objectMeta meta.ObjectMeta) string {
|
||||
return objectMeta.Labels[ControllerNSLabel]
|
||||
}
|
||||
|
||||
// toOwnerLabel converts a proxy label to a prometheus label, following the
|
||||
// relabel conventions from the prometheus scrape config file
|
||||
func toOwnerLabel(proxyLabel string) string {
|
||||
|
|
Loading…
Reference in New Issue