diff --git a/Gopkg.lock b/Gopkg.lock index 071dd8793..76aa33774 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -746,6 +746,6 @@ [solve-meta] analyzer-name = "dep" analyzer-version = 1 - inputs-digest = "f5ff9662192d238efdb260226a886088d05da0924292833660466258fd80c8c0" + inputs-digest = "325d0175a74db86d1fed482bddc8279459d63be9d5338d9a420722c24eea5534" solver-name = "gps-cdcl" solver-version = 1 diff --git a/cli/Dockerfile-bin b/cli/Dockerfile-bin index 4f9f164cc..781f845af 100644 --- a/cli/Dockerfile-bin +++ b/cli/Dockerfile-bin @@ -1,5 +1,5 @@ ## compile binaries -FROM gcr.io/runconduit/go-deps:d17b1119 as golang +FROM gcr.io/runconduit/go-deps:a6e8221d as golang ARG CONDUIT_VERSION WORKDIR /go/src/github.com/runconduit/conduit COPY cli cli diff --git a/controller/Dockerfile b/controller/Dockerfile index 4aee44078..4c4188020 100644 --- a/controller/Dockerfile +++ b/controller/Dockerfile @@ -1,5 +1,5 @@ ## compile controller services -FROM gcr.io/runconduit/go-deps:d17b1119 as golang +FROM gcr.io/runconduit/go-deps:a6e8221d as golang ARG CONDUIT_VERSION WORKDIR /go/src/github.com/runconduit/conduit COPY controller/gen controller/gen diff --git a/controller/api/public/client.go b/controller/api/public/client.go index 6cb86f395..982b3401d 100644 --- a/controller/api/public/client.go +++ b/controller/api/public/client.go @@ -77,7 +77,24 @@ func (c *grpcOverHttpClient) Tap(ctx context.Context, req *pb.TapRequest, _ ...g } func (c *grpcOverHttpClient) TapByResource(ctx context.Context, req *pb.TapByResourceRequest, _ ...grpc.CallOption) (pb.Api_TapByResourceClient, error) { - return nil, fmt.Errorf("Unimplemented") + url := c.endpointNameToPublicApiUrl("TapByResource") + httpRsp, err := c.post(ctx, url, req) + if err != nil { + return nil, err + } + + if err = checkIfResponseHasConduitError(httpRsp); err != nil { + httpRsp.Body.Close() + return nil, err + } + + go func() { + <-ctx.Done() + log.Debug("Closing response body after context marked as done") + httpRsp.Body.Close() + }() + + return &tapClient{ctx: ctx, reader: bufio.NewReader(httpRsp.Body)}, nil } func (c *grpcOverHttpClient) apiRequest(ctx context.Context, endpoint string, req proto.Message, protoResponse proto.Message) error { diff --git a/controller/api/public/grpc_server_test.go b/controller/api/public/grpc_server_test.go index aefdb7103..c6bfaf2ed 100644 --- a/controller/api/public/grpc_server_test.go +++ b/controller/api/public/grpc_server_test.go @@ -194,6 +194,7 @@ spec: replicaSetInformer.Informer().HasSynced, podInformer.Informer().HasSynced, replicationControllerInformer.Informer().HasSynced, + serviceInformer.Informer().HasSynced, ) { t.Fatalf("timed out wait for caches to sync") } diff --git a/controller/api/public/http_server.go b/controller/api/public/http_server.go index 96b2fea80..a35ca1224 100644 --- a/controller/api/public/http_server.go +++ b/controller/api/public/http_server.go @@ -19,11 +19,12 @@ import ( ) var ( - statSummaryPath = fullUrlPathFor("StatSummary") - versionPath = fullUrlPathFor("Version") - listPodsPath = fullUrlPathFor("ListPods") - tapPath = fullUrlPathFor("Tap") - selfCheckPath = fullUrlPathFor("SelfCheck") + statSummaryPath = fullUrlPathFor("StatSummary") + versionPath = fullUrlPathFor("Version") + listPodsPath = fullUrlPathFor("ListPods") + tapPath = fullUrlPathFor("Tap") + tapByResourcePath = fullUrlPathFor("TapByResource") + selfCheckPath = fullUrlPathFor("SelfCheck") ) type handler struct { @@ -50,6 +51,8 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, req *http.Request) { h.handleListPods(w, req) case tapPath: h.handleTap(w, req) + case tapByResourcePath: + h.handleTapByResource(w, req) case selfCheckPath: h.handleSelfCheck(w, req) default: @@ -164,6 +167,28 @@ func (h *handler) handleTap(w http.ResponseWriter, req *http.Request) { } } +func (h *handler) handleTapByResource(w http.ResponseWriter, req *http.Request) { + flushableWriter, err := newStreamingWriter(w) + if err != nil { + writeErrorToHttpResponse(w, err) + return + } + + var protoRequest pb.TapByResourceRequest + err = httpRequestToProto(req, &protoRequest) + if err != nil { + writeErrorToHttpResponse(w, err) + return + } + + server := tapServer{w: flushableWriter, req: req} + err = h.grpcServer.TapByResource(&protoRequest, server) + if err != nil { + writeErrorToHttpResponse(w, err) + return + } +} + type tapServer struct { w flushableResponseWriter req *http.Request diff --git a/controller/api/public/http_server_test.go b/controller/api/public/http_server_test.go index eb9f878ed..c7b4a12de 100644 --- a/controller/api/public/http_server_test.go +++ b/controller/api/public/http_server_test.go @@ -181,7 +181,7 @@ func TestServer(t *testing.T) { mockGrpcServer.TapStreamsToReturn = expectedTapResponses mockGrpcServer.ErrorToReturn = nil - tapClient, err := client.Tap(context.TODO(), &pb.TapRequest{}) + tapClient, err := client.TapByResource(context.TODO(), &pb.TapByResourceRequest{}) if err != nil { t.Fatalf("Unexpected error: %v", err) } diff --git a/controller/cmd/tap/main.go b/controller/cmd/tap/main.go index 3354b20f5..5d65e1f9b 100644 --- a/controller/cmd/tap/main.go +++ b/controller/cmd/tap/main.go @@ -1,15 +1,22 @@ package main import ( + "context" "flag" + "fmt" "os" "os/signal" "syscall" + "time" + "github.com/runconduit/conduit/controller/k8s" "github.com/runconduit/conduit/controller/tap" "github.com/runconduit/conduit/controller/util" "github.com/runconduit/conduit/pkg/version" log "github.com/sirupsen/logrus" + "k8s.io/api/core/v1" + "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" ) func main() { @@ -33,11 +40,98 @@ func main() { stop := make(chan os.Signal, 1) signal.Notify(stop, os.Interrupt, syscall.SIGTERM) - server, lis, err := tap.NewServer(*addr, *tapPort, *kubeConfigPath) + clientSet, err := k8s.NewClientSet(*kubeConfigPath) + if err != nil { + log.Fatalf("failed to create Kubernetes client: %s", err) + } + + replicaSets, err := k8s.NewReplicaSetStore(clientSet) + if err != nil { + log.Fatalf("NewReplicaSetStore failed: %s", err) + } + err = replicaSets.Run() + if err != nil { + log.Fatalf("replicaSets.Run() failed: %s", err) + } + + // index pods by deployment + deploymentIndex := func(obj interface{}) ([]string, error) { + pod, ok := obj.(*v1.Pod) + if !ok { + return nil, fmt.Errorf("object is not a Pod") + } + deployment, err := replicaSets.GetDeploymentForPod(pod) + if err != nil { + log.Debugf("Cannot get deployment for pod %s: %s", pod.Name, err) + return []string{}, nil + } + return []string{deployment}, nil + } + + pods, err := k8s.NewPodIndex(clientSet, deploymentIndex) + if err != nil { + log.Fatalf("NewPodIndex failed: %s", err) + } + err = pods.Run() + if err != nil { + log.Fatalf("pods.Run() failed: %s", err) + } + + // TODO: factor out with public-api + sharedInformers := informers.NewSharedInformerFactory(clientSet, 10*time.Minute) + + namespaceInformer := sharedInformers.Core().V1().Namespaces() + namespaceInformerSynced := namespaceInformer.Informer().HasSynced + + deployInformer := sharedInformers.Apps().V1beta2().Deployments() + deployInformerSynced := deployInformer.Informer().HasSynced + + replicaSetInformer := sharedInformers.Apps().V1beta2().ReplicaSets() + replicaSetInformerSynced := replicaSetInformer.Informer().HasSynced + + podInformer := sharedInformers.Core().V1().Pods() + podInformerSynced := podInformer.Informer().HasSynced + + replicationControllerInformer := sharedInformers.Core().V1().ReplicationControllers() + replicationControllerInformerSynced := replicationControllerInformer.Informer().HasSynced + + serviceInformer := sharedInformers.Core().V1().Services() + serviceInformerSynced := serviceInformer.Informer().HasSynced + + sharedInformers.Start(nil) + + server, lis, err := tap.NewServer( + *addr, *tapPort, replicaSets, pods, + namespaceInformer.Lister(), + deployInformer.Lister(), + replicaSetInformer.Lister(), + podInformer.Lister(), + replicationControllerInformer.Lister(), + serviceInformer.Lister(), + ) if err != nil { log.Fatal(err.Error()) } + go func() { + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + log.Infof("waiting for caches to sync") + if !cache.WaitForCacheSync( + ctx.Done(), + namespaceInformerSynced, + deployInformerSynced, + replicaSetInformerSynced, + podInformerSynced, + replicationControllerInformerSynced, + serviceInformerSynced, + ) { + log.Fatalf("timed out wait for caches to sync") + } + log.Infof("caches synced") + }() + go func() { log.Println("starting gRPC server on", *addr) server.Serve(lis) diff --git a/controller/k8s/pods.go b/controller/k8s/pods.go index d2cffbc79..46c48f4aa 100644 --- a/controller/k8s/pods.go +++ b/controller/k8s/pods.go @@ -26,7 +26,7 @@ type podIndex struct { stopCh chan struct{} } -func NewPodIndex(clientset *kubernetes.Clientset, index cache.IndexFunc) (PodIndex, error) { +func NewPodIndex(clientset kubernetes.Interface, index cache.IndexFunc) (PodIndex, error) { indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"index": index}) podListWatcher := cache.NewListWatchFromClient( diff --git a/controller/k8s/replicasets.go b/controller/k8s/replicasets.go index fae86efd9..90f1d911e 100644 --- a/controller/k8s/replicasets.go +++ b/controller/k8s/replicasets.go @@ -19,7 +19,7 @@ type ReplicaSetStore struct { stopCh chan struct{} } -func NewReplicaSetStore(clientset *kubernetes.Clientset) (*ReplicaSetStore, error) { +func NewReplicaSetStore(clientset kubernetes.Interface) (*ReplicaSetStore, error) { store := cache.NewStore(cache.MetaNamespaceKeyFunc) replicatSetListWatcher := cache.NewListWatchFromClient( diff --git a/controller/tap/server.go b/controller/tap/server.go index be71a852e..a59051caa 100644 --- a/controller/tap/server.go +++ b/controller/tap/server.go @@ -14,21 +14,49 @@ import ( public "github.com/runconduit/conduit/controller/gen/public" "github.com/runconduit/conduit/controller/k8s" "github.com/runconduit/conduit/controller/util" + pkgK8s "github.com/runconduit/conduit/pkg/k8s" log "github.com/sirupsen/logrus" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "k8s.io/api/core/v1" + appsv1beta2 "k8s.io/api/apps/v1beta2" + apiv1 "k8s.io/api/core/v1" + k8sErrors "k8s.io/apimachinery/pkg/api/errors" + metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + applisters "k8s.io/client-go/listers/apps/v1beta2" + corelisters "k8s.io/client-go/listers/core/v1" ) -var tapInterval = 10 * time.Second - type ( server struct { tapPort uint // We use the Kubernetes API to find the IP addresses of pods to tap + // TODO: remove these when TapByResource replaces tap replicaSets *k8s.ReplicaSetStore pods k8s.PodIndex + + // TODO: factor out with public-api + namespaceLister corelisters.NamespaceLister + deployLister applisters.DeploymentLister + replicaSetLister applisters.ReplicaSetLister + podLister corelisters.PodLister + replicationControllerLister corelisters.ReplicationControllerLister + serviceLister corelisters.ServiceLister + } +) + +var ( + tapInterval = 10 * time.Second + + // TODO: factor out with public-api + k8sResourceTypesToDestinationLabels = map[string]string{ + pkgK8s.KubernetesDeployments: "deployment", + pkgK8s.KubernetesNamespaces: "namespace", + pkgK8s.KubernetesPods: "pod", + pkgK8s.KubernetesReplicationControllers: "replication_controller", + pkgK8s.KubernetesServices: "service", } ) @@ -36,7 +64,7 @@ func (s *server) Tap(req *public.TapRequest, stream pb.Tap_TapServer) error { // TODO: Allow a configurable aperture A. // If the target contains more than A pods, select A of them at random. - var pods []*v1.Pod + var pods []*apiv1.Pod var targetName string switch target := req.Target.(type) { case *public.TapRequest_Pod: @@ -45,7 +73,7 @@ func (s *server) Tap(req *public.TapRequest, stream pb.Tap_TapServer) error { if err != nil { return status.Errorf(codes.NotFound, err.Error()) } - pods = []*v1.Pod{pod} + pods = []*apiv1.Pod{pod} case *public.TapRequest_Deployment: targetName = target.Deployment var err error @@ -55,7 +83,7 @@ func (s *server) Tap(req *public.TapRequest, stream pb.Tap_TapServer) error { } } - log.Printf("Tapping %d pods for target %s", len(pods), targetName) + log.Infof("Tapping %d pods for target %s", len(pods), targetName) events := make(chan *common.TapEvent) @@ -74,7 +102,7 @@ func (s *server) Tap(req *public.TapRequest, stream pb.Tap_TapServer) error { // initiate a tap on the pod match, err := makeMatch(req) if err != nil { - return nil + return err } go s.tapProxy(stream.Context(), rpsPerPod, match, pod.Status.PodIP, events) } @@ -90,12 +118,73 @@ func (s *server) Tap(req *public.TapRequest, stream pb.Tap_TapServer) error { } func (s *server) TapByResource(req *public.TapByResourceRequest, stream pb.Tap_TapByResourceServer) error { - return fmt.Errorf("unimplemented") + if req == nil { + return status.Error(codes.InvalidArgument, "TapByResource received nil TapByResourceRequest") + } + if req.Target == nil { + return status.Errorf(codes.InvalidArgument, "TapByResource received nil target ResourceSelection: %+v", *req) + } + + pods, err := s.getPodsFor(*req.Target) + if err != nil { + if status.Code(err) == codes.Unknown { + if k8sErrors.ReasonForError(err) == metaV1.StatusReasonNotFound { + err = status.Errorf(codes.NotFound, err.Error()) + } else { + err = status.Errorf(codes.Internal, err.Error()) + } + } + return err + } + + if len(pods) == 0 { + return status.Errorf(codes.NotFound, "no pods found for ResourceSelection: %+v", *req.Target) + } + + log.Infof("Tapping %d pods for target: %+v", len(pods), *req.Target.Resource) + + events := make(chan *common.TapEvent) + + go func() { // Stop sending back events if the request is cancelled + <-stream.Context().Done() + close(events) + }() + + // divide the rps evenly between all pods to tap + rpsPerPod := req.MaxRps / float32(len(pods)) + if rpsPerPod < 1 { + rpsPerPod = 1 + } + + match, err := makeByResourceMatch(req.Match) + if err != nil { + if status.Code(err) == codes.Unknown { + err = status.Errorf(codes.Internal, err.Error()) + } + return err + } + + for _, pod := range pods { + // initiate a tap on the pod + go s.tapProxy(stream.Context(), rpsPerPod, match, pod.Status.PodIP, events) + } + + // read events from the taps and send them back + for event := range events { + err := stream.Send(event) + if err != nil { + if status.Code(err) == codes.Unknown { + err = status.Errorf(codes.Internal, err.Error()) + } + return err + } + } + return nil } func validatePort(port uint32) error { if port > 65535 { - return fmt.Errorf("Port number of range: %d", port) + return status.Errorf(codes.InvalidArgument, "port number of range: %d", port) } return nil } @@ -275,6 +364,103 @@ func parseMethod(method string) *common.HttpMethod { } } +func makeByResourceMatch(match *public.TapByResourceRequest_Match) (*proxy.ObserveRequest_Match, error) { + // TODO: for now assume it's always a single, flat `All` match list + seq := match.GetAll() + if seq == nil { + return nil, status.Errorf(codes.Unimplemented, "unexpected match specified: %+v", match) + } + + matches := []*proxy.ObserveRequest_Match{} + + for _, reqMatch := range seq.Matches { + switch typed := reqMatch.Match.(type) { + case *public.TapByResourceRequest_Match_Destinations: + + for k, v := range destinationLabels(typed.Destinations.Resource) { + matches = append(matches, &proxy.ObserveRequest_Match{ + Match: &proxy.ObserveRequest_Match_DestinationLabel{ + DestinationLabel: &proxy.ObserveRequest_Match_Label{ + Key: k, + Value: v, + }, + }, + }) + } + + case *public.TapByResourceRequest_Match_Http_: + + httpMatch := proxy.ObserveRequest_Match_Http{} + + switch httpTyped := typed.Http.Match.(type) { + case *public.TapByResourceRequest_Match_Http_Scheme: + httpMatch = proxy.ObserveRequest_Match_Http{ + Match: &proxy.ObserveRequest_Match_Http_Scheme{ + Scheme: parseScheme(httpTyped.Scheme), + }, + } + case *public.TapByResourceRequest_Match_Http_Method: + httpMatch = proxy.ObserveRequest_Match_Http{ + Match: &proxy.ObserveRequest_Match_Http_Method{ + Method: parseMethod(httpTyped.Method), + }, + } + case *public.TapByResourceRequest_Match_Http_Authority: + httpMatch = proxy.ObserveRequest_Match_Http{ + Match: &proxy.ObserveRequest_Match_Http_Authority{ + Authority: &proxy.ObserveRequest_Match_Http_StringMatch{ + Match: &proxy.ObserveRequest_Match_Http_StringMatch_Exact{ + Exact: httpTyped.Authority, + }, + }, + }, + } + case *public.TapByResourceRequest_Match_Http_Path: + httpMatch = proxy.ObserveRequest_Match_Http{ + Match: &proxy.ObserveRequest_Match_Http_Path{ + Path: &proxy.ObserveRequest_Match_Http_StringMatch{ + Match: &proxy.ObserveRequest_Match_Http_StringMatch_Prefix{ + Prefix: httpTyped.Path, + }, + }, + }, + } + default: + return nil, status.Errorf(codes.Unimplemented, "unknown HTTP match type: %v", httpTyped) + } + + matches = append(matches, &proxy.ObserveRequest_Match{ + Match: &proxy.ObserveRequest_Match_Http_{ + Http: &httpMatch, + }, + }) + + default: + return nil, status.Errorf(codes.Unimplemented, "unknown match type: %v", typed) + } + } + + return &proxy.ObserveRequest_Match{ + Match: &proxy.ObserveRequest_Match_All{ + All: &proxy.ObserveRequest_Match_Seq{ + Matches: matches, + }, + }, + }, nil +} + +// TODO: factor out with `promLabels` in public-api +func destinationLabels(resource *public.Resource) map[string]string { + dstLabels := map[string]string{} + if resource.Name != "" { + dstLabels[k8sResourceTypesToDestinationLabels[resource.Type]] = resource.Name + } + if resource.Type != pkgK8s.KubernetesNamespaces && resource.Namespace != "" { + dstLabels["namespace"] = resource.Namespace + } + return dstLabels +} + // Tap a pod. // This method will run continuously until an error is encountered or the // request is cancelled via the context. Thus it should be called as a @@ -285,10 +471,10 @@ func parseMethod(method string) *common.HttpMethod { // again. func (s *server) tapProxy(ctx context.Context, maxRps float32, match *proxy.ObserveRequest_Match, addr string, events chan *common.TapEvent) { tapAddr := fmt.Sprintf("%s:%d", addr, s.tapPort) - log.Printf("Establishing tap on %s", tapAddr) + log.Infof("Establishing tap on %s", tapAddr) conn, err := grpc.DialContext(ctx, tapAddr, grpc.WithInsecure()) if err != nil { - log.Println(err) + log.Error(err) return } client := proxy.NewTapClient(conn) @@ -303,7 +489,7 @@ func (s *server) tapProxy(ctx context.Context, maxRps float32, match *proxy.Obse windowEnd := windowStart.Add(tapInterval) rsp, err := client.Observe(ctx, req) if err != nil { - log.Println(err) + log.Error(err) return } for { // Stream loop @@ -312,7 +498,7 @@ func (s *server) tapProxy(ctx context.Context, maxRps float32, match *proxy.Obse break } if err != nil { - log.Println(err) + log.Error(err) return } events <- event @@ -323,45 +509,230 @@ func (s *server) tapProxy(ctx context.Context, maxRps float32, match *proxy.Obse } } -func NewServer(addr string, tapPort uint, kubeconfig string) (*grpc.Server, net.Listener, error) { +// +// TODO: factor all these functions out of public-api into a shared k8s lister/resource module +// - clientSet, err := k8s.NewClientSet(kubeconfig) - if err != nil { - return nil, nil, err +func (s *server) getPodsFor(res public.ResourceSelection) ([]*apiv1.Pod, error) { + var err error + namespace := res.Resource.Namespace + objects := []runtime.Object{} + + switch res.Resource.Type { + case pkgK8s.KubernetesDeployments: + objects, err = s.getDeployments(res.Resource) + case pkgK8s.KubernetesNamespaces: + namespace = res.Resource.Name // special case for namespace + objects, err = s.getNamespaces(res.Resource) + case pkgK8s.KubernetesReplicationControllers: + objects, err = s.getReplicationControllers(res.Resource) + case pkgK8s.KubernetesServices: + objects, err = s.getServices(res.Resource) + + // special case for pods + case pkgK8s.KubernetesPods: + return s.getPods(res.Resource) + + default: + err = status.Errorf(codes.Unimplemented, "unimplemented resource type: %v", res.Resource.Type) } - replicaSets, err := k8s.NewReplicaSetStore(clientSet) if err != nil { - return nil, nil, err - } - err = replicaSets.Run() - if err != nil { - return nil, nil, err + return nil, err } - // index pods by deployment - deploymentIndex := func(obj interface{}) ([]string, error) { - pod, ok := obj.(*v1.Pod) - if !ok { - return nil, fmt.Errorf("object is not a Pod") - } - deployment, err := replicaSets.GetDeploymentForPod(pod) + allPods := []*apiv1.Pod{} + for _, obj := range objects { + selector, err := getSelectorFromObject(obj) if err != nil { - log.Debugf("Cannot get deployment for pod %s: %s", pod.Name, err) - return []string{}, nil + return nil, err + } + + // TODO: special case namespace + pods, err := s.podLister.Pods(namespace).List(selector) + if err != nil { + return nil, err + } + + for _, pod := range pods { + if isPendingOrRunning(pod) { + allPods = append(allPods, pod) + } } - return []string{deployment}, nil } - pods, err := k8s.NewPodIndex(clientSet, deploymentIndex) - if err != nil { - return nil, nil, err + return allPods, nil +} + +func isPendingOrRunning(pod *apiv1.Pod) bool { + pending := pod.Status.Phase == apiv1.PodPending + running := pod.Status.Phase == apiv1.PodRunning + terminating := pod.DeletionTimestamp != nil + return (pending || running) && !terminating +} + +func getSelectorFromObject(obj runtime.Object) (labels.Selector, error) { + switch typed := obj.(type) { + case *apiv1.Namespace: + return labels.Everything(), nil + + case *appsv1beta2.Deployment: + return labels.Set(typed.Spec.Selector.MatchLabels).AsSelector(), nil + + case *apiv1.ReplicationController: + return labels.Set(typed.Spec.Selector).AsSelector(), nil + + case *apiv1.Service: + return labels.Set(typed.Spec.Selector).AsSelector(), nil + + default: + return nil, status.Errorf(codes.Unimplemented, "cannot get object selector: %v", obj) } - err = pods.Run() - if err != nil { - return nil, nil, err +} + +func (s *server) getDeployments(res *public.Resource) ([]runtime.Object, error) { + var err error + var deployments []*appsv1beta2.Deployment + + if res.Namespace == "" { + deployments, err = s.deployLister.List(labels.Everything()) + } else if res.Name == "" { + deployments, err = s.deployLister.Deployments(res.Namespace).List(labels.Everything()) + } else { + var deployment *appsv1beta2.Deployment + deployment, err = s.deployLister.Deployments(res.Namespace).Get(res.Name) + deployments = []*appsv1beta2.Deployment{deployment} } + if err != nil { + return nil, err + } + + objects := []runtime.Object{} + for _, deploy := range deployments { + objects = append(objects, deploy) + } + + return objects, nil +} + +func (s *server) getNamespaces(res *public.Resource) ([]runtime.Object, error) { + var err error + var namespaces []*apiv1.Namespace + + if res.Name == "" { + namespaces, err = s.namespaceLister.List(labels.Everything()) + } else { + var namespace *apiv1.Namespace + namespace, err = s.namespaceLister.Get(res.Name) + namespaces = []*apiv1.Namespace{namespace} + } + + if err != nil { + return nil, err + } + + objects := []runtime.Object{} + for _, ns := range namespaces { + objects = append(objects, ns) + } + + return objects, nil +} + +func (s *server) getPods(res *public.Resource) ([]*apiv1.Pod, error) { + var err error + var pods []*apiv1.Pod + + if res.Namespace == "" { + pods, err = s.podLister.List(labels.Everything()) + } else if res.Name == "" { + pods, err = s.podLister.Pods(res.Namespace).List(labels.Everything()) + } else { + var pod *apiv1.Pod + pod, err = s.podLister.Pods(res.Namespace).Get(res.Name) + pods = []*apiv1.Pod{pod} + } + + if err != nil { + return nil, err + } + + var runningPods []*apiv1.Pod + for _, pod := range pods { + if isPendingOrRunning(pod) { + runningPods = append(runningPods, pod) + } + } + + return runningPods, nil +} + +func (s *server) getReplicationControllers(res *public.Resource) ([]runtime.Object, error) { + var err error + var rcs []*apiv1.ReplicationController + + if res.Namespace == "" { + rcs, err = s.replicationControllerLister.List(labels.Everything()) + } else if res.Name == "" { + rcs, err = s.replicationControllerLister.ReplicationControllers(res.Namespace).List(labels.Everything()) + } else { + var rc *apiv1.ReplicationController + rc, err = s.replicationControllerLister.ReplicationControllers(res.Namespace).Get(res.Name) + rcs = []*apiv1.ReplicationController{rc} + } + + if err != nil { + return nil, err + } + + objects := []runtime.Object{} + for _, rc := range rcs { + objects = append(objects, rc) + } + + return objects, nil +} + +func (s *server) getServices(res *public.Resource) ([]runtime.Object, error) { + var err error + var services []*apiv1.Service + + if res.Namespace == "" { + services, err = s.serviceLister.List(labels.Everything()) + } else if res.Name == "" { + services, err = s.serviceLister.Services(res.Namespace).List(labels.Everything()) + } else { + var svc *apiv1.Service + svc, err = s.serviceLister.Services(res.Namespace).Get(res.Name) + services = []*apiv1.Service{svc} + } + + if err != nil { + return nil, err + } + + objects := []runtime.Object{} + for _, svc := range services { + objects = append(objects, svc) + } + + return objects, nil +} + +func NewServer( + addr string, + tapPort uint, + replicaSets *k8s.ReplicaSetStore, + pods k8s.PodIndex, + namespaceLister corelisters.NamespaceLister, + deployLister applisters.DeploymentLister, + replicaSetLister applisters.ReplicaSetLister, + podLister corelisters.PodLister, + replicationControllerLister corelisters.ReplicationControllerLister, + serviceLister corelisters.ServiceLister, +) (*grpc.Server, net.Listener, error) { + lis, err := net.Listen("tcp", addr) if err != nil { return nil, nil, err @@ -369,9 +740,15 @@ func NewServer(addr string, tapPort uint, kubeconfig string) (*grpc.Server, net. s := util.NewGrpcServer() srv := server{ - tapPort: tapPort, - replicaSets: replicaSets, - pods: pods, + tapPort: tapPort, + replicaSets: replicaSets, + pods: pods, + namespaceLister: namespaceLister, + deployLister: deployLister, + replicaSetLister: replicaSetLister, + podLister: podLister, + replicationControllerLister: replicationControllerLister, + serviceLister: serviceLister, } pb.RegisterTapServer(s, &srv) diff --git a/controller/tap/server_test.go b/controller/tap/server_test.go new file mode 100644 index 000000000..89a586d97 --- /dev/null +++ b/controller/tap/server_test.go @@ -0,0 +1,243 @@ +package tap + +import ( + "context" + "testing" + "time" + + public "github.com/runconduit/conduit/controller/gen/public" + "github.com/runconduit/conduit/controller/k8s" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes/fake" + "k8s.io/client-go/kubernetes/scheme" + "k8s.io/client-go/tools/cache" +) + +type tapExpected struct { + msg string + k8sRes []string + req public.TapByResourceRequest + eofOk bool +} + +func TestTapByResource(t *testing.T) { + t.Run("Returns expected response", func(t *testing.T) { + expectations := []tapExpected{ + tapExpected{ + msg: "rpc error: code = InvalidArgument desc = TapByResource received nil target ResourceSelection: {Target: Match: MaxRps:0}", + k8sRes: []string{}, + req: public.TapByResourceRequest{}, + }, + tapExpected{ + msg: "rpc error: code = Unimplemented desc = unexpected match specified: any:<> ", + k8sRes: []string{` +apiVersion: v1 +kind: Pod +metadata: + name: emojivoto-meshed + namespace: emojivoto + labels: + app: emoji-svc + annotations: + conduit.io/proxy-version: testinjectversion +status: + phase: Running +`, + }, + req: public.TapByResourceRequest{ + Target: &public.ResourceSelection{ + Resource: &public.Resource{ + Namespace: "emojivoto", + Type: "pods", + Name: "emojivoto-meshed", + }, + }, + Match: &public.TapByResourceRequest_Match{ + Match: &public.TapByResourceRequest_Match_Any{ + Any: &public.TapByResourceRequest_Match_Seq{}, + }, + }, + }, + }, + tapExpected{ + msg: "rpc error: code = Unimplemented desc = unimplemented resource type: bad-type", + k8sRes: []string{}, + req: public.TapByResourceRequest{ + Target: &public.ResourceSelection{ + Resource: &public.Resource{ + Namespace: "emojivoto", + Type: "bad-type", + Name: "emojivoto-meshed-not-found", + }, + }, + }, + }, + tapExpected{ + msg: "rpc error: code = NotFound desc = pod \"emojivoto-meshed-not-found\" not found", + k8sRes: []string{` +apiVersion: v1 +kind: Pod +metadata: + name: emojivoto-meshed + namespace: emojivoto + labels: + app: emoji-svc + annotations: + conduit.io/proxy-version: testinjectversion +status: + phase: Running +`, + }, + req: public.TapByResourceRequest{ + Target: &public.ResourceSelection{ + Resource: &public.Resource{ + Namespace: "emojivoto", + Type: "pods", + Name: "emojivoto-meshed-not-found", + }, + }, + }, + }, + tapExpected{ + msg: "rpc error: code = NotFound desc = no pods found for ResourceSelection: {Resource:namespace:\"emojivoto\" type:\"pods\" name:\"emojivoto-meshed\" LabelSelector:}", + k8sRes: []string{` +apiVersion: v1 +kind: Pod +metadata: + name: emojivoto-meshed + namespace: emojivoto + labels: + app: emoji-svc + annotations: + conduit.io/proxy-version: testinjectversion +status: + phase: Finished +`, + }, + req: public.TapByResourceRequest{ + Target: &public.ResourceSelection{ + Resource: &public.Resource{ + Namespace: "emojivoto", + Type: "pods", + Name: "emojivoto-meshed", + }, + }, + }, + }, + tapExpected{ + // indicates we will accept EOF, in addition to the deadline exceeded message + eofOk: true, + // success, underlying tap events tested in http_server_test.go + msg: "rpc error: code = DeadlineExceeded desc = context deadline exceeded", + k8sRes: []string{` +apiVersion: v1 +kind: Pod +metadata: + name: emojivoto-meshed + namespace: emojivoto + labels: + app: emoji-svc + annotations: + conduit.io/proxy-version: testinjectversion +status: + phase: Running +`, + }, + req: public.TapByResourceRequest{ + Target: &public.ResourceSelection{ + Resource: &public.Resource{ + Namespace: "emojivoto", + Type: "pods", + Name: "emojivoto-meshed", + }, + }, + Match: &public.TapByResourceRequest_Match{ + Match: &public.TapByResourceRequest_Match_All{ + All: &public.TapByResourceRequest_Match_Seq{}, + }, + }, + }, + }, + } + + for _, exp := range expectations { + k8sObjs := []runtime.Object{} + for _, res := range exp.k8sRes { + decode := scheme.Codecs.UniversalDeserializer().Decode + obj, _, err := decode([]byte(res), nil, nil) + if err != nil { + t.Fatalf("could not decode yml: %s", err) + } + k8sObjs = append(k8sObjs, obj) + } + + clientSet := fake.NewSimpleClientset(k8sObjs...) + + replicaSets, err := k8s.NewReplicaSetStore(clientSet) + if err != nil { + t.Fatalf("NewReplicaSetStore failed: %s", err) + } + + sharedInformers := informers.NewSharedInformerFactory(clientSet, 10*time.Minute) + + namespaceInformer := sharedInformers.Core().V1().Namespaces() + deployInformer := sharedInformers.Apps().V1beta2().Deployments() + replicaSetInformer := sharedInformers.Apps().V1beta2().ReplicaSets() + podInformer := sharedInformers.Core().V1().Pods() + replicationControllerInformer := sharedInformers.Core().V1().ReplicationControllers() + serviceInformer := sharedInformers.Core().V1().Services() + + server, listener, err := NewServer( + "localhost:0", 0, replicaSets, k8s.NewEmptyPodIndex(), + namespaceInformer.Lister(), + deployInformer.Lister(), + replicaSetInformer.Lister(), + podInformer.Lister(), + replicationControllerInformer.Lister(), + serviceInformer.Lister(), + ) + if err != nil { + t.Fatalf("NewServer error: %s", err) + } + + go func() { server.Serve(listener) }() + defer server.GracefulStop() + + stopCh := make(chan struct{}) + sharedInformers.Start(stopCh) + if !cache.WaitForCacheSync( + stopCh, + namespaceInformer.Informer().HasSynced, + deployInformer.Informer().HasSynced, + replicaSetInformer.Informer().HasSynced, + podInformer.Informer().HasSynced, + replicationControllerInformer.Informer().HasSynced, + serviceInformer.Informer().HasSynced, + ) { + t.Fatalf("timed out wait for caches to sync") + } + + client, conn, err := NewClient(listener.Addr().String()) + if err != nil { + t.Fatalf("NewClient error: %v", err) + } + defer conn.Close() + + // TODO: mock out the underlying grpc tap events, rather than waiting an + // arbitrary time for request to timeout. + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + tapClient, err := client.TapByResource(ctx, &exp.req) + if err != nil { + t.Fatalf("TapByResource failed: %v", err) + } + + _, err = tapClient.Recv() + if err.Error() != exp.msg && (!exp.eofOk || err.Error() != "EOF") { + t.Fatalf("Expected error to be [%s], but was [%s]. eofOk: %v", exp.msg, err, exp.eofOk) + } + } + }) +} diff --git a/proxy-init/Dockerfile b/proxy-init/Dockerfile index 2c0fddf4e..a26c20dbb 100644 --- a/proxy-init/Dockerfile +++ b/proxy-init/Dockerfile @@ -1,5 +1,5 @@ ## compile proxy-init utility -FROM gcr.io/runconduit/go-deps:d17b1119 as golang +FROM gcr.io/runconduit/go-deps:a6e8221d as golang WORKDIR /go/src/github.com/runconduit/conduit COPY ./proxy-init ./proxy-init RUN CGO_ENABLED=0 GOOS=linux go install -v -installsuffix cgo ./proxy-init/ diff --git a/web/Dockerfile b/web/Dockerfile index 467db3409..ed327855e 100644 --- a/web/Dockerfile +++ b/web/Dockerfile @@ -12,7 +12,7 @@ RUN $HOME/.yarn/bin/yarn install --pure-lockfile RUN $HOME/.yarn/bin/yarn webpack ## compile go server -FROM gcr.io/runconduit/go-deps:d17b1119 as golang +FROM gcr.io/runconduit/go-deps:a6e8221d as golang ARG CONDUIT_VERSION WORKDIR /go/src/github.com/runconduit/conduit COPY web web