diff --git a/controller/cmd/destination/main.go b/controller/cmd/destination/main.go index 59ce8a62b..c9fd2d802 100644 --- a/controller/cmd/destination/main.go +++ b/controller/cmd/destination/main.go @@ -7,6 +7,7 @@ import ( "syscall" "github.com/runconduit/conduit/controller/destination" + "github.com/runconduit/conduit/controller/k8s" "github.com/runconduit/conduit/controller/util" "github.com/runconduit/conduit/pkg/version" log "github.com/sirupsen/logrus" @@ -34,13 +35,26 @@ func main() { stop := make(chan os.Signal, 1) signal.Notify(stop, os.Interrupt, syscall.SIGTERM) + k8sClient, err := k8s.NewClientSet(*kubeConfigPath) + if err != nil { + log.Fatal(err.Error()) + } + k8sAPI := k8s.NewAPI(k8sClient) + done := make(chan struct{}) - server, lis, err := destination.NewServer(*addr, *kubeConfigPath, *k8sDNSZone, *enableTLS, done) + server, lis, err := destination.NewServer(*addr, *kubeConfigPath, *k8sDNSZone, *enableTLS, k8sAPI, done) if err != nil { log.Fatal(err) } + go func() { + err := k8sAPI.Sync() + if err != nil { + log.Fatal(err.Error()) + } + }() + go func() { log.Infof("starting gRPC server on %s", *addr) server.Serve(lis) diff --git a/controller/destination/listener.go b/controller/destination/listener.go index c8b7d0f0f..5f90d9821 100644 --- a/controller/destination/listener.go +++ b/controller/destination/listener.go @@ -3,7 +3,6 @@ package destination import ( common "github.com/runconduit/conduit/controller/gen/common" pb "github.com/runconduit/conduit/controller/gen/proxy/destination" - "github.com/runconduit/conduit/controller/k8s" "github.com/runconduit/conduit/controller/util" pkgK8s "github.com/runconduit/conduit/pkg/k8s" log "github.com/sirupsen/logrus" @@ -20,7 +19,7 @@ type updateListener interface { // implements the updateListener interface type endpointListener struct { stream pb.Destination_GetServer - podsByIp k8s.PodIndex + podsByIp func(string) ([]*coreV1.Pod, error) labels map[string]string enableTLS bool } @@ -91,7 +90,7 @@ func (l *endpointListener) toWeightedAddr(address common.TcpAddress) *pb.Weighte metricLabelsForPod := map[string]string{} ipAsString := util.IPToString(address.Ip) - resultingPods, err := l.podsByIp.GetPodsByIndex(ipAsString) + resultingPods, err := l.podsByIp(ipAsString) if err != nil { log.Errorf("Error while finding pod for IP [%s], this IP will be sent with no metric labels: %v", ipAsString, err) } else { diff --git a/controller/destination/listener_test.go b/controller/destination/listener_test.go index b63beb753..a664fd2b1 100644 --- a/controller/destination/listener_test.go +++ b/controller/destination/listener_test.go @@ -7,7 +7,6 @@ import ( common "github.com/runconduit/conduit/controller/gen/common" pb "github.com/runconduit/conduit/controller/gen/proxy/destination" - "github.com/runconduit/conduit/controller/k8s" "github.com/runconduit/conduit/controller/util" pkgK8s "github.com/runconduit/conduit/pkg/k8s" "k8s.io/api/core/v1" @@ -28,11 +27,15 @@ type listenerExpected struct { addressLabels map[string]string } +func noPodsByIp(ip string) ([]*v1.Pod, error) { + return make([]*v1.Pod, 0), nil +} + func TestEndpointListener(t *testing.T) { t.Run("Sends one update for add and another for remove", func(t *testing.T) { mockGetServer := &mockDestination_GetServer{updatesReceived: []*pb.Update{}} - listener := &endpointListener{stream: mockGetServer, podsByIp: k8s.NewEmptyPodIndex()} + listener := &endpointListener{stream: mockGetServer, podsByIp: noPodsByIp} addedAddress1 := common.TcpAddress{Ip: &common.IPAddress{Ip: &common.IPAddress_Ipv4{Ipv4: 1}}, Port: 1} addedAddress2 := common.TcpAddress{Ip: &common.IPAddress{Ip: &common.IPAddress_Ipv4{Ipv4: 2}}, Port: 2} @@ -50,7 +53,7 @@ func TestEndpointListener(t *testing.T) { t.Run("Sends addresses as removed or added", func(t *testing.T) { mockGetServer := &mockDestination_GetServer{updatesReceived: []*pb.Update{}} - listener := &endpointListener{stream: mockGetServer, podsByIp: k8s.NewEmptyPodIndex()} + listener := &endpointListener{stream: mockGetServer, podsByIp: noPodsByIp} addedAddress1 := common.TcpAddress{Ip: &common.IPAddress{Ip: &common.IPAddress_Ipv4{Ipv4: 1}}, Port: 1} addedAddress2 := common.TcpAddress{Ip: &common.IPAddress{Ip: &common.IPAddress_Ipv4{Ipv4: 2}}, Port: 2} @@ -85,7 +88,7 @@ func TestEndpointListener(t *testing.T) { t.Run("It returns when the underlying context is done", func(t *testing.T) { context, cancelFn := context.WithCancel(context.Background()) mockGetServer := &mockDestination_GetServer{updatesReceived: []*pb.Update{}, contextToReturn: context} - listener := &endpointListener{stream: mockGetServer, podsByIp: k8s.NewEmptyPodIndex()} + listener := &endpointListener{stream: mockGetServer, podsByIp: noPodsByIp} completed := make(chan bool) go func() { @@ -123,7 +126,9 @@ func TestEndpointListener(t *testing.T) { }, } addedAddress2 := common.TcpAddress{Ip: &common.IPAddress{Ip: &common.IPAddress_Ipv4{Ipv4: 222}}, Port: 22} - podIndex := &k8s.InMemoryPodIndex{BackingMap: map[string][]*v1.Pod{ipForAddr1: []*v1.Pod{podForAddedAddress1}}} + podIndex := func(ip string) ([]*v1.Pod, error) { + return map[string][]*v1.Pod{ipForAddr1: []*v1.Pod{podForAddedAddress1}}[ip], nil + } mockGetServer := &mockDestination_GetServer{updatesReceived: []*pb.Update{}} listener := &endpointListener{ @@ -178,7 +183,9 @@ func TestEndpointListener(t *testing.T) { }, } - podIndex := &k8s.InMemoryPodIndex{BackingMap: map[string][]*v1.Pod{ipForAddr: []*v1.Pod{podForAddedAddress}}} + podIndex := func(ip string) ([]*v1.Pod, error) { + return map[string][]*v1.Pod{ipForAddr: []*v1.Pod{podForAddedAddress}}[ip], nil + } mockGetServer := &mockDestination_GetServer{updatesReceived: []*pb.Update{}} listener := &endpointListener{ @@ -220,7 +227,9 @@ func TestEndpointListener(t *testing.T) { }, } - podIndex := &k8s.InMemoryPodIndex{BackingMap: map[string][]*v1.Pod{ipForAddr: []*v1.Pod{podForAddedAddress}}} + podIndex := func(ip string) ([]*v1.Pod, error) { + return map[string][]*v1.Pod{ipForAddr: []*v1.Pod{podForAddedAddress}}[ip], nil + } mockGetServer := &mockDestination_GetServer{updatesReceived: []*pb.Update{}} listener := &endpointListener{ @@ -311,7 +320,9 @@ func TestEndpointListener(t *testing.T) { backingMap[ipForAddr] = append(backingMap[ipForAddr], podForAddedAddress) } - podIndex := &k8s.InMemoryPodIndex{BackingMap: backingMap} + podIndex := func(ip string) ([]*v1.Pod, error) { + return backingMap[ip], nil + } mockGetServer := &mockDestination_GetServer{updatesReceived: []*pb.Update{}} listener := &endpointListener{ diff --git a/controller/destination/server.go b/controller/destination/server.go index 04c03986c..f448f4632 100644 --- a/controller/destination/server.go +++ b/controller/destination/server.go @@ -12,10 +12,14 @@ import ( "github.com/runconduit/conduit/controller/util" log "github.com/sirupsen/logrus" "google.golang.org/grpc" + "k8s.io/api/core/v1" + "k8s.io/client-go/tools/cache" ) +const podIpIndexName = "ip" + type server struct { - podsByIp k8s.PodIndex + k8sAPI *k8s.API resolvers []streamingDestinationResolver enableTLS bool } @@ -30,20 +34,13 @@ type server struct { // // Addresses for the given destination are fetched from the Kubernetes Endpoints // API. -func NewServer(addr, kubeconfig, k8sDNSZone string, enableTLS bool, done chan struct{}) (*grpc.Server, net.Listener, error) { +func NewServer(addr, kubeconfig, k8sDNSZone string, enableTLS bool, k8sAPI *k8s.API, done chan struct{}) (*grpc.Server, net.Listener, error) { clientSet, err := k8s.NewClientSet(kubeconfig) if err != nil { return nil, nil, err } - podsByIp, err := k8s.NewPodsByIp(clientSet) - if err != nil { - return nil, nil, err - } - err = podsByIp.Run() - if err != nil { - return nil, nil, err - } + k8sAPI.Pod.Informer().AddIndexers(cache.Indexers{podIpIndexName: indexPodByIp}) endpointsWatcher := k8s.NewEndpointsWatcher(clientSet) err = endpointsWatcher.Run() @@ -57,7 +54,7 @@ func NewServer(addr, kubeconfig, k8sDNSZone string, enableTLS bool, done chan st } srv := server{ - podsByIp: podsByIp, + k8sAPI: k8sAPI, resolvers: resolvers, enableTLS: enableTLS, } @@ -106,6 +103,29 @@ func (s *server) Get(dest *common.Destination, stream pb.Destination_GetServer) return s.streamResolutionUsingCorrectResolverFor(host, port, stream) } +func indexPodByIp(obj interface{}) ([]string, error) { + if pod, ok := obj.(*v1.Pod); ok { + return []string{pod.Status.PodIP}, nil + } + return []string{""}, fmt.Errorf("object is not a pod") +} + +func (s *server) podsByIp(ip string) ([]*v1.Pod, error) { + objs, err := s.k8sAPI.Pod.Informer().GetIndexer().ByIndex(podIpIndexName, ip) + if err != nil { + return nil, err + } + pods := make([]*v1.Pod, 0) + for _, obj := range objs { + pod, ok := obj.(*v1.Pod) + if !ok { + return nil, fmt.Errorf("not a pod") + } + pods = append(pods, pod) + } + return pods, nil +} + func (s *server) streamResolutionUsingCorrectResolverFor(host string, port int, stream pb.Destination_GetServer) error { listener := &endpointListener{stream: stream, podsByIp: s.podsByIp, enableTLS: s.enableTLS} diff --git a/controller/destination/server_test.go b/controller/destination/server_test.go index 69320e68f..4087fa44b 100644 --- a/controller/destination/server_test.go +++ b/controller/destination/server_test.go @@ -84,6 +84,10 @@ func TestStreamResolutionUsingCorrectResolverFor(t *testing.T) { stream := &mockDestination_GetServer{} host := "something" port := 666 + k8sAPI, err := k8s.NewFakeAPI() + if err != nil { + t.Fatalf("NewFakeAPI returned an error: %s", err) + } t.Run("Uses first resolver that is able to resolve the host and port", func(t *testing.T) { no := &mockStreamingDestinationResolver{canResolveToReturn: false} @@ -91,7 +95,7 @@ func TestStreamResolutionUsingCorrectResolverFor(t *testing.T) { otherYes := &mockStreamingDestinationResolver{canResolveToReturn: true} server := server{ - podsByIp: k8s.NewEmptyPodIndex(), + k8sAPI: k8sAPI, resolvers: []streamingDestinationResolver{no, no, yes, no, no, otherYes}, } @@ -117,7 +121,7 @@ func TestStreamResolutionUsingCorrectResolverFor(t *testing.T) { no := &mockStreamingDestinationResolver{canResolveToReturn: false} server := server{ - podsByIp: k8s.NewEmptyPodIndex(), + k8sAPI: k8sAPI, resolvers: []streamingDestinationResolver{no, no, no, no}, } @@ -131,7 +135,7 @@ func TestStreamResolutionUsingCorrectResolverFor(t *testing.T) { resolver := &mockStreamingDestinationResolver{canResolveToReturn: true, errToReturnForCanResolve: errors.New("expected for can resolve")} server := server{ - podsByIp: k8s.NewEmptyPodIndex(), + k8sAPI: k8sAPI, resolvers: []streamingDestinationResolver{resolver}, } @@ -145,7 +149,7 @@ func TestStreamResolutionUsingCorrectResolverFor(t *testing.T) { resolver := &mockStreamingDestinationResolver{canResolveToReturn: true, errToReturnForResolution: errors.New("expected for resolving")} server := server{ - podsByIp: k8s.NewEmptyPodIndex(), + k8sAPI: k8sAPI, resolvers: []streamingDestinationResolver{resolver}, } diff --git a/controller/k8s/pods.go b/controller/k8s/pods.go deleted file mode 100644 index 46c48f4aa..000000000 --- a/controller/k8s/pods.go +++ /dev/null @@ -1,119 +0,0 @@ -package k8s - -import ( - "fmt" - "time" - - "k8s.io/api/core/v1" - "k8s.io/apimachinery/pkg/fields" - "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/cache" -) - -const podResource = "pods" - -type PodIndex interface { - GetPod(key string) (*v1.Pod, error) - GetPodsByIndex(key string) ([]*v1.Pod, error) - List() ([]*v1.Pod, error) - Run() error - Stop() -} - -type podIndex struct { - indexer *cache.Indexer - reflector *cache.Reflector - stopCh chan struct{} -} - -func NewPodIndex(clientset kubernetes.Interface, index cache.IndexFunc) (PodIndex, error) { - indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"index": index}) - - podListWatcher := cache.NewListWatchFromClient( - clientset.CoreV1().RESTClient(), - podResource, - v1.NamespaceAll, - fields.Everything(), - ) - - reflector := cache.NewReflector( - podListWatcher, - &v1.Pod{}, - indexer, - time.Duration(0), - ) - - stopCh := make(chan struct{}) - - return &podIndex{ - indexer: &indexer, - reflector: reflector, - stopCh: stopCh, - }, nil -} - -func (p *podIndex) Run() error { - return newWatcher(p.reflector, podResource, p.reflector.ListAndWatch, p.stopCh).run() -} - -func (p *podIndex) Stop() { - p.stopCh <- struct{}{} -} - -func (p *podIndex) GetPod(key string) (*v1.Pod, error) { - item, exists, err := (*p.indexer).GetByKey(key) - if err != nil { - return nil, err - } - if !exists { - return nil, fmt.Errorf("no pod exists for key %s", key) - } - pod, ok := item.(*v1.Pod) - if !ok { - return nil, fmt.Errorf("%v is not a Pod", item) - } - return pod, nil -} - -func (p *podIndex) GetPodsByIndex(key string) ([]*v1.Pod, error) { - items, err := (*p.indexer).ByIndex("index", key) - if err != nil { - return nil, err - } - pods := make([]*v1.Pod, len(items)) - for i, item := range items { - pod, ok := item.(*v1.Pod) - if !ok { - return nil, fmt.Errorf("%v is not a Pod", item) - } - pods[i] = pod - } - return pods, nil -} - -func (p *podIndex) List() ([]*v1.Pod, error) { - pods := make([]*v1.Pod, 0) - - items := (*p.indexer).List() - for _, pod := range items { - pod, ok := pod.(*v1.Pod) - if !ok { - return nil, fmt.Errorf("%v is not a Pod", pod) - } - pods = append(pods, pod) - } - - return pods, nil -} - -func podIPKeyFunc(obj interface{}) ([]string, error) { - if pod, ok := obj.(*v1.Pod); ok { - return []string{pod.Status.PodIP}, nil - } - return nil, fmt.Errorf("Object is not a Pod") -} - -// NewPodsByIp returns a PodIndex with the Pod's IP as its key. -func NewPodsByIp(clientSet *kubernetes.Clientset) (PodIndex, error) { - return NewPodIndex(clientSet, podIPKeyFunc) -} diff --git a/controller/k8s/test_helper.go b/controller/k8s/test_helper.go index 77b0a7dfe..909569087 100644 --- a/controller/k8s/test_helper.go +++ b/controller/k8s/test_helper.go @@ -34,35 +34,6 @@ func (m *MockEndpointsWatcher) Run() error { func (m *MockEndpointsWatcher) Stop() {} -type InMemoryPodIndex struct { - BackingMap map[string][]*v1.Pod -} - -func (i *InMemoryPodIndex) GetPod(key string) (*v1.Pod, error) { - return i.BackingMap[key][0], nil -} - -func (i *InMemoryPodIndex) GetPodsByIndex(key string) ([]*v1.Pod, error) { - return i.BackingMap[key], nil -} - -func (i *InMemoryPodIndex) List() ([]*v1.Pod, error) { - var pods []*v1.Pod - for _, byIndex := range i.BackingMap { - for _, pod := range byIndex { - pods = append(pods, pod) - } - } - - return pods, nil -} -func (i *InMemoryPodIndex) Run() error { return nil } -func (i *InMemoryPodIndex) Stop() {} - -func NewEmptyPodIndex() PodIndex { - return &InMemoryPodIndex{BackingMap: map[string][]*v1.Pod{}} -} - func toRuntimeObject(config string) (runtime.Object, error) { decode := scheme.Codecs.UniversalDeserializer().Decode obj, _, err := decode([]byte(config), nil, nil)