Implement Public API and Tap on top of Lister (#835)

public-api and tap were both using their own implementations of
the Kubernetes Informer/Lister APIs.

This change factors out all Informer/Lister usage into the Lister
module. This also introduces a new `Lister.GetObjects` method.
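
As a rough illustration (not part of the commit itself), this is how a consumer
such as the public-api can now drive the shared Lister instead of wiring up six
informers by hand; the fake clientset below is only a stand-in for a real
kubernetes.Interface, and the standalone main package is assumed for the sketch:

```go
package main

import (
	"fmt"
	"log"

	"github.com/runconduit/conduit/controller/k8s"
	pkgK8s "github.com/runconduit/conduit/pkg/k8s"
	"k8s.io/client-go/kubernetes/fake"
)

func main() {
	// Real callers pass their kubernetes.Interface; a fake keeps the sketch self-contained.
	clientSet := fake.NewSimpleClientset()

	// One Lister wraps the shared informer factory for all supported resource types.
	lister := k8s.NewLister(clientSet)
	if err := lister.Sync(); err != nil {
		log.Fatalf("timed out waiting for caches to sync: %s", err)
	}

	// GetObjects replaces the per-resource getDeployments/getPods/... helpers:
	// an empty namespace matches all namespaces, an empty name matches all objects.
	objects, err := lister.GetObjects("", pkgK8s.Deployments, "")
	if err != nil {
		log.Fatal(err)
	}

	// GetPodsFor resolves the running and pending pods selected by each object.
	for _, obj := range objects {
		pods, err := lister.GetPodsFor(obj)
		if err != nil {
			log.Fatal(err)
		}
		fmt.Printf("%T selects %d pods\n", obj, len(pods))
	}
}
```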

Signed-off-by: Andrew Seigner <siggy@buoyant.io>
Andrew Seigner 2018-04-24 18:10:48 -07:00 committed by GitHub
parent a2c60e8fcf
commit a0a9a42e23
24 changed files with 557 additions and 846 deletions

Gopkg.lock (generated)
View File

@ -746,6 +746,6 @@
[solve-meta]
analyzer-name = "dep"
analyzer-version = 1
inputs-digest = "325d0175a74db86d1fed482bddc8279459d63be9d5338d9a420722c24eea5534"
inputs-digest = "182946acde1cd53f9b8e6e3137ea55d1d14d7467a175ce261ddf30d74de3168e"
solver-name = "gps-cdcl"
solver-version = 1

View File

@ -1,8 +1,9 @@
## compile binaries
FROM gcr.io/runconduit/go-deps:a6e8221d as golang
FROM gcr.io/runconduit/go-deps:ff0dc047 as golang
ARG CONDUIT_VERSION
WORKDIR /go/src/github.com/runconduit/conduit
COPY cli cli
COPY controller/k8s controller/k8s
COPY controller/api controller/api
COPY controller/gen controller/gen
COPY controller/util controller/util

View File

@ -30,8 +30,8 @@ Only pod resources (aka pods, po) are supported.`,
friendlyName := args[0]
resourceType, err := k8s.CanonicalKubernetesNameFromFriendlyName(friendlyName)
if err != nil || resourceType != k8s.KubernetesPods {
return fmt.Errorf("invalid resource type %s, only %s are allowed as resource types", friendlyName, k8s.KubernetesPods)
if err != nil || resourceType != k8s.Pods {
return fmt.Errorf("invalid resource type %s, only %s are allowed as resource types", friendlyName, k8s.Pods)
}
client, err := newPublicAPIClient()
if err != nil {

View File

@ -92,12 +92,12 @@ func init() {
func requestTapFromApi(w io.Writer, client pb.ApiClient, targetName string, resourceType string, req *pb.TapRequest) error {
switch resourceType {
case k8s.KubernetesDeployments:
case k8s.Deployments:
req.Target = &pb.TapRequest_Deployment{
Deployment: targetName,
}
case k8s.KubernetesPods:
case k8s.Pods:
req.Target = &pb.TapRequest_Pod{
Pod: targetName,
}

View File

@ -14,7 +14,7 @@ import (
func TestRequestTapByResourceFromAPI(t *testing.T) {
t.Run("Should render busy response if everything went well", func(t *testing.T) {
resourceType := k8s.KubernetesPods
resourceType := k8s.Pods
targetName := "pod-666"
scheme := "https"
method := "GET"
@ -83,7 +83,7 @@ func TestRequestTapByResourceFromAPI(t *testing.T) {
})
t.Run("Should render empty response if no events returned", func(t *testing.T) {
resourceType := k8s.KubernetesPods
resourceType := k8s.Pods
targetName := "pod-666"
scheme := "https"
method := "GET"
@ -123,7 +123,7 @@ func TestRequestTapByResourceFromAPI(t *testing.T) {
t.Run("Should return error if stream returned error", func(t *testing.T) {
t.SkipNow()
resourceType := k8s.KubernetesPods
resourceType := k8s.Pods
targetName := "pod-666"
scheme := "https"
method := "GET"

View File

@ -21,7 +21,7 @@ func TestRequestTapFromApi(t *testing.T) {
t.Run("Should render busy response if everything went well", func(t *testing.T) {
authority := "localhost"
targetName := "pod-666"
resourceType := k8s.KubernetesPods
resourceType := k8s.Pods
scheme := "https"
method := "GET"
path := "/some/path"
@ -95,7 +95,7 @@ func TestRequestTapFromApi(t *testing.T) {
t.Run("Should render empty response if no events returned", func(t *testing.T) {
authority := "localhost"
targetName := "pod-666"
resourceType := k8s.KubernetesPods
resourceType := k8s.Pods
scheme := "https"
method := "GET"
path := "/some/path"
@ -140,7 +140,7 @@ func TestRequestTapFromApi(t *testing.T) {
t.SkipNow()
authority := "localhost"
targetName := "pod-666"
resourceType := k8s.KubernetesPods
resourceType := k8s.Pods
scheme := "https"
method := "GET"
path := "/some/path"

View File

@ -1,5 +1,5 @@
## compile controller services
FROM gcr.io/runconduit/go-deps:a6e8221d as golang
FROM gcr.io/runconduit/go-deps:ff0dc047 as golang
ARG CONDUIT_VERSION
WORKDIR /go/src/github.com/runconduit/conduit
COPY controller/gen controller/gen

View File

@ -11,27 +11,21 @@ import (
healthcheckPb "github.com/runconduit/conduit/controller/gen/common/healthcheck"
tapPb "github.com/runconduit/conduit/controller/gen/controller/tap"
pb "github.com/runconduit/conduit/controller/gen/public"
"github.com/runconduit/conduit/controller/k8s"
pkgK8s "github.com/runconduit/conduit/pkg/k8s"
"github.com/runconduit/conduit/pkg/version"
log "github.com/sirupsen/logrus"
k8sV1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
applisters "k8s.io/client-go/listers/apps/v1beta2"
corelisters "k8s.io/client-go/listers/core/v1"
)
type (
grpcServer struct {
prometheusAPI promv1.API
tapClient tapPb.TapClient
namespaceLister corelisters.NamespaceLister
deployLister applisters.DeploymentLister
replicaSetLister applisters.ReplicaSetLister
podLister corelisters.PodLister
replicationControllerLister corelisters.ReplicationControllerLister
serviceLister corelisters.ServiceLister
controllerNamespace string
ignoredNamespaces []string
prometheusAPI promv1.API
tapClient tapPb.TapClient
lister *k8s.Lister
controllerNamespace string
ignoredNamespaces []string
}
)
@ -46,26 +40,16 @@ const (
func newGrpcServer(
promAPI promv1.API,
tapClient tapPb.TapClient,
namespaceLister corelisters.NamespaceLister,
deployLister applisters.DeploymentLister,
replicaSetLister applisters.ReplicaSetLister,
podLister corelisters.PodLister,
replicationControllerLister corelisters.ReplicationControllerLister,
serviceLister corelisters.ServiceLister,
lister *k8s.Lister,
controllerNamespace string,
ignoredNamespaces []string,
) *grpcServer {
return &grpcServer{
prometheusAPI: promAPI,
tapClient: tapClient,
namespaceLister: namespaceLister,
deployLister: deployLister,
replicaSetLister: replicaSetLister,
podLister: podLister,
replicationControllerLister: replicationControllerLister,
serviceLister: serviceLister,
controllerNamespace: controllerNamespace,
ignoredNamespaces: ignoredNamespaces,
prometheusAPI: promAPI,
tapClient: tapClient,
lister: lister,
controllerNamespace: controllerNamespace,
ignoredNamespaces: ignoredNamespaces,
}
}
@ -92,7 +76,7 @@ func (s *grpcServer) ListPods(ctx context.Context, req *pb.Empty) (*pb.ListPodsR
reports[pod] = time.Unix(0, int64(timestamp)*int64(time.Millisecond))
}
pods, err := s.podLister.List(labels.Everything())
pods, err := s.lister.Pod.List(labels.Everything())
if err != nil {
return nil, err
}
@ -151,7 +135,7 @@ func (s *grpcServer) SelfCheck(ctx context.Context, in *healthcheckPb.SelfCheckR
CheckDescription: K8sClientCheckDescription,
Status: healthcheckPb.CheckStatus_OK,
}
_, err := s.podLister.List(labels.Everything())
_, err := s.lister.Pod.List(labels.Everything())
if err != nil {
k8sClientCheck.Status = healthcheckPb.CheckStatus_ERROR
k8sClientCheck.FriendlyMessageToUser = fmt.Sprintf("Error talking to Kubernetes from control plane: %s", err.Error())
@ -242,7 +226,7 @@ func (s *grpcServer) getDeploymentFor(pod *k8sV1.Pod) (string, error) {
return "", fmt.Errorf("Pod %s parent is not a ReplicaSet", pod.Name)
}
rs, err := s.replicaSetLister.GetPodReplicaSets(pod)
rs, err := s.lister.RS.GetPodReplicaSets(pod)
if err != nil {
return "", err
}

View File

@ -4,17 +4,15 @@ import (
"context"
"sort"
"testing"
"time"
"github.com/golang/protobuf/ptypes/duration"
"github.com/prometheus/common/model"
tap "github.com/runconduit/conduit/controller/gen/controller/tap"
pb "github.com/runconduit/conduit/controller/gen/public"
"github.com/runconduit/conduit/controller/k8s"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"
)
type listPodsExpected struct {
@ -164,38 +162,17 @@ spec:
}
clientSet := fake.NewSimpleClientset(k8sObjs...)
sharedInformers := informers.NewSharedInformerFactory(clientSet, 10*time.Minute)
namespaceInformer := sharedInformers.Core().V1().Namespaces()
deployInformer := sharedInformers.Apps().V1beta2().Deployments()
replicaSetInformer := sharedInformers.Apps().V1beta2().ReplicaSets()
podInformer := sharedInformers.Core().V1().Pods()
replicationControllerInformer := sharedInformers.Core().V1().ReplicationControllers()
serviceInformer := sharedInformers.Core().V1().Services()
lister := k8s.NewLister(clientSet)
fakeGrpcServer := newGrpcServer(
&MockProm{Res: exp.promRes},
tap.NewTapClient(nil),
namespaceInformer.Lister(),
deployInformer.Lister(),
replicaSetInformer.Lister(),
podInformer.Lister(),
replicationControllerInformer.Lister(),
serviceInformer.Lister(),
lister,
"conduit",
[]string{},
)
stopCh := make(chan struct{})
sharedInformers.Start(stopCh)
if !cache.WaitForCacheSync(
stopCh,
namespaceInformer.Informer().HasSynced,
deployInformer.Informer().HasSynced,
replicaSetInformer.Informer().HasSynced,
podInformer.Informer().HasSynced,
replicationControllerInformer.Informer().HasSynced,
serviceInformer.Informer().HasSynced,
) {
err := lister.Sync()
if err != nil {
t.Fatalf("timed out wait for caches to sync")
}

View File

@ -11,11 +11,10 @@ import (
healthcheckPb "github.com/runconduit/conduit/controller/gen/common/healthcheck"
tapPb "github.com/runconduit/conduit/controller/gen/controller/tap"
pb "github.com/runconduit/conduit/controller/gen/public"
"github.com/runconduit/conduit/controller/k8s"
"github.com/runconduit/conduit/controller/util"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc/metadata"
applisters "k8s.io/client-go/listers/apps/v1beta2"
corelisters "k8s.io/client-go/listers/core/v1"
)
var (
@ -221,12 +220,7 @@ func NewServer(
addr string,
prometheusClient promApi.Client,
tapClient tapPb.TapClient,
namespaceLister corelisters.NamespaceLister,
deployLister applisters.DeploymentLister,
replicaSetLister applisters.ReplicaSetLister,
podLister corelisters.PodLister,
replicationControllerLister corelisters.ReplicationControllerLister,
serviceLister corelisters.ServiceLister,
lister *k8s.Lister,
controllerNamespace string,
ignoredNamespaces []string,
) *http.Server {
@ -234,12 +228,7 @@ func NewServer(
grpcServer: newGrpcServer(
promv1.NewAPI(prometheusClient),
tapClient,
namespaceLister,
deployLister,
replicaSetLister,
podLister,
replicationControllerLister,
serviceLister,
lister,
controllerNamespace,
ignoredNamespaces,
),

View File

@ -8,13 +8,15 @@ import (
"time"
"github.com/prometheus/common/model"
"github.com/runconduit/conduit/controller/api/util"
pb "github.com/runconduit/conduit/controller/gen/public"
"github.com/runconduit/conduit/pkg/k8s"
log "github.com/sirupsen/logrus"
appsv1beta2 "k8s.io/api/apps/v1beta2"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/tools/cache"
)
@ -35,26 +37,11 @@ const (
promLatencyP95 = promType("0.95")
promLatencyP99 = promType("0.99")
deploymentLabel = model.LabelName("deployment")
namespaceLabel = model.LabelName("namespace")
podLabel = model.LabelName("pod")
replicationControllerLabel = model.LabelName("replication_controller")
serviceLabel = model.LabelName("service")
namespaceLabel = model.LabelName("namespace")
dstNamespaceLabel = model.LabelName("dst_namespace")
)
var (
promTypes = []promType{promRequests, promLatencyP50, promLatencyP95, promLatencyP99}
k8sResourceTypesToPromLabels = map[string]model.LabelName{
k8s.KubernetesDeployments: deploymentLabel,
k8s.KubernetesNamespaces: namespaceLabel,
k8s.KubernetesPods: podLabel,
k8s.KubernetesReplicationControllers: replicationControllerLabel,
k8s.KubernetesServices: serviceLabel,
}
)
var promTypes = []promType{promRequests, promLatencyP50, promLatencyP95, promLatencyP99}
type meshedCount struct {
inMesh uint64
@ -62,42 +49,53 @@ type meshedCount struct {
}
func (s *grpcServer) StatSummary(ctx context.Context, req *pb.StatSummaryRequest) (*pb.StatSummaryResponse, error) {
var err error
var objectMap map[string]metav1.ObjectMeta
var meshCount map[string]*meshedCount
// special case to check for services as outbound only
if req.Selector.Resource.Type == k8s.Services &&
req.Outbound.(*pb.StatSummaryRequest_FromResource) == nil {
return nil, status.Errorf(codes.InvalidArgument, "service only supported as a target on 'from' queries, or as a destination on 'to' queries.")
}
switch req.Selector.Resource.Type {
case k8s.KubernetesDeployments:
objectMap, meshCount, err = s.getDeployments(req.Selector.Resource)
case k8s.KubernetesNamespaces:
objectMap, meshCount, err = s.getNamespaces(req.Selector.Resource)
case k8s.KubernetesPods:
objectMap, meshCount, err = s.getPods(req.Selector.Resource)
case k8s.KubernetesReplicationControllers:
objectMap, meshCount, err = s.getReplicationControllers(req.Selector.Resource)
case k8s.KubernetesServices:
objects, err := s.lister.GetObjects(req.Selector.Resource.Namespace, req.Selector.Resource.Type, req.Selector.Resource.Name)
if err != nil {
return nil, util.GRPCError(err)
}
switch req.Outbound.(type) {
case *pb.StatSummaryRequest_FromResource:
objectMap, meshCount, err = s.getServices(req.Selector.Resource)
default:
err = fmt.Errorf("Service only supported as a target on 'from' queries, or as a destination on 'to' queries.")
// TODO: make these one struct:
// string => {metav1.ObjectMeta, meshedCount}
objectMap := map[string]metav1.Object{}
meshCountMap := map[string]*meshedCount{}
for _, object := range objects {
key, err := cache.MetaNamespaceKeyFunc(object)
if err != nil {
return nil, util.GRPCError(err)
}
metaObj, err := meta.Accessor(object)
if err != nil {
return nil, util.GRPCError(err)
}
default:
err = fmt.Errorf("Unimplemented resource type: %v", req.Selector.Resource.Type)
}
if err != nil {
return nil, err
objectMap[key] = metaObj
meshCount, err := s.getMeshedPodCount(object)
if err != nil {
return nil, util.GRPCError(err)
}
meshCountMap[key] = meshCount
}
return s.objectQuery(ctx, req, objectMap, meshCount)
res, err := s.objectQuery(ctx, req, objectMap, meshCountMap)
if err != nil {
return nil, util.GRPCError(err)
}
return res, nil
}
func (s *grpcServer) objectQuery(
ctx context.Context,
req *pb.StatSummaryRequest,
objects map[string]metav1.ObjectMeta,
objects map[string]metav1.Object,
meshCount map[string]*meshedCount,
) (*pb.StatSummaryResponse, error) {
rows := make([]*pb.StatTable_PodGroup_Row, 0)
@ -129,9 +127,9 @@ func (s *grpcServer) objectQuery(
row := pb.StatTable_PodGroup_Row{
Resource: &pb.Resource{
Namespace: resource.Namespace,
Namespace: resource.GetNamespace(),
Type: req.Selector.Resource.Type,
Name: resource.Name,
Name: resource.GetName(),
},
TimeWindow: req.TimeWindow,
Stats: requestMetrics[key],
@ -166,7 +164,7 @@ func (s *grpcServer) objectQuery(
func promLabelNames(resource *pb.Resource) model.LabelNames {
names := model.LabelNames{namespaceLabel}
if resource.Type != k8s.KubernetesNamespaces {
if resource.Type != k8s.Namespaces {
names = append(names, promResourceType(resource))
}
return names
@ -174,7 +172,7 @@ func promLabelNames(resource *pb.Resource) model.LabelNames {
func promDstLabelNames(resource *pb.Resource) model.LabelNames {
names := model.LabelNames{dstNamespaceLabel}
if resource.Type != k8s.KubernetesNamespaces {
if resource.Type != k8s.Namespaces {
names = append(names, "dst_"+promResourceType(resource))
}
return names
@ -185,7 +183,7 @@ func promLabels(resource *pb.Resource) model.LabelSet {
if resource.Name != "" {
set[promResourceType(resource)] = model.LabelValue(resource.Name)
}
if resource.Type != k8s.KubernetesNamespaces && resource.Namespace != "" {
if resource.Type != k8s.Namespaces && resource.Namespace != "" {
set[namespaceLabel] = model.LabelValue(resource.Namespace)
}
return set
@ -196,7 +194,7 @@ func promDstLabels(resource *pb.Resource) model.LabelSet {
if resource.Name != "" {
set["dst_"+promResourceType(resource)] = model.LabelValue(resource.Name)
}
if resource.Type != k8s.KubernetesNamespaces && resource.Namespace != "" {
if resource.Type != k8s.Namespaces && resource.Namespace != "" {
set[dstNamespaceLabel] = model.LabelValue(resource.Namespace)
}
return set
@ -209,7 +207,7 @@ func promDirectionLabels(direction string) model.LabelSet {
}
func promResourceType(resource *pb.Resource) model.LabelName {
return k8sResourceTypesToPromLabels[resource.Type]
return model.LabelName(k8s.ResourceTypesToProxyLabels[resource.Type])
}
func buildRequestLabels(req *pb.StatSummaryRequest) (model.LabelSet, model.LabelNames) {
@ -330,210 +328,14 @@ func metricToKey(metric model.Metric, groupBy model.LabelNames) string {
return strings.Join(values, "/")
}
func (s *grpcServer) getDeployments(res *pb.Resource) (map[string]metav1.ObjectMeta, map[string]*meshedCount, error) {
var err error
var deployments []*appsv1beta2.Deployment
if res.Namespace == "" {
deployments, err = s.deployLister.List(labels.Everything())
} else if res.Name == "" {
deployments, err = s.deployLister.Deployments(res.Namespace).List(labels.Everything())
} else {
var deployment *appsv1beta2.Deployment
deployment, err = s.deployLister.Deployments(res.Namespace).Get(res.Name)
deployments = []*appsv1beta2.Deployment{deployment}
}
if err != nil {
return nil, nil, err
}
meshedPodCount := make(map[string]*meshedCount)
deploymentMap := make(map[string]metav1.ObjectMeta)
for _, deployment := range deployments {
key, err := cache.MetaNamespaceKeyFunc(deployment)
if err != nil {
return nil, nil, err
}
deploymentMap[key] = deployment.ObjectMeta
meshCount, err := s.getMeshedPodCount(deployment.Namespace, deployment)
if err != nil {
return nil, nil, err
}
meshedPodCount[key] = meshCount
}
return deploymentMap, meshedPodCount, nil
}
func (s *grpcServer) getNamespaces(res *pb.Resource) (map[string]metav1.ObjectMeta, map[string]*meshedCount, error) {
var err error
var namespaces []*apiv1.Namespace
if res.Name == "" {
namespaces, err = s.namespaceLister.List(labels.Everything())
} else {
var namespace *apiv1.Namespace
namespace, err = s.namespaceLister.Get(res.Name)
namespaces = []*apiv1.Namespace{namespace}
}
if err != nil {
return nil, nil, err
}
meshedPodCount := make(map[string]*meshedCount)
namespaceMap := make(map[string]metav1.ObjectMeta)
for _, namespace := range namespaces {
key, err := cache.MetaNamespaceKeyFunc(namespace)
if err != nil {
return nil, nil, err
}
namespaceMap[key] = namespace.ObjectMeta
meshCount, err := s.getMeshedPodCount(namespace.Name, namespace)
if err != nil {
return nil, nil, err
}
meshedPodCount[key] = meshCount
}
return namespaceMap, meshedPodCount, nil
}
func (s *grpcServer) getPods(res *pb.Resource) (map[string]metav1.ObjectMeta, map[string]*meshedCount, error) {
var err error
var pods []*apiv1.Pod
if res.Namespace == "" {
pods, err = s.podLister.List(labels.Everything())
} else if res.Name == "" {
pods, err = s.podLister.Pods(res.Namespace).List(labels.Everything())
} else {
var pod *apiv1.Pod
pod, err = s.podLister.Pods(res.Namespace).Get(res.Name)
pods = []*apiv1.Pod{pod}
}
if err != nil {
return nil, nil, err
}
meshedPodCount := make(map[string]*meshedCount)
podMap := make(map[string]metav1.ObjectMeta)
for _, pod := range pods {
if !isPendingOrRunning(pod) {
continue
}
key, err := cache.MetaNamespaceKeyFunc(pod)
if err != nil {
return nil, nil, err
}
podMap[key] = pod.ObjectMeta
meshCount := &meshedCount{total: 1}
if isInMesh(pod) {
meshCount.inMesh++
}
meshedPodCount[key] = meshCount
}
return podMap, meshedPodCount, nil
}
func (s *grpcServer) getReplicationControllers(res *pb.Resource) (map[string]metav1.ObjectMeta, map[string]*meshedCount, error) {
var err error
var rcs []*apiv1.ReplicationController
if res.Namespace == "" {
rcs, err = s.replicationControllerLister.List(labels.Everything())
} else if res.Name == "" {
rcs, err = s.replicationControllerLister.ReplicationControllers(res.Namespace).List(labels.Everything())
} else {
var rc *apiv1.ReplicationController
rc, err = s.replicationControllerLister.ReplicationControllers(res.Namespace).Get(res.Name)
rcs = []*apiv1.ReplicationController{rc}
}
if err != nil {
return nil, nil, err
}
meshedPodCount := make(map[string]*meshedCount)
rcMap := make(map[string]metav1.ObjectMeta)
for _, rc := range rcs {
key, err := cache.MetaNamespaceKeyFunc(rc)
if err != nil {
return nil, nil, err
}
rcMap[key] = rc.ObjectMeta
meshCount, err := s.getMeshedPodCount(rc.Namespace, rc)
if err != nil {
return nil, nil, err
}
meshedPodCount[key] = meshCount
}
return rcMap, meshedPodCount, nil
}
func (s *grpcServer) getServices(res *pb.Resource) (map[string]metav1.ObjectMeta, map[string]*meshedCount, error) {
var err error
var services []*apiv1.Service
if res.Namespace == "" {
services, err = s.serviceLister.List(labels.Everything())
} else if res.Name == "" {
services, err = s.serviceLister.Services(res.Namespace).List(labels.Everything())
} else {
var svc *apiv1.Service
svc, err = s.serviceLister.Services(res.Namespace).Get(res.Name)
services = []*apiv1.Service{svc}
}
if err != nil {
return nil, nil, err
}
meshedPodCount := make(map[string]*meshedCount)
svcMap := make(map[string]metav1.ObjectMeta)
for _, svc := range services {
key, err := cache.MetaNamespaceKeyFunc(svc)
if err != nil {
return nil, nil, err
}
svcMap[key] = svc.ObjectMeta
meshCount, err := s.getMeshedPodCount(svc.Namespace, svc)
if err != nil {
return nil, nil, err
}
meshedPodCount[key] = meshCount
}
return svcMap, meshedPodCount, nil
}
func (s *grpcServer) getMeshedPodCount(namespace string, obj runtime.Object) (*meshedCount, error) {
selector, err := getSelectorFromObject(obj)
if err != nil {
return nil, err
}
pods, err := s.podLister.Pods(namespace).List(selector)
func (s *grpcServer) getMeshedPodCount(obj runtime.Object) (*meshedCount, error) {
pods, err := s.lister.GetPodsFor(obj)
if err != nil {
return nil, err
}
meshCount := &meshedCount{}
for _, pod := range pods {
if !isPendingOrRunning(pod) {
continue
}
meshCount.total++
if isInMesh(pod) {
meshCount.inMesh++
@ -548,32 +350,6 @@ func isInMesh(pod *apiv1.Pod) bool {
return ok
}
func isPendingOrRunning(pod *apiv1.Pod) bool {
pending := pod.Status.Phase == apiv1.PodPending
running := pod.Status.Phase == apiv1.PodRunning
terminating := pod.DeletionTimestamp != nil
return (pending || running) && !terminating
}
func getSelectorFromObject(obj runtime.Object) (labels.Selector, error) {
switch typed := obj.(type) {
case *apiv1.Namespace:
return labels.Everything(), nil
case *appsv1beta2.Deployment:
return labels.Set(typed.Spec.Selector.MatchLabels).AsSelector(), nil
case *apiv1.ReplicationController:
return labels.Set(typed.Spec.Selector).AsSelector(), nil
case *apiv1.Service:
return labels.Set(typed.Spec.Selector).AsSelector(), nil
default:
return nil, fmt.Errorf("Cannot get object selector: %v", obj)
}
}
func (s *grpcServer) queryProm(ctx context.Context, query string) (model.Vector, error) {
log.Debugf("Query request: %+v", query)

View File

@ -5,17 +5,15 @@ import (
"errors"
"reflect"
"testing"
"time"
"github.com/prometheus/common/model"
tap "github.com/runconduit/conduit/controller/gen/controller/tap"
pb "github.com/runconduit/conduit/controller/gen/public"
"github.com/runconduit/conduit/pkg/k8s"
"github.com/runconduit/conduit/controller/k8s"
pkgK8s "github.com/runconduit/conduit/pkg/k8s"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"
)
type statSumExpected struct {
@ -97,7 +95,7 @@ status:
Selector: &pb.ResourceSelection{
Resource: &pb.Resource{
Namespace: "emojivoto",
Type: k8s.KubernetesDeployments,
Type: pkgK8s.Deployments,
},
},
TimeWindow: "1m",
@ -150,38 +148,17 @@ status:
}
clientSet := fake.NewSimpleClientset(k8sObjs...)
sharedInformers := informers.NewSharedInformerFactory(clientSet, 10*time.Minute)
namespaceInformer := sharedInformers.Core().V1().Namespaces()
deployInformer := sharedInformers.Apps().V1beta2().Deployments()
replicaSetInformer := sharedInformers.Apps().V1beta2().ReplicaSets()
podInformer := sharedInformers.Core().V1().Pods()
replicationControllerInformer := sharedInformers.Core().V1().ReplicationControllers()
serviceInformer := sharedInformers.Core().V1().Services()
lister := k8s.NewLister(clientSet)
fakeGrpcServer := newGrpcServer(
&MockProm{Res: exp.promRes},
tap.NewTapClient(nil),
namespaceInformer.Lister(),
deployInformer.Lister(),
replicaSetInformer.Lister(),
podInformer.Lister(),
replicationControllerInformer.Lister(),
serviceInformer.Lister(),
lister,
"conduit",
[]string{},
)
stopCh := make(chan struct{})
sharedInformers.Start(stopCh)
if !cache.WaitForCacheSync(
stopCh,
namespaceInformer.Informer().HasSynced,
deployInformer.Informer().HasSynced,
replicaSetInformer.Informer().HasSynced,
podInformer.Informer().HasSynced,
replicationControllerInformer.Informer().HasSynced,
serviceInformer.Informer().HasSynced,
) {
err := lister.Sync()
if err != nil {
t.Fatalf("timed out wait for caches to sync")
}
@ -199,7 +176,7 @@ status:
t.Run("Given an invalid resource type, returns error", func(t *testing.T) {
expectations := []statSumExpected{
statSumExpected{
err: errors.New("Unimplemented resource type: badtype"),
err: errors.New("rpc error: code = Unimplemented desc = unimplemented resource type: badtype"),
req: pb.StatSummaryRequest{
Selector: &pb.ResourceSelection{
Resource: &pb.Resource{
@ -209,7 +186,7 @@ status:
},
},
statSumExpected{
err: errors.New("Unimplemented resource type: deployment"),
err: errors.New("rpc error: code = Unimplemented desc = unimplemented resource type: deployment"),
req: pb.StatSummaryRequest{
Selector: &pb.ResourceSelection{
Resource: &pb.Resource{
@ -219,7 +196,7 @@ status:
},
},
statSumExpected{
err: errors.New("Unimplemented resource type: pod"),
err: errors.New("rpc error: code = Unimplemented desc = unimplemented resource type: pod"),
req: pb.StatSummaryRequest{
Selector: &pb.ResourceSelection{
Resource: &pb.Resource{
@ -232,16 +209,11 @@ status:
for _, exp := range expectations {
clientSet := fake.NewSimpleClientset()
sharedInformers := informers.NewSharedInformerFactory(clientSet, 10*time.Minute)
lister := k8s.NewLister(clientSet)
fakeGrpcServer := newGrpcServer(
&MockProm{Res: exp.promRes},
tap.NewTapClient(nil),
sharedInformers.Core().V1().Namespaces().Lister(),
sharedInformers.Apps().V1beta2().Deployments().Lister(),
sharedInformers.Apps().V1beta2().ReplicaSets().Lister(),
sharedInformers.Core().V1().Pods().Lister(),
sharedInformers.Core().V1().ReplicationControllers().Lister(),
sharedInformers.Core().V1().Services().Lister(),
lister,
"conduit",
[]string{},
)

View File

@ -7,6 +7,10 @@ import (
pb "github.com/runconduit/conduit/controller/gen/public"
"github.com/runconduit/conduit/pkg/k8s"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
/*
@ -21,21 +25,21 @@ var (
// target resource on an outbound 'to' query
// destination resource on an outbound 'from' query
ValidTargets = []string{
k8s.KubernetesDeployments,
k8s.KubernetesNamespaces,
k8s.KubernetesPods,
k8s.KubernetesReplicationControllers,
k8s.Deployments,
k8s.Namespaces,
k8s.Pods,
k8s.ReplicationControllers,
}
// ValidDestinations specifies resource types allowed as a destination:
// destination resource on an outbound 'to' query
// target resource on an outbound 'from' query
ValidDestinations = []string{
k8s.KubernetesDeployments,
k8s.KubernetesNamespaces,
k8s.KubernetesPods,
k8s.KubernetesReplicationControllers,
k8s.KubernetesServices,
k8s.Deployments,
k8s.Namespaces,
k8s.Pods,
k8s.ReplicationControllers,
k8s.Services,
}
)
@ -52,6 +56,38 @@ type StatSummaryRequestParams struct {
FromName string
}
// GRPCError generates a gRPC error code, as defined in
// google.golang.org/grpc/status.
// If the error is nil or already a gRPC error, return the error.
// If the error is of type k8s.io/apimachinery/pkg/apis/meta/v1#StatusReason,
// attempt to map the reason to a gRPC error.
func GRPCError(err error) error {
if err != nil && status.Code(err) == codes.Unknown {
code := codes.Internal
switch k8sErrors.ReasonForError(err) {
case metav1.StatusReasonUnknown:
code = codes.Unknown
case metav1.StatusReasonUnauthorized, metav1.StatusReasonForbidden:
code = codes.PermissionDenied
case metav1.StatusReasonNotFound:
code = codes.NotFound
case metav1.StatusReasonAlreadyExists:
code = codes.AlreadyExists
case metav1.StatusReasonInvalid:
code = codes.InvalidArgument
case metav1.StatusReasonExpired:
code = codes.DeadlineExceeded
case metav1.StatusReasonServiceUnavailable:
code = codes.Unavailable
}
err = status.Error(code, err.Error())
}
return err
}
func BuildStatSummaryRequest(p StatSummaryRequestParams) (*pb.StatSummaryRequest, error) {
window := defaultMetricTimeWindow
if p.TimeWindow != "" {
@ -158,7 +194,7 @@ func buildResource(namespace string, resType string, name string) (pb.Resource,
if err != nil {
return pb.Resource{}, err
}
if canonicalType == k8s.KubernetesNamespaces {
if canonicalType == k8s.Namespaces {
// ignore --namespace flags if type is namespace
namespace = ""
}

View File

@ -1,28 +1,51 @@
package util
import (
"errors"
"reflect"
"testing"
pb "github.com/runconduit/conduit/controller/gen/public"
"github.com/runconduit/conduit/pkg/k8s"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
k8sError "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime/schema"
)
type resourceExp struct {
namespace string
args []string
resource pb.Resource
func TestGRPCError(t *testing.T) {
t.Run("Maps errors to gRPC errors", func(t *testing.T) {
expectations := map[error]error{
nil: nil,
errors.New("normal erro"): errors.New("rpc error: code = Unknown desc = normal erro"),
status.Error(codes.NotFound, "grpc not found"): errors.New("rpc error: code = NotFound desc = grpc not found"),
k8sError.NewNotFound(schema.GroupResource{Group: "foo", Resource: "bar"}, "http not found"): errors.New("rpc error: code = NotFound desc = bar.foo \"http not found\" not found"),
k8sError.NewServiceUnavailable("unavailable"): errors.New("rpc error: code = Unavailable desc = unavailable"),
k8sError.NewGone("gone"): errors.New("rpc error: code = Internal desc = gone"),
}
for in, out := range expectations {
err := GRPCError(in)
if err != nil || out != nil {
if (err == nil && out != nil) ||
(err != nil && out == nil) ||
(err.Error() != out.Error()) {
t.Fatalf("Expected GRPCError to return [%s], got: [%s]", out, GRPCError(in))
}
}
}
})
}
func TestBuildStatSummaryRequest(t *testing.T) {
t.Run("Maps Kubernetes friendly names to canonical names", func(t *testing.T) {
expectations := map[string]string{
"deployments": k8s.KubernetesDeployments,
"deployment": k8s.KubernetesDeployments,
"deploy": k8s.KubernetesDeployments,
"pods": k8s.KubernetesPods,
"pod": k8s.KubernetesPods,
"po": k8s.KubernetesPods,
"deployments": k8s.Deployments,
"deployment": k8s.Deployments,
"deploy": k8s.Deployments,
"pods": k8s.Pods,
"pod": k8s.Pods,
"po": k8s.Pods,
}
for friendly, canonical := range expectations {
@ -51,7 +74,7 @@ func TestBuildStatSummaryRequest(t *testing.T) {
statSummaryRequest, err := BuildStatSummaryRequest(
StatSummaryRequestParams{
TimeWindow: timeWindow,
ResourceType: k8s.KubernetesDeployments,
ResourceType: k8s.Deployments,
},
)
if err != nil {
@ -107,6 +130,12 @@ func TestBuildStatSummaryRequest(t *testing.T) {
}
func TestBuildResource(t *testing.T) {
type resourceExp struct {
namespace string
args []string
resource pb.Resource
}
t.Run("Correctly parses Kubernetes resources from the command line", func(t *testing.T) {
expectations := []resourceExp{
resourceExp{
@ -114,7 +143,7 @@ func TestBuildResource(t *testing.T) {
args: []string{"deployments"},
resource: pb.Resource{
Namespace: "test-ns",
Type: k8s.KubernetesDeployments,
Type: k8s.Deployments,
Name: "",
},
},
@ -123,7 +152,7 @@ func TestBuildResource(t *testing.T) {
args: []string{"deploy/foo"},
resource: pb.Resource{
Namespace: "",
Type: k8s.KubernetesDeployments,
Type: k8s.Deployments,
Name: "foo",
},
},
@ -132,7 +161,7 @@ func TestBuildResource(t *testing.T) {
args: []string{"po", "foo"},
resource: pb.Resource{
Namespace: "foo-ns",
Type: k8s.KubernetesPods,
Type: k8s.Pods,
Name: "foo",
},
},
@ -141,7 +170,7 @@ func TestBuildResource(t *testing.T) {
args: []string{"ns", "foo-ns2"},
resource: pb.Resource{
Namespace: "",
Type: k8s.KubernetesNamespaces,
Type: k8s.Namespaces,
Name: "foo-ns2",
},
},
@ -150,7 +179,7 @@ func TestBuildResource(t *testing.T) {
args: []string{"ns/foo-ns2"},
resource: pb.Resource{
Namespace: "",
Type: k8s.KubernetesNamespaces,
Type: k8s.Namespaces,
Name: "foo-ns2",
},
},

View File

@ -7,7 +7,6 @@ import (
"os/signal"
"strings"
"syscall"
"time"
promApi "github.com/prometheus/client_golang/api"
"github.com/runconduit/conduit/controller/api/public"
@ -16,8 +15,6 @@ import (
"github.com/runconduit/conduit/controller/util"
"github.com/runconduit/conduit/pkg/version"
log "github.com/sirupsen/logrus"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
)
func main() {
@ -55,27 +52,7 @@ func main() {
log.Fatal(err.Error())
}
sharedInformers := informers.NewSharedInformerFactory(k8sClient, 10*time.Minute)
namespaceInformer := sharedInformers.Core().V1().Namespaces()
namespaceInformerSynced := namespaceInformer.Informer().HasSynced
deployInformer := sharedInformers.Apps().V1beta2().Deployments()
deployInformerSynced := deployInformer.Informer().HasSynced
replicaSetInformer := sharedInformers.Apps().V1beta2().ReplicaSets()
replicaSetInformerSynced := replicaSetInformer.Informer().HasSynced
podInformer := sharedInformers.Core().V1().Pods()
podInformerSynced := podInformer.Informer().HasSynced
replicationControllerInformer := sharedInformers.Core().V1().ReplicationControllers()
replicationControllerInformerSynced := replicationControllerInformer.Informer().HasSynced
serviceInformer := sharedInformers.Core().V1().Services()
serviceInformerSynced := serviceInformer.Informer().HasSynced
sharedInformers.Start(nil)
lister := k8s.NewLister(k8sClient)
prometheusClient, err := promApi.NewClient(promApi.Config{Address: *prometheusUrl})
if err != nil {
@ -86,33 +63,16 @@ func main() {
*addr,
prometheusClient,
tapClient,
namespaceInformer.Lister(),
deployInformer.Lister(),
replicaSetInformer.Lister(),
podInformer.Lister(),
replicationControllerInformer.Lister(),
serviceInformer.Lister(),
lister,
*controllerNamespace,
strings.Split(*ignoredNamespaces, ","),
)
go func() {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
log.Infof("waiting for caches to sync")
if !cache.WaitForCacheSync(
ctx.Done(),
namespaceInformerSynced,
deployInformerSynced,
replicaSetInformerSynced,
podInformerSynced,
replicationControllerInformerSynced,
serviceInformerSynced,
) {
log.Fatalf("timed out wait for caches to sync")
err := lister.Sync()
if err != nil {
log.Fatalf("timed out wait for caches to sync: %s", err)
}
log.Infof("caches synced")
}()
go func() {

View File

@ -1,13 +1,11 @@
package main
import (
"context"
"flag"
"fmt"
"os"
"os/signal"
"syscall"
"time"
"github.com/runconduit/conduit/controller/k8s"
"github.com/runconduit/conduit/controller/tap"
@ -15,8 +13,6 @@ import (
"github.com/runconduit/conduit/pkg/version"
log "github.com/sirupsen/logrus"
"k8s.io/api/core/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
)
func main() {
@ -77,59 +73,18 @@ func main() {
log.Fatalf("pods.Run() failed: %s", err)
}
// TODO: factor out with public-api
sharedInformers := informers.NewSharedInformerFactory(clientSet, 10*time.Minute)
lister := k8s.NewLister(clientSet)
namespaceInformer := sharedInformers.Core().V1().Namespaces()
namespaceInformerSynced := namespaceInformer.Informer().HasSynced
deployInformer := sharedInformers.Apps().V1beta2().Deployments()
deployInformerSynced := deployInformer.Informer().HasSynced
replicaSetInformer := sharedInformers.Apps().V1beta2().ReplicaSets()
replicaSetInformerSynced := replicaSetInformer.Informer().HasSynced
podInformer := sharedInformers.Core().V1().Pods()
podInformerSynced := podInformer.Informer().HasSynced
replicationControllerInformer := sharedInformers.Core().V1().ReplicationControllers()
replicationControllerInformerSynced := replicationControllerInformer.Informer().HasSynced
serviceInformer := sharedInformers.Core().V1().Services()
serviceInformerSynced := serviceInformer.Informer().HasSynced
sharedInformers.Start(nil)
server, lis, err := tap.NewServer(
*addr, *tapPort, replicaSets, pods,
namespaceInformer.Lister(),
deployInformer.Lister(),
replicaSetInformer.Lister(),
podInformer.Lister(),
replicationControllerInformer.Lister(),
serviceInformer.Lister(),
)
server, lis, err := tap.NewServer(*addr, *tapPort, replicaSets, pods, lister)
if err != nil {
log.Fatal(err.Error())
}
go func() {
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
log.Infof("waiting for caches to sync")
if !cache.WaitForCacheSync(
ctx.Done(),
namespaceInformerSynced,
deployInformerSynced,
replicaSetInformerSynced,
podInformerSynced,
replicationControllerInformerSynced,
serviceInformerSynced,
) {
log.Fatalf("timed out wait for caches to sync")
err := lister.Sync()
if err != nil {
log.Fatalf("timed out wait for caches to sync: %s", err)
}
log.Infof("caches synced")
}()
go func() {

View File

@ -6,7 +6,10 @@ import (
"fmt"
"time"
"github.com/runconduit/conduit/pkg/k8s"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
appsv1beta2 "k8s.io/api/apps/v1beta2"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/labels"
@ -92,6 +95,27 @@ func (l *Lister) Sync() error {
return nil
}
// GetObjects returns a list of Kubernetes objects, given a namespace, type, and name.
// If namespace is an empty string, match objects in all namespaces.
// If name is an empty string, match all objects of the given type.
func (l *Lister) GetObjects(namespace, restype, name string) ([]runtime.Object, error) {
switch restype {
case k8s.Namespaces:
return l.getNamespaces(name)
case k8s.Deployments:
return l.getDeployments(namespace, name)
case k8s.Pods:
return l.getPods(namespace, name)
case k8s.ReplicationControllers:
return l.getRCs(namespace, name)
case k8s.Services:
return l.getServices(namespace, name)
default:
// TODO: ReplicaSet
return nil, status.Errorf(codes.Unimplemented, "unimplemented resource type: %s", restype)
}
}
// GetPodsFor returns all running and pending Pods associated with a given
// Kubernetes object.
func (l *Lister) GetPodsFor(obj runtime.Object) ([]*apiv1.Pod, error) {
@ -142,6 +166,134 @@ func (l *Lister) GetPodsFor(obj runtime.Object) ([]*apiv1.Pod, error) {
return allPods, nil
}
func (l *Lister) getNamespaces(name string) ([]runtime.Object, error) {
var err error
var namespaces []*apiv1.Namespace
if name == "" {
namespaces, err = l.NS.List(labels.Everything())
} else {
var namespace *apiv1.Namespace
namespace, err = l.NS.Get(name)
namespaces = []*apiv1.Namespace{namespace}
}
if err != nil {
return nil, err
}
objects := []runtime.Object{}
for _, ns := range namespaces {
objects = append(objects, ns)
}
return objects, nil
}
func (l *Lister) getDeployments(namespace, name string) ([]runtime.Object, error) {
var err error
var deploys []*appsv1beta2.Deployment
if namespace == "" {
deploys, err = l.Deploy.List(labels.Everything())
} else if name == "" {
deploys, err = l.Deploy.Deployments(namespace).List(labels.Everything())
} else {
var deploy *appsv1beta2.Deployment
deploy, err = l.Deploy.Deployments(namespace).Get(name)
deploys = []*appsv1beta2.Deployment{deploy}
}
if err != nil {
return nil, err
}
objects := []runtime.Object{}
for _, deploy := range deploys {
objects = append(objects, deploy)
}
return objects, nil
}
func (l *Lister) getPods(namespace, name string) ([]runtime.Object, error) {
var err error
var pods []*apiv1.Pod
if namespace == "" {
pods, err = l.Pod.List(labels.Everything())
} else if name == "" {
pods, err = l.Pod.Pods(namespace).List(labels.Everything())
} else {
var pod *apiv1.Pod
pod, err = l.Pod.Pods(namespace).Get(name)
pods = []*apiv1.Pod{pod}
}
if err != nil {
return nil, err
}
objects := []runtime.Object{}
for _, pod := range pods {
objects = append(objects, pod)
}
return objects, nil
}
func (l *Lister) getRCs(namespace, name string) ([]runtime.Object, error) {
var err error
var rcs []*apiv1.ReplicationController
if namespace == "" {
rcs, err = l.RC.List(labels.Everything())
} else if name == "" {
rcs, err = l.RC.ReplicationControllers(namespace).List(labels.Everything())
} else {
var rc *apiv1.ReplicationController
rc, err = l.RC.ReplicationControllers(namespace).Get(name)
rcs = []*apiv1.ReplicationController{rc}
}
if err != nil {
return nil, err
}
objects := []runtime.Object{}
for _, rc := range rcs {
objects = append(objects, rc)
}
return objects, nil
}
func (l *Lister) getServices(namespace, name string) ([]runtime.Object, error) {
var err error
var services []*apiv1.Service
if namespace == "" {
services, err = l.Svc.List(labels.Everything())
} else if name == "" {
services, err = l.Svc.Services(namespace).List(labels.Everything())
} else {
var svc *apiv1.Service
svc, err = l.Svc.Services(namespace).Get(name)
services = []*apiv1.Service{svc}
}
if err != nil {
return nil, err
}
objects := []runtime.Object{}
for _, svc := range services {
objects = append(objects, svc)
}
return objects, nil
}
func isPendingOrRunning(pod *apiv1.Pod) bool {
pending := pod.Status.Phase == apiv1.PodPending
running := pod.Status.Phase == apiv1.PodRunning

View File

@ -1,28 +1,168 @@
package k8s
import (
"errors"
"reflect"
"testing"
"github.com/runconduit/conduit/pkg/k8s"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
apiv1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
)
type listerExpected struct {
err error
func TestGetObjects(t *testing.T) {
// all 3 of these are used to seed the k8s client
k8sResInput string // object used as input to GetPodFor()
k8sResResults []string // expected results from GetPodFor
k8sResMisc []string // additional k8s objects for seeding the k8s client
type getObjectsExpected struct {
err error
// input
namespace string
resType string
name string
// these are used to seed the k8s client
k8sResResults []string // expected results from GetObjects
k8sResMisc []string // additional k8s objects for seeding the k8s client
}
t.Run("Returns expected objects based on input", func(t *testing.T) {
expectations := []getObjectsExpected{
getObjectsExpected{
err: status.Errorf(codes.Unimplemented, "unimplemented resource type: bar"),
namespace: "foo",
resType: "bar",
name: "baz",
k8sResResults: []string{},
k8sResMisc: []string{},
},
getObjectsExpected{
err: nil,
namespace: "my-ns",
resType: k8s.Pods,
name: "my-pod",
k8sResResults: []string{`
apiVersion: v1
kind: Pod
metadata:
name: my-pod
namespace: my-ns`,
},
k8sResMisc: []string{},
},
getObjectsExpected{
err: errors.New("pod \"my-pod\" not found"),
namespace: "not-my-ns",
resType: k8s.Pods,
name: "my-pod",
k8sResResults: []string{},
k8sResMisc: []string{`
apiVersion: v1
kind: Pod
metadata:
name: my-pod
namespace: my-ns`,
},
},
getObjectsExpected{
err: nil,
namespace: "",
resType: k8s.ReplicationControllers,
name: "",
k8sResResults: []string{`
apiVersion: v1
kind: ReplicationController
metadata:
name: my-rc
namespace: my-ns`,
},
k8sResMisc: []string{},
},
getObjectsExpected{
err: nil,
namespace: "my-ns",
resType: k8s.Deployments,
name: "",
k8sResResults: []string{`
apiVersion: apps/v1beta2
kind: Deployment
metadata:
name: my-deploy
namespace: my-ns`,
},
k8sResMisc: []string{`
apiVersion: apps/v1beta2
kind: Deployment
metadata:
name: my-deploy
namespace: not-my-ns`,
},
},
}
for _, exp := range expectations {
k8sObjs := []runtime.Object{}
k8sResults := []runtime.Object{}
for _, res := range exp.k8sResResults {
decode := scheme.Codecs.UniversalDeserializer().Decode
obj, _, err := decode([]byte(res), nil, nil)
if err != nil {
t.Fatalf("could not decode yml: %s", err)
}
k8sObjs = append(k8sObjs, obj)
k8sResults = append(k8sResults, obj)
}
for _, res := range exp.k8sResMisc {
decode := scheme.Codecs.UniversalDeserializer().Decode
obj, _, err := decode([]byte(res), nil, nil)
if err != nil {
t.Fatalf("could not decode yml: %s", err)
}
k8sObjs = append(k8sObjs, obj)
}
clientSet := fake.NewSimpleClientset(k8sObjs...)
lister := NewLister(clientSet)
err := lister.Sync()
if err != nil {
t.Fatalf("lister.Sync() returned an error: %s", err)
}
pods, err := lister.GetObjects(exp.namespace, exp.resType, exp.name)
if err != nil || exp.err != nil {
if (err == nil && exp.err != nil) ||
(err != nil && exp.err == nil) ||
(err.Error() != exp.err.Error()) {
t.Fatalf("lister.GetObjects() unexpected error, expected [%s] got: [%s]", exp.err, err)
}
} else {
if !reflect.DeepEqual(pods, k8sResults) {
t.Fatalf("Expected: %+v, Got: %+v", k8sResults, pods)
}
}
}
})
}
func TestGetPodsFor(t *testing.T) {
type getPodsForExpected struct {
err error
// all 3 of these are used to seed the k8s client
k8sResInput string // object used as input to GetPodFor()
k8sResResults []string // expected results from GetPodFor
k8sResMisc []string // additional k8s objects for seeding the k8s client
}
t.Run("Returns expected pods based on input", func(t *testing.T) {
expectations := []listerExpected{
listerExpected{
expectations := []getPodsForExpected{
getPodsForExpected{
err: nil,
k8sResInput: `
apiVersion: apps/v1beta2
@ -47,7 +187,7 @@ status:
phase: Finished`,
},
},
listerExpected{
getPodsForExpected{
err: nil,
k8sResInput: `
apiVersion: apps/v1beta2

View File

@ -8,6 +8,7 @@ import (
"strings"
"time"
apiUtil "github.com/runconduit/conduit/controller/api/util"
common "github.com/runconduit/conduit/controller/gen/common"
pb "github.com/runconduit/conduit/controller/gen/controller/tap"
proxy "github.com/runconduit/conduit/controller/gen/proxy/tap"
@ -19,14 +20,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
appsv1beta2 "k8s.io/api/apps/v1beta2"
apiv1 "k8s.io/api/core/v1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metaV1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
applisters "k8s.io/client-go/listers/apps/v1beta2"
corelisters "k8s.io/client-go/listers/core/v1"
)
type (
@ -37,27 +31,12 @@ type (
replicaSets *k8s.ReplicaSetStore
pods k8s.PodIndex
// TODO: factor out with public-api
namespaceLister corelisters.NamespaceLister
deployLister applisters.DeploymentLister
replicaSetLister applisters.ReplicaSetLister
podLister corelisters.PodLister
replicationControllerLister corelisters.ReplicationControllerLister
serviceLister corelisters.ServiceLister
lister *k8s.Lister
}
)
var (
tapInterval = 10 * time.Second
// TODO: factor out with public-api
k8sResourceTypesToDestinationLabels = map[string]string{
pkgK8s.KubernetesDeployments: "deployment",
pkgK8s.KubernetesNamespaces: "namespace",
pkgK8s.KubernetesPods: "pod",
pkgK8s.KubernetesReplicationControllers: "replication_controller",
pkgK8s.KubernetesServices: "service",
}
)
func (s *server) Tap(req *public.TapRequest, stream pb.Tap_TapServer) error {
@ -71,7 +50,7 @@ func (s *server) Tap(req *public.TapRequest, stream pb.Tap_TapServer) error {
targetName = target.Pod
pod, err := s.pods.GetPod(target.Pod)
if err != nil {
return status.Errorf(codes.NotFound, err.Error())
return apiUtil.GRPCError(err)
}
pods = []*apiv1.Pod{pod}
case *public.TapRequest_Deployment:
@ -125,16 +104,19 @@ func (s *server) TapByResource(req *public.TapByResourceRequest, stream pb.Tap_T
return status.Errorf(codes.InvalidArgument, "TapByResource received nil target ResourceSelection: %+v", *req)
}
pods, err := s.getPodsFor(*req.Target)
objects, err := s.lister.GetObjects(req.Target.Resource.Namespace, req.Target.Resource.Type, req.Target.Resource.Name)
if err != nil {
if status.Code(err) == codes.Unknown {
if k8sErrors.ReasonForError(err) == metaV1.StatusReasonNotFound {
err = status.Errorf(codes.NotFound, err.Error())
} else {
err = status.Errorf(codes.Internal, err.Error())
}
return apiUtil.GRPCError(err)
}
pods := []*apiv1.Pod{}
for _, object := range objects {
podsFor, err := s.lister.GetPodsFor(object)
if err != nil {
return apiUtil.GRPCError(err)
}
return err
pods = append(pods, podsFor...)
}
if len(pods) == 0 {
@ -158,10 +140,7 @@ func (s *server) TapByResource(req *public.TapByResourceRequest, stream pb.Tap_T
match, err := makeByResourceMatch(req.Match)
if err != nil {
if status.Code(err) == codes.Unknown {
err = status.Errorf(codes.Internal, err.Error())
}
return err
return apiUtil.GRPCError(err)
}
for _, pod := range pods {
@ -173,10 +152,7 @@ func (s *server) TapByResource(req *public.TapByResourceRequest, stream pb.Tap_T
for event := range events {
err := stream.Send(event)
if err != nil {
if status.Code(err) == codes.Unknown {
err = status.Errorf(codes.Internal, err.Error())
}
return err
return apiUtil.GRPCError(err)
}
}
return nil
@ -453,9 +429,9 @@ func makeByResourceMatch(match *public.TapByResourceRequest_Match) (*proxy.Obser
func destinationLabels(resource *public.Resource) map[string]string {
dstLabels := map[string]string{}
if resource.Name != "" {
dstLabels[k8sResourceTypesToDestinationLabels[resource.Type]] = resource.Name
dstLabels[pkgK8s.ResourceTypesToProxyLabels[resource.Type]] = resource.Name
}
if resource.Type != pkgK8s.KubernetesNamespaces && resource.Namespace != "" {
if resource.Type != pkgK8s.Namespaces && resource.Namespace != "" {
dstLabels["namespace"] = resource.Namespace
}
return dstLabels
@ -509,228 +485,13 @@ func (s *server) tapProxy(ctx context.Context, maxRps float32, match *proxy.Obse
}
}
//
// TODO: factor all these functions out of public-api into a shared k8s lister/resource module
//
func (s *server) getPodsFor(res public.ResourceSelection) ([]*apiv1.Pod, error) {
var err error
namespace := res.Resource.Namespace
objects := []runtime.Object{}
switch res.Resource.Type {
case pkgK8s.KubernetesDeployments:
objects, err = s.getDeployments(res.Resource)
case pkgK8s.KubernetesNamespaces:
namespace = res.Resource.Name // special case for namespace
objects, err = s.getNamespaces(res.Resource)
case pkgK8s.KubernetesReplicationControllers:
objects, err = s.getReplicationControllers(res.Resource)
case pkgK8s.KubernetesServices:
objects, err = s.getServices(res.Resource)
// special case for pods
case pkgK8s.KubernetesPods:
return s.getPods(res.Resource)
default:
err = status.Errorf(codes.Unimplemented, "unimplemented resource type: %v", res.Resource.Type)
}
if err != nil {
return nil, err
}
allPods := []*apiv1.Pod{}
for _, obj := range objects {
selector, err := getSelectorFromObject(obj)
if err != nil {
return nil, err
}
// TODO: special case namespace
pods, err := s.podLister.Pods(namespace).List(selector)
if err != nil {
return nil, err
}
for _, pod := range pods {
if isPendingOrRunning(pod) {
allPods = append(allPods, pod)
}
}
}
return allPods, nil
}
func isPendingOrRunning(pod *apiv1.Pod) bool {
pending := pod.Status.Phase == apiv1.PodPending
running := pod.Status.Phase == apiv1.PodRunning
terminating := pod.DeletionTimestamp != nil
return (pending || running) && !terminating
}
func getSelectorFromObject(obj runtime.Object) (labels.Selector, error) {
switch typed := obj.(type) {
case *apiv1.Namespace:
return labels.Everything(), nil
case *appsv1beta2.Deployment:
return labels.Set(typed.Spec.Selector.MatchLabels).AsSelector(), nil
case *apiv1.ReplicationController:
return labels.Set(typed.Spec.Selector).AsSelector(), nil
case *apiv1.Service:
return labels.Set(typed.Spec.Selector).AsSelector(), nil
default:
return nil, status.Errorf(codes.Unimplemented, "cannot get object selector: %v", obj)
}
}
func (s *server) getDeployments(res *public.Resource) ([]runtime.Object, error) {
var err error
var deployments []*appsv1beta2.Deployment
if res.Namespace == "" {
deployments, err = s.deployLister.List(labels.Everything())
} else if res.Name == "" {
deployments, err = s.deployLister.Deployments(res.Namespace).List(labels.Everything())
} else {
var deployment *appsv1beta2.Deployment
deployment, err = s.deployLister.Deployments(res.Namespace).Get(res.Name)
deployments = []*appsv1beta2.Deployment{deployment}
}
if err != nil {
return nil, err
}
objects := []runtime.Object{}
for _, deploy := range deployments {
objects = append(objects, deploy)
}
return objects, nil
}
func (s *server) getNamespaces(res *public.Resource) ([]runtime.Object, error) {
var err error
var namespaces []*apiv1.Namespace
if res.Name == "" {
namespaces, err = s.namespaceLister.List(labels.Everything())
} else {
var namespace *apiv1.Namespace
namespace, err = s.namespaceLister.Get(res.Name)
namespaces = []*apiv1.Namespace{namespace}
}
if err != nil {
return nil, err
}
objects := []runtime.Object{}
for _, ns := range namespaces {
objects = append(objects, ns)
}
return objects, nil
}
func (s *server) getPods(res *public.Resource) ([]*apiv1.Pod, error) {
var err error
var pods []*apiv1.Pod
if res.Namespace == "" {
pods, err = s.podLister.List(labels.Everything())
} else if res.Name == "" {
pods, err = s.podLister.Pods(res.Namespace).List(labels.Everything())
} else {
var pod *apiv1.Pod
pod, err = s.podLister.Pods(res.Namespace).Get(res.Name)
pods = []*apiv1.Pod{pod}
}
if err != nil {
return nil, err
}
var runningPods []*apiv1.Pod
for _, pod := range pods {
if isPendingOrRunning(pod) {
runningPods = append(runningPods, pod)
}
}
return runningPods, nil
}
func (s *server) getReplicationControllers(res *public.Resource) ([]runtime.Object, error) {
var err error
var rcs []*apiv1.ReplicationController
if res.Namespace == "" {
rcs, err = s.replicationControllerLister.List(labels.Everything())
} else if res.Name == "" {
rcs, err = s.replicationControllerLister.ReplicationControllers(res.Namespace).List(labels.Everything())
} else {
var rc *apiv1.ReplicationController
rc, err = s.replicationControllerLister.ReplicationControllers(res.Namespace).Get(res.Name)
rcs = []*apiv1.ReplicationController{rc}
}
if err != nil {
return nil, err
}
objects := []runtime.Object{}
for _, rc := range rcs {
objects = append(objects, rc)
}
return objects, nil
}
func (s *server) getServices(res *public.Resource) ([]runtime.Object, error) {
var err error
var services []*apiv1.Service
if res.Namespace == "" {
services, err = s.serviceLister.List(labels.Everything())
} else if res.Name == "" {
services, err = s.serviceLister.Services(res.Namespace).List(labels.Everything())
} else {
var svc *apiv1.Service
svc, err = s.serviceLister.Services(res.Namespace).Get(res.Name)
services = []*apiv1.Service{svc}
}
if err != nil {
return nil, err
}
objects := []runtime.Object{}
for _, svc := range services {
objects = append(objects, svc)
}
return objects, nil
}
// NewServer creates a new gRPC Tap server
func NewServer(
addr string,
tapPort uint,
replicaSets *k8s.ReplicaSetStore,
pods k8s.PodIndex,
namespaceLister corelisters.NamespaceLister,
deployLister applisters.DeploymentLister,
replicaSetLister applisters.ReplicaSetLister,
podLister corelisters.PodLister,
replicationControllerLister corelisters.ReplicationControllerLister,
serviceLister corelisters.ServiceLister,
lister *k8s.Lister,
) (*grpc.Server, net.Listener, error) {
lis, err := net.Listen("tcp", addr)
@ -740,15 +501,10 @@ func NewServer(
s := util.NewGrpcServer()
srv := server{
tapPort: tapPort,
replicaSets: replicaSets,
pods: pods,
namespaceLister: namespaceLister,
deployLister: deployLister,
replicaSetLister: replicaSetLister,
podLister: podLister,
replicationControllerLister: replicationControllerLister,
serviceLister: serviceLister,
tapPort: tapPort,
replicaSets: replicaSets,
pods: pods,
lister: lister,
}
pb.RegisterTapServer(s, &srv)

View File

@ -8,10 +8,8 @@ import (
public "github.com/runconduit/conduit/controller/gen/public"
"github.com/runconduit/conduit/controller/k8s"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/cache"
)
type tapExpected struct {
@ -179,24 +177,9 @@ status:
t.Fatalf("NewReplicaSetStore failed: %s", err)
}
sharedInformers := informers.NewSharedInformerFactory(clientSet, 10*time.Minute)
lister := k8s.NewLister(clientSet)
namespaceInformer := sharedInformers.Core().V1().Namespaces()
deployInformer := sharedInformers.Apps().V1beta2().Deployments()
replicaSetInformer := sharedInformers.Apps().V1beta2().ReplicaSets()
podInformer := sharedInformers.Core().V1().Pods()
replicationControllerInformer := sharedInformers.Core().V1().ReplicationControllers()
serviceInformer := sharedInformers.Core().V1().Services()
server, listener, err := NewServer(
"localhost:0", 0, replicaSets, k8s.NewEmptyPodIndex(),
namespaceInformer.Lister(),
deployInformer.Lister(),
replicaSetInformer.Lister(),
podInformer.Lister(),
replicationControllerInformer.Lister(),
serviceInformer.Lister(),
)
server, listener, err := NewServer("localhost:0", 0, replicaSets, k8s.NewEmptyPodIndex(), lister)
if err != nil {
t.Fatalf("NewServer error: %s", err)
}
@ -204,18 +187,9 @@ status:
go func() { server.Serve(listener) }()
defer server.GracefulStop()
stopCh := make(chan struct{})
sharedInformers.Start(stopCh)
if !cache.WaitForCacheSync(
stopCh,
namespaceInformer.Informer().HasSynced,
deployInformer.Informer().HasSynced,
replicaSetInformer.Informer().HasSynced,
podInformer.Informer().HasSynced,
replicationControllerInformer.Informer().HasSynced,
serviceInformer.Informer().HasSynced,
) {
t.Fatalf("timed out wait for caches to sync")
err = lister.Sync()
if err != nil {
t.Fatalf("timed out wait for caches to sync: %s", err)
}
client, conn, err := NewClient(listener.Addr().String())

View File

@ -9,13 +9,23 @@ import (
)
const (
KubernetesDeployments = "deployments"
KubernetesNamespaces = "namespaces"
KubernetesPods = "pods"
KubernetesReplicationControllers = "replicationcontrollers"
KubernetesServices = "services"
Deployments = "deployments"
Namespaces = "namespaces"
Pods = "pods"
ReplicationControllers = "replicationcontrollers"
Services = "services"
)
// ResourceTypesToProxyLabels maps Kubernetes resource type names to keys
// understood by the proxy, specifically Destination and Prometheus labels.
var ResourceTypesToProxyLabels = map[string]string{
Deployments: "deployment",
Namespaces: "namespace",
Pods: "pod",
ReplicationControllers: "replication_controller",
Services: "service",
}
func generateKubernetesApiBaseUrlFor(schemeHostAndPort string, namespace string, extraPathStartingWithSlash string) (*url.URL, error) {
if string(extraPathStartingWithSlash[0]) != "/" {
return nil, fmt.Errorf("Path must start with a [/], was [%s]", extraPathStartingWithSlash)
@ -60,15 +70,15 @@ func getConfig(fpath string) (*rest.Config, error) {
func CanonicalKubernetesNameFromFriendlyName(friendlyName string) (string, error) {
switch friendlyName {
case "deploy", "deployment", "deployments":
return KubernetesDeployments, nil
return Deployments, nil
case "ns", "namespace", "namespaces":
return KubernetesNamespaces, nil
return Namespaces, nil
case "po", "pod", "pods":
return KubernetesPods, nil
return Pods, nil
case "rc", "replicationcontroller", "replicationcontrollers":
return KubernetesReplicationControllers, nil
return ReplicationControllers, nil
case "svc", "service", "services":
return KubernetesServices, nil
return Services, nil
}
return "", fmt.Errorf("cannot find Kubernetes canonical name from friendly name [%s]", friendlyName)

View File

@ -82,10 +82,10 @@ func TestGetConfig(t *testing.T) {
func TestCanonicalKubernetesNameFromFriendlyName(t *testing.T) {
t.Run("Returns canonical name for all known variants", func(t *testing.T) {
expectations := map[string]string{
"po": KubernetesPods,
"pod": KubernetesPods,
"deployment": KubernetesDeployments,
"deployments": KubernetesDeployments,
"po": Pods,
"pod": Pods,
"deployment": Deployments,
"deployments": Deployments,
}
for input, expectedName := range expectations {

View File

@ -1,5 +1,5 @@
## compile proxy-init utility
FROM gcr.io/runconduit/go-deps:a6e8221d as golang
FROM gcr.io/runconduit/go-deps:ff0dc047 as golang
WORKDIR /go/src/github.com/runconduit/conduit
COPY ./proxy-init ./proxy-init
RUN CGO_ENABLED=0 GOOS=linux go install -v -installsuffix cgo ./proxy-init/

View File

@ -12,7 +12,7 @@ RUN $HOME/.yarn/bin/yarn install --pure-lockfile
RUN $HOME/.yarn/bin/yarn webpack
## compile go server
FROM gcr.io/runconduit/go-deps:a6e8221d as golang
FROM gcr.io/runconduit/go-deps:ff0dc047 as golang
ARG CONDUIT_VERSION
WORKDIR /go/src/github.com/runconduit/conduit
COPY web web