Implement TapByResource in Tap Service (#827)

The TapByResource endpoint was previously a stub.

Implement end-to-end tapByResource functionality, with support for
specifying any kubernetes resource(s) as target and destination.

Fixes #803, #49

Signed-off-by: Andrew Seigner <siggy@buoyant.io>
This commit is contained in:
Andrew Seigner 2018-04-23 16:13:26 -07:00 committed by GitHub
parent d9112abc93
commit baf4ea1a5a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 814 additions and 57 deletions

2
Gopkg.lock generated
View File

@ -746,6 +746,6 @@
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
inputs-digest = "f5ff9662192d238efdb260226a886088d05da0924292833660466258fd80c8c0"
inputs-digest = "325d0175a74db86d1fed482bddc8279459d63be9d5338d9a420722c24eea5534"
solver-name = "gps-cdcl"
solver-version = 1

View File

@ -1,5 +1,5 @@
## compile binaries
FROM gcr.io/runconduit/go-deps:d17b1119 as golang
FROM gcr.io/runconduit/go-deps:a6e8221d as golang
ARG CONDUIT_VERSION
WORKDIR /go/src/github.com/runconduit/conduit
COPY cli cli

View File

@ -1,5 +1,5 @@
## compile controller services
FROM gcr.io/runconduit/go-deps:d17b1119 as golang
FROM gcr.io/runconduit/go-deps:a6e8221d as golang
ARG CONDUIT_VERSION
WORKDIR /go/src/github.com/runconduit/conduit
COPY controller/gen controller/gen

View File

@ -77,7 +77,24 @@ func (c *grpcOverHttpClient) Tap(ctx context.Context, req *pb.TapRequest, _ ...g
}
func (c *grpcOverHttpClient) TapByResource(ctx context.Context, req *pb.TapByResourceRequest, _ ...grpc.CallOption) (pb.Api_TapByResourceClient, error) {
return nil, fmt.Errorf("Unimplemented")
url := c.endpointNameToPublicApiUrl("TapByResource")
httpRsp, err := c.post(ctx, url, req)
if err != nil {
return nil, err
}
if err = checkIfResponseHasConduitError(httpRsp); err != nil {
httpRsp.Body.Close()
return nil, err
}
go func() {
<-ctx.Done()
log.Debug("Closing response body after context marked as done")
httpRsp.Body.Close()
}()
return &tapClient{ctx: ctx, reader: bufio.NewReader(httpRsp.Body)}, nil
}
func (c *grpcOverHttpClient) apiRequest(ctx context.Context, endpoint string, req proto.Message, protoResponse proto.Message) error {

View File

@ -194,6 +194,7 @@ spec:
replicaSetInformer.Informer().HasSynced,
podInformer.Informer().HasSynced,
replicationControllerInformer.Informer().HasSynced,
serviceInformer.Informer().HasSynced,
) {
t.Fatalf("timed out wait for caches to sync")
}

View File

@ -19,11 +19,12 @@ import (
)
var (
statSummaryPath = fullUrlPathFor("StatSummary")
versionPath = fullUrlPathFor("Version")
listPodsPath = fullUrlPathFor("ListPods")
tapPath = fullUrlPathFor("Tap")
selfCheckPath = fullUrlPathFor("SelfCheck")
statSummaryPath = fullUrlPathFor("StatSummary")
versionPath = fullUrlPathFor("Version")
listPodsPath = fullUrlPathFor("ListPods")
tapPath = fullUrlPathFor("Tap")
tapByResourcePath = fullUrlPathFor("TapByResource")
selfCheckPath = fullUrlPathFor("SelfCheck")
)
type handler struct {
@ -50,6 +51,8 @@ func (h *handler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
h.handleListPods(w, req)
case tapPath:
h.handleTap(w, req)
case tapByResourcePath:
h.handleTapByResource(w, req)
case selfCheckPath:
h.handleSelfCheck(w, req)
default:
@ -164,6 +167,28 @@ func (h *handler) handleTap(w http.ResponseWriter, req *http.Request) {
}
}
func (h *handler) handleTapByResource(w http.ResponseWriter, req *http.Request) {
flushableWriter, err := newStreamingWriter(w)
if err != nil {
writeErrorToHttpResponse(w, err)
return
}
var protoRequest pb.TapByResourceRequest
err = httpRequestToProto(req, &protoRequest)
if err != nil {
writeErrorToHttpResponse(w, err)
return
}
server := tapServer{w: flushableWriter, req: req}
err = h.grpcServer.TapByResource(&protoRequest, server)
if err != nil {
writeErrorToHttpResponse(w, err)
return
}
}
type tapServer struct {
w flushableResponseWriter
req *http.Request

View File

@ -181,7 +181,7 @@ func TestServer(t *testing.T) {
mockGrpcServer.TapStreamsToReturn = expectedTapResponses
mockGrpcServer.ErrorToReturn = nil
tapClient, err := client.Tap(context.TODO(), &pb.TapRequest{})
tapClient, err := client.TapByResource(context.TODO(), &pb.TapByResourceRequest{})
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}

View File

@ -1,15 +1,22 @@
package main
import (
"context"
"flag"
"fmt"
"os"
"os/signal"
"syscall"
"time"
"github.com/runconduit/conduit/controller/k8s"
"github.com/runconduit/conduit/controller/tap"
"github.com/runconduit/conduit/controller/util"
"github.com/runconduit/conduit/pkg/version"
log "github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
)
func main() {
@ -33,11 +40,98 @@ func main() {
stop := make(chan os.Signal, 1)
signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
server, lis, err := tap.NewServer(*addr, *tapPort, *kubeConfigPath)
clientSet, err := k8s.NewClientSet(*kubeConfigPath)
if err != nil {
log.Fatalf("failed to create Kubernetes client: %s", err)
}
replicaSets, err := k8s.NewReplicaSetStore(clientSet)
if err != nil {
log.Fatalf("NewReplicaSetStore failed: %s", err)
}
err = replicaSets.Run()
if err != nil {
log.Fatalf("replicaSets.Run() failed: %s", err)
}
// index pods by deployment
deploymentIndex := func(obj interface{}) ([]string, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
return nil, fmt.Errorf("object is not a Pod")
}
deployment, err := replicaSets.GetDeploymentForPod(pod)
if err != nil {
log.Debugf("Cannot get deployment for pod %s: %s", pod.Name, err)
return []string{}, nil
}
return []string{deployment}, nil
}
pods, err := k8s.NewPodIndex(clientSet, deploymentIndex)
if err != nil {
log.Fatalf("NewPodIndex failed: %s", err)
}
err = pods.Run()
if err != nil {
log.Fatalf("pods.Run() failed: %s", err)
}
// TODO: factor out with public-api
sharedInformers := informers.NewSharedInformerFactory(clientSet, 10*time.Minute)
namespaceInformer := sharedInformers.Core().V1().Namespaces()
namespaceInformerSynced := namespaceInformer.Informer().HasSynced
deployInformer := sharedInformers.Apps().V1beta2().Deployments()
deployInformerSynced := deployInformer.Informer().HasSynced
replicaSetInformer := sharedInformers.Apps().V1beta2().ReplicaSets()
replicaSetInformerSynced := replicaSetInformer.Informer().HasSynced
podInformer := sharedInformers.Core().V1().Pods()
podInformerSynced := podInformer.Informer().HasSynced
replicationControllerInformer := sharedInformers.Core().V1().ReplicationControllers()
replicationControllerInformerSynced := replicationControllerInformer.Informer().HasSynced
serviceInformer := sharedInformers.Core().V1().Services()
serviceInformerSynced := serviceInformer.Informer().HasSynced
sharedInformers.Start(nil)
server, lis, err := tap.NewServer(
*addr, *tapPort, replicaSets, pods,
namespaceInformer.Lister(),
deployInformer.Lister(),
replicaSetInformer.Lister(),
podInformer.Lister(),
replicationControllerInformer.Lister(),
serviceInformer.Lister(),
)
if err != nil {
log.Fatal(err.Error())
}
go func() {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
log.Infof("waiting for caches to sync")
if !cache.WaitForCacheSync(
ctx.Done(),
namespaceInformerSynced,
deployInformerSynced,
replicaSetInformerSynced,
podInformerSynced,
replicationControllerInformerSynced,
serviceInformerSynced,
) {
log.Fatalf("timed out wait for caches to sync")
}
log.Infof("caches synced")
}()
go func() {
log.Println("starting gRPC server on", *addr)
server.Serve(lis)

View File

@ -26,7 +26,7 @@ type podIndex struct {
stopCh chan struct{}
}
func NewPodIndex(clientset *kubernetes.Clientset, index cache.IndexFunc) (PodIndex, error) {
func NewPodIndex(clientset kubernetes.Interface, index cache.IndexFunc) (PodIndex, error) {
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{"index": index})
podListWatcher := cache.NewListWatchFromClient(

View File

@ -19,7 +19,7 @@ type ReplicaSetStore struct {
stopCh chan struct{}
}
func NewReplicaSetStore(clientset *kubernetes.Clientset) (*ReplicaSetStore, error) {
func NewReplicaSetStore(clientset kubernetes.Interface) (*ReplicaSetStore, error) {
store := cache.NewStore(cache.MetaNamespaceKeyFunc)
replicatSetListWatcher := cache.NewListWatchFromClient(

View File

@ -14,21 +14,49 @@ import (
public "github.com/runconduit/conduit/controller/gen/public"
"github.com/runconduit/conduit/controller/k8s"
"github.com/runconduit/conduit/controller/util"
pkgK8s "github.com/runconduit/conduit/pkg/k8s"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"k8s.io/api/core/v1"
appsv1beta2 "k8s.io/api/apps/v1beta2"
apiv1 "k8s.io/api/core/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
applisters "k8s.io/client-go/listers/apps/v1beta2"
corelisters "k8s.io/client-go/listers/core/v1"
)
var tapInterval = 10 * time.Second
type (
server struct {
tapPort uint
// We use the Kubernetes API to find the IP addresses of pods to tap
// TODO: remove these when TapByResource replaces tap
replicaSets *k8s.ReplicaSetStore
pods k8s.PodIndex
// TODO: factor out with public-api
namespaceLister corelisters.NamespaceLister
deployLister applisters.DeploymentLister
replicaSetLister applisters.ReplicaSetLister
podLister corelisters.PodLister
replicationControllerLister corelisters.ReplicationControllerLister
serviceLister corelisters.ServiceLister
}
)
var (
tapInterval = 10 * time.Second
// TODO: factor out with public-api
k8sResourceTypesToDestinationLabels = map[string]string{
pkgK8s.KubernetesDeployments: "deployment",
pkgK8s.KubernetesNamespaces: "namespace",
pkgK8s.KubernetesPods: "pod",
pkgK8s.KubernetesReplicationControllers: "replication_controller",
pkgK8s.KubernetesServices: "service",
}
)
@ -36,7 +64,7 @@ func (s *server) Tap(req *public.TapRequest, stream pb.Tap_TapServer) error {
// TODO: Allow a configurable aperture A.
// If the target contains more than A pods, select A of them at random.
var pods []*v1.Pod
var pods []*apiv1.Pod
var targetName string
switch target := req.Target.(type) {
case *public.TapRequest_Pod:
@ -45,7 +73,7 @@ func (s *server) Tap(req *public.TapRequest, stream pb.Tap_TapServer) error {
if err != nil {
return status.Errorf(codes.NotFound, err.Error())
}
pods = []*v1.Pod{pod}
pods = []*apiv1.Pod{pod}
case *public.TapRequest_Deployment:
targetName = target.Deployment
var err error
@ -55,7 +83,7 @@ func (s *server) Tap(req *public.TapRequest, stream pb.Tap_TapServer) error {
}
}
log.Printf("Tapping %d pods for target %s", len(pods), targetName)
log.Infof("Tapping %d pods for target %s", len(pods), targetName)
events := make(chan *common.TapEvent)
@ -74,7 +102,7 @@ func (s *server) Tap(req *public.TapRequest, stream pb.Tap_TapServer) error {
// initiate a tap on the pod
match, err := makeMatch(req)
if err != nil {
return nil
return err
}
go s.tapProxy(stream.Context(), rpsPerPod, match, pod.Status.PodIP, events)
}
@ -90,12 +118,73 @@ func (s *server) Tap(req *public.TapRequest, stream pb.Tap_TapServer) error {
}
func (s *server) TapByResource(req *public.TapByResourceRequest, stream pb.Tap_TapByResourceServer) error {
return fmt.Errorf("unimplemented")
if req == nil {
return status.Error(codes.InvalidArgument, "TapByResource received nil TapByResourceRequest")
}
if req.Target == nil {
return status.Errorf(codes.InvalidArgument, "TapByResource received nil target ResourceSelection: %+v", *req)
}
pods, err := s.getPodsFor(*req.Target)
if err != nil {
if status.Code(err) == codes.Unknown {
if k8sErrors.ReasonForError(err) == metaV1.StatusReasonNotFound {
err = status.Errorf(codes.NotFound, err.Error())
} else {
err = status.Errorf(codes.Internal, err.Error())
}
}
return err
}
if len(pods) == 0 {
return status.Errorf(codes.NotFound, "no pods found for ResourceSelection: %+v", *req.Target)
}
log.Infof("Tapping %d pods for target: %+v", len(pods), *req.Target.Resource)
events := make(chan *common.TapEvent)
go func() { // Stop sending back events if the request is cancelled
<-stream.Context().Done()
close(events)
}()
// divide the rps evenly between all pods to tap
rpsPerPod := req.MaxRps / float32(len(pods))
if rpsPerPod < 1 {
rpsPerPod = 1
}
match, err := makeByResourceMatch(req.Match)
if err != nil {
if status.Code(err) == codes.Unknown {
err = status.Errorf(codes.Internal, err.Error())
}
return err
}
for _, pod := range pods {
// initiate a tap on the pod
go s.tapProxy(stream.Context(), rpsPerPod, match, pod.Status.PodIP, events)
}
// read events from the taps and send them back
for event := range events {
err := stream.Send(event)
if err != nil {
if status.Code(err) == codes.Unknown {
err = status.Errorf(codes.Internal, err.Error())
}
return err
}
}
return nil
}
func validatePort(port uint32) error {
if port > 65535 {
return fmt.Errorf("Port number of range: %d", port)
return status.Errorf(codes.InvalidArgument, "port number of range: %d", port)
}
return nil
}
@ -275,6 +364,103 @@ func parseMethod(method string) *common.HttpMethod {
}
}
func makeByResourceMatch(match *public.TapByResourceRequest_Match) (*proxy.ObserveRequest_Match, error) {
// TODO: for now assume it's always a single, flat `All` match list
seq := match.GetAll()
if seq == nil {
return nil, status.Errorf(codes.Unimplemented, "unexpected match specified: %+v", match)
}
matches := []*proxy.ObserveRequest_Match{}
for _, reqMatch := range seq.Matches {
switch typed := reqMatch.Match.(type) {
case *public.TapByResourceRequest_Match_Destinations:
for k, v := range destinationLabels(typed.Destinations.Resource) {
matches = append(matches, &proxy.ObserveRequest_Match{
Match: &proxy.ObserveRequest_Match_DestinationLabel{
DestinationLabel: &proxy.ObserveRequest_Match_Label{
Key: k,
Value: v,
},
},
})
}
case *public.TapByResourceRequest_Match_Http_:
httpMatch := proxy.ObserveRequest_Match_Http{}
switch httpTyped := typed.Http.Match.(type) {
case *public.TapByResourceRequest_Match_Http_Scheme:
httpMatch = proxy.ObserveRequest_Match_Http{
Match: &proxy.ObserveRequest_Match_Http_Scheme{
Scheme: parseScheme(httpTyped.Scheme),
},
}
case *public.TapByResourceRequest_Match_Http_Method:
httpMatch = proxy.ObserveRequest_Match_Http{
Match: &proxy.ObserveRequest_Match_Http_Method{
Method: parseMethod(httpTyped.Method),
},
}
case *public.TapByResourceRequest_Match_Http_Authority:
httpMatch = proxy.ObserveRequest_Match_Http{
Match: &proxy.ObserveRequest_Match_Http_Authority{
Authority: &proxy.ObserveRequest_Match_Http_StringMatch{
Match: &proxy.ObserveRequest_Match_Http_StringMatch_Exact{
Exact: httpTyped.Authority,
},
},
},
}
case *public.TapByResourceRequest_Match_Http_Path:
httpMatch = proxy.ObserveRequest_Match_Http{
Match: &proxy.ObserveRequest_Match_Http_Path{
Path: &proxy.ObserveRequest_Match_Http_StringMatch{
Match: &proxy.ObserveRequest_Match_Http_StringMatch_Prefix{
Prefix: httpTyped.Path,
},
},
},
}
default:
return nil, status.Errorf(codes.Unimplemented, "unknown HTTP match type: %v", httpTyped)
}
matches = append(matches, &proxy.ObserveRequest_Match{
Match: &proxy.ObserveRequest_Match_Http_{
Http: &httpMatch,
},
})
default:
return nil, status.Errorf(codes.Unimplemented, "unknown match type: %v", typed)
}
}
return &proxy.ObserveRequest_Match{
Match: &proxy.ObserveRequest_Match_All{
All: &proxy.ObserveRequest_Match_Seq{
Matches: matches,
},
},
}, nil
}
// TODO: factor out with `promLabels` in public-api
func destinationLabels(resource *public.Resource) map[string]string {
dstLabels := map[string]string{}
if resource.Name != "" {
dstLabels[k8sResourceTypesToDestinationLabels[resource.Type]] = resource.Name
}
if resource.Type != pkgK8s.KubernetesNamespaces && resource.Namespace != "" {
dstLabels["namespace"] = resource.Namespace
}
return dstLabels
}
// Tap a pod.
// This method will run continuously until an error is encountered or the
// request is cancelled via the context. Thus it should be called as a
@ -285,10 +471,10 @@ func parseMethod(method string) *common.HttpMethod {
// again.
func (s *server) tapProxy(ctx context.Context, maxRps float32, match *proxy.ObserveRequest_Match, addr string, events chan *common.TapEvent) {
tapAddr := fmt.Sprintf("%s:%d", addr, s.tapPort)
log.Printf("Establishing tap on %s", tapAddr)
log.Infof("Establishing tap on %s", tapAddr)
conn, err := grpc.DialContext(ctx, tapAddr, grpc.WithInsecure())
if err != nil {
log.Println(err)
log.Error(err)
return
}
client := proxy.NewTapClient(conn)
@ -303,7 +489,7 @@ func (s *server) tapProxy(ctx context.Context, maxRps float32, match *proxy.Obse
windowEnd := windowStart.Add(tapInterval)
rsp, err := client.Observe(ctx, req)
if err != nil {
log.Println(err)
log.Error(err)
return
}
for { // Stream loop
@ -312,7 +498,7 @@ func (s *server) tapProxy(ctx context.Context, maxRps float32, match *proxy.Obse
break
}
if err != nil {
log.Println(err)
log.Error(err)
return
}
events <- event
@ -323,45 +509,230 @@ func (s *server) tapProxy(ctx context.Context, maxRps float32, match *proxy.Obse
}
}
func NewServer(addr string, tapPort uint, kubeconfig string) (*grpc.Server, net.Listener, error) {
//
// TODO: factor all these functions out of public-api into a shared k8s lister/resource module
//
clientSet, err := k8s.NewClientSet(kubeconfig)
if err != nil {
return nil, nil, err
func (s *server) getPodsFor(res public.ResourceSelection) ([]*apiv1.Pod, error) {
var err error
namespace := res.Resource.Namespace
objects := []runtime.Object{}
switch res.Resource.Type {
case pkgK8s.KubernetesDeployments:
objects, err = s.getDeployments(res.Resource)
case pkgK8s.KubernetesNamespaces:
namespace = res.Resource.Name // special case for namespace
objects, err = s.getNamespaces(res.Resource)
case pkgK8s.KubernetesReplicationControllers:
objects, err = s.getReplicationControllers(res.Resource)
case pkgK8s.KubernetesServices:
objects, err = s.getServices(res.Resource)
// special case for pods
case pkgK8s.KubernetesPods:
return s.getPods(res.Resource)
default:
err = status.Errorf(codes.Unimplemented, "unimplemented resource type: %v", res.Resource.Type)
}
replicaSets, err := k8s.NewReplicaSetStore(clientSet)
if err != nil {
return nil, nil, err
}
err = replicaSets.Run()
if err != nil {
return nil, nil, err
return nil, err
}
// index pods by deployment
deploymentIndex := func(obj interface{}) ([]string, error) {
pod, ok := obj.(*v1.Pod)
if !ok {
return nil, fmt.Errorf("object is not a Pod")
}
deployment, err := replicaSets.GetDeploymentForPod(pod)
allPods := []*apiv1.Pod{}
for _, obj := range objects {
selector, err := getSelectorFromObject(obj)
if err != nil {
log.Debugf("Cannot get deployment for pod %s: %s", pod.Name, err)
return []string{}, nil
return nil, err
}
// TODO: special case namespace
pods, err := s.podLister.Pods(namespace).List(selector)
if err != nil {
return nil, err
}
for _, pod := range pods {
if isPendingOrRunning(pod) {
allPods = append(allPods, pod)
}
}
return []string{deployment}, nil
}
pods, err := k8s.NewPodIndex(clientSet, deploymentIndex)
if err != nil {
return nil, nil, err
return allPods, nil
}
func isPendingOrRunning(pod *apiv1.Pod) bool {
pending := pod.Status.Phase == apiv1.PodPending
running := pod.Status.Phase == apiv1.PodRunning
terminating := pod.DeletionTimestamp != nil
return (pending || running) && !terminating
}
func getSelectorFromObject(obj runtime.Object) (labels.Selector, error) {
switch typed := obj.(type) {
case *apiv1.Namespace:
return labels.Everything(), nil
case *appsv1beta2.Deployment:
return labels.Set(typed.Spec.Selector.MatchLabels).AsSelector(), nil
case *apiv1.ReplicationController:
return labels.Set(typed.Spec.Selector).AsSelector(), nil
case *apiv1.Service:
return labels.Set(typed.Spec.Selector).AsSelector(), nil
default:
return nil, status.Errorf(codes.Unimplemented, "cannot get object selector: %v", obj)
}
err = pods.Run()
if err != nil {
return nil, nil, err
}
func (s *server) getDeployments(res *public.Resource) ([]runtime.Object, error) {
var err error
var deployments []*appsv1beta2.Deployment
if res.Namespace == "" {
deployments, err = s.deployLister.List(labels.Everything())
} else if res.Name == "" {
deployments, err = s.deployLister.Deployments(res.Namespace).List(labels.Everything())
} else {
var deployment *appsv1beta2.Deployment
deployment, err = s.deployLister.Deployments(res.Namespace).Get(res.Name)
deployments = []*appsv1beta2.Deployment{deployment}
}
if err != nil {
return nil, err
}
objects := []runtime.Object{}
for _, deploy := range deployments {
objects = append(objects, deploy)
}
return objects, nil
}
func (s *server) getNamespaces(res *public.Resource) ([]runtime.Object, error) {
var err error
var namespaces []*apiv1.Namespace
if res.Name == "" {
namespaces, err = s.namespaceLister.List(labels.Everything())
} else {
var namespace *apiv1.Namespace
namespace, err = s.namespaceLister.Get(res.Name)
namespaces = []*apiv1.Namespace{namespace}
}
if err != nil {
return nil, err
}
objects := []runtime.Object{}
for _, ns := range namespaces {
objects = append(objects, ns)
}
return objects, nil
}
func (s *server) getPods(res *public.Resource) ([]*apiv1.Pod, error) {
var err error
var pods []*apiv1.Pod
if res.Namespace == "" {
pods, err = s.podLister.List(labels.Everything())
} else if res.Name == "" {
pods, err = s.podLister.Pods(res.Namespace).List(labels.Everything())
} else {
var pod *apiv1.Pod
pod, err = s.podLister.Pods(res.Namespace).Get(res.Name)
pods = []*apiv1.Pod{pod}
}
if err != nil {
return nil, err
}
var runningPods []*apiv1.Pod
for _, pod := range pods {
if isPendingOrRunning(pod) {
runningPods = append(runningPods, pod)
}
}
return runningPods, nil
}
func (s *server) getReplicationControllers(res *public.Resource) ([]runtime.Object, error) {
var err error
var rcs []*apiv1.ReplicationController
if res.Namespace == "" {
rcs, err = s.replicationControllerLister.List(labels.Everything())
} else if res.Name == "" {
rcs, err = s.replicationControllerLister.ReplicationControllers(res.Namespace).List(labels.Everything())
} else {
var rc *apiv1.ReplicationController
rc, err = s.replicationControllerLister.ReplicationControllers(res.Namespace).Get(res.Name)
rcs = []*apiv1.ReplicationController{rc}
}
if err != nil {
return nil, err
}
objects := []runtime.Object{}
for _, rc := range rcs {
objects = append(objects, rc)
}
return objects, nil
}
func (s *server) getServices(res *public.Resource) ([]runtime.Object, error) {
var err error
var services []*apiv1.Service
if res.Namespace == "" {
services, err = s.serviceLister.List(labels.Everything())
} else if res.Name == "" {
services, err = s.serviceLister.Services(res.Namespace).List(labels.Everything())
} else {
var svc *apiv1.Service
svc, err = s.serviceLister.Services(res.Namespace).Get(res.Name)
services = []*apiv1.Service{svc}
}
if err != nil {
return nil, err
}
objects := []runtime.Object{}
for _, svc := range services {
objects = append(objects, svc)
}
return objects, nil
}
func NewServer(
addr string,
tapPort uint,
replicaSets *k8s.ReplicaSetStore,
pods k8s.PodIndex,
namespaceLister corelisters.NamespaceLister,
deployLister applisters.DeploymentLister,
replicaSetLister applisters.ReplicaSetLister,
podLister corelisters.PodLister,
replicationControllerLister corelisters.ReplicationControllerLister,
serviceLister corelisters.ServiceLister,
) (*grpc.Server, net.Listener, error) {
lis, err := net.Listen("tcp", addr)
if err != nil {
return nil, nil, err
@ -369,9 +740,15 @@ func NewServer(addr string, tapPort uint, kubeconfig string) (*grpc.Server, net.
s := util.NewGrpcServer()
srv := server{
tapPort: tapPort,
replicaSets: replicaSets,
pods: pods,
tapPort: tapPort,
replicaSets: replicaSets,
pods: pods,
namespaceLister: namespaceLister,
deployLister: deployLister,
replicaSetLister: replicaSetLister,
podLister: podLister,
replicationControllerLister: replicationControllerLister,
serviceLister: serviceLister,
}
pb.RegisterTapServer(s, &srv)

View File

@ -0,0 +1,243 @@
package tap
import (
"context"
"testing"
"time"
public "github.com/runconduit/conduit/controller/gen/public"
"github.com/runconduit/conduit/controller/k8s"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"
)
type tapExpected struct {
msg string
k8sRes []string
req public.TapByResourceRequest
eofOk bool
}
func TestTapByResource(t *testing.T) {
t.Run("Returns expected response", func(t *testing.T) {
expectations := []tapExpected{
tapExpected{
msg: "rpc error: code = InvalidArgument desc = TapByResource received nil target ResourceSelection: {Target:<nil> Match:<nil> MaxRps:0}",
k8sRes: []string{},
req: public.TapByResourceRequest{},
},
tapExpected{
msg: "rpc error: code = Unimplemented desc = unexpected match specified: any:<> ",
k8sRes: []string{`
apiVersion: v1
kind: Pod
metadata:
name: emojivoto-meshed
namespace: emojivoto
labels:
app: emoji-svc
annotations:
conduit.io/proxy-version: testinjectversion
status:
phase: Running
`,
},
req: public.TapByResourceRequest{
Target: &public.ResourceSelection{
Resource: &public.Resource{
Namespace: "emojivoto",
Type: "pods",
Name: "emojivoto-meshed",
},
},
Match: &public.TapByResourceRequest_Match{
Match: &public.TapByResourceRequest_Match_Any{
Any: &public.TapByResourceRequest_Match_Seq{},
},
},
},
},
tapExpected{
msg: "rpc error: code = Unimplemented desc = unimplemented resource type: bad-type",
k8sRes: []string{},
req: public.TapByResourceRequest{
Target: &public.ResourceSelection{
Resource: &public.Resource{
Namespace: "emojivoto",
Type: "bad-type",
Name: "emojivoto-meshed-not-found",
},
},
},
},
tapExpected{
msg: "rpc error: code = NotFound desc = pod \"emojivoto-meshed-not-found\" not found",
k8sRes: []string{`
apiVersion: v1
kind: Pod
metadata:
name: emojivoto-meshed
namespace: emojivoto
labels:
app: emoji-svc
annotations:
conduit.io/proxy-version: testinjectversion
status:
phase: Running
`,
},
req: public.TapByResourceRequest{
Target: &public.ResourceSelection{
Resource: &public.Resource{
Namespace: "emojivoto",
Type: "pods",
Name: "emojivoto-meshed-not-found",
},
},
},
},
tapExpected{
msg: "rpc error: code = NotFound desc = no pods found for ResourceSelection: {Resource:namespace:\"emojivoto\" type:\"pods\" name:\"emojivoto-meshed\" LabelSelector:}",
k8sRes: []string{`
apiVersion: v1
kind: Pod
metadata:
name: emojivoto-meshed
namespace: emojivoto
labels:
app: emoji-svc
annotations:
conduit.io/proxy-version: testinjectversion
status:
phase: Finished
`,
},
req: public.TapByResourceRequest{
Target: &public.ResourceSelection{
Resource: &public.Resource{
Namespace: "emojivoto",
Type: "pods",
Name: "emojivoto-meshed",
},
},
},
},
tapExpected{
// indicates we will accept EOF, in addition to the deadline exceeded message
eofOk: true,
// success, underlying tap events tested in http_server_test.go
msg: "rpc error: code = DeadlineExceeded desc = context deadline exceeded",
k8sRes: []string{`
apiVersion: v1
kind: Pod
metadata:
name: emojivoto-meshed
namespace: emojivoto
labels:
app: emoji-svc
annotations:
conduit.io/proxy-version: testinjectversion
status:
phase: Running
`,
},
req: public.TapByResourceRequest{
Target: &public.ResourceSelection{
Resource: &public.Resource{
Namespace: "emojivoto",
Type: "pods",
Name: "emojivoto-meshed",
},
},
Match: &public.TapByResourceRequest_Match{
Match: &public.TapByResourceRequest_Match_All{
All: &public.TapByResourceRequest_Match_Seq{},
},
},
},
},
}
for _, exp := range expectations {
k8sObjs := []runtime.Object{}
for _, res := range exp.k8sRes {
decode := scheme.Codecs.UniversalDeserializer().Decode
obj, _, err := decode([]byte(res), nil, nil)
if err != nil {
t.Fatalf("could not decode yml: %s", err)
}
k8sObjs = append(k8sObjs, obj)
}
clientSet := fake.NewSimpleClientset(k8sObjs...)
replicaSets, err := k8s.NewReplicaSetStore(clientSet)
if err != nil {
t.Fatalf("NewReplicaSetStore failed: %s", err)
}
sharedInformers := informers.NewSharedInformerFactory(clientSet, 10*time.Minute)
namespaceInformer := sharedInformers.Core().V1().Namespaces()
deployInformer := sharedInformers.Apps().V1beta2().Deployments()
replicaSetInformer := sharedInformers.Apps().V1beta2().ReplicaSets()
podInformer := sharedInformers.Core().V1().Pods()
replicationControllerInformer := sharedInformers.Core().V1().ReplicationControllers()
serviceInformer := sharedInformers.Core().V1().Services()
server, listener, err := NewServer(
"localhost:0", 0, replicaSets, k8s.NewEmptyPodIndex(),
namespaceInformer.Lister(),
deployInformer.Lister(),
replicaSetInformer.Lister(),
podInformer.Lister(),
replicationControllerInformer.Lister(),
serviceInformer.Lister(),
)
if err != nil {
t.Fatalf("NewServer error: %s", err)
}
go func() { server.Serve(listener) }()
defer server.GracefulStop()
stopCh := make(chan struct{})
sharedInformers.Start(stopCh)
if !cache.WaitForCacheSync(
stopCh,
namespaceInformer.Informer().HasSynced,
deployInformer.Informer().HasSynced,
replicaSetInformer.Informer().HasSynced,
podInformer.Informer().HasSynced,
replicationControllerInformer.Informer().HasSynced,
serviceInformer.Informer().HasSynced,
) {
t.Fatalf("timed out wait for caches to sync")
}
client, conn, err := NewClient(listener.Addr().String())
if err != nil {
t.Fatalf("NewClient error: %v", err)
}
defer conn.Close()
// TODO: mock out the underlying grpc tap events, rather than waiting an
// arbitrary time for request to timeout.
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()
tapClient, err := client.TapByResource(ctx, &exp.req)
if err != nil {
t.Fatalf("TapByResource failed: %v", err)
}
_, err = tapClient.Recv()
if err.Error() != exp.msg && (!exp.eofOk || err.Error() != "EOF") {
t.Fatalf("Expected error to be [%s], but was [%s]. eofOk: %v", exp.msg, err, exp.eofOk)
}
}
})
}

View File

@ -1,5 +1,5 @@
## compile proxy-init utility
FROM gcr.io/runconduit/go-deps:d17b1119 as golang
FROM gcr.io/runconduit/go-deps:a6e8221d as golang
WORKDIR /go/src/github.com/runconduit/conduit
COPY ./proxy-init ./proxy-init
RUN CGO_ENABLED=0 GOOS=linux go install -v -installsuffix cgo ./proxy-init/

View File

@ -12,7 +12,7 @@ RUN $HOME/.yarn/bin/yarn install --pure-lockfile
RUN $HOME/.yarn/bin/yarn webpack
## compile go server
FROM gcr.io/runconduit/go-deps:d17b1119 as golang
FROM gcr.io/runconduit/go-deps:a6e8221d as golang
ARG CONDUIT_VERSION
WORKDIR /go/src/github.com/runconduit/conduit
COPY web web