linkerd2/controller/k8s/api.go

1209 lines
37 KiB
Go

package k8s
import (
"context"
"fmt"
"strings"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
spv1alpha2 "github.com/linkerd/linkerd2/controller/gen/apis/serviceprofile/v1alpha2"
l5dcrdclient "github.com/linkerd/linkerd2/controller/gen/client/clientset/versioned"
l5dcrdinformer "github.com/linkerd/linkerd2/controller/gen/client/informers/externalversions"
ewinformers "github.com/linkerd/linkerd2/controller/gen/client/informers/externalversions/externalworkload/v1beta1"
linkinformers "github.com/linkerd/linkerd2/controller/gen/client/informers/externalversions/link/v1alpha3"
srvinformers "github.com/linkerd/linkerd2/controller/gen/client/informers/externalversions/server/v1beta3"
spinformers "github.com/linkerd/linkerd2/controller/gen/client/informers/externalversions/serviceprofile/v1alpha2"
"github.com/linkerd/linkerd2/pkg/k8s"
"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
appsv1 "k8s.io/api/apps/v1"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/informers"
arinformers "k8s.io/client-go/informers/admissionregistration/v1"
appv1informers "k8s.io/client-go/informers/apps/v1"
batchv1informers "k8s.io/client-go/informers/batch/v1"
coreinformers "k8s.io/client-go/informers/core/v1"
discoveryinformers "k8s.io/client-go/informers/discovery/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
)
// API provides shared informers for all Kubernetes objects
type API struct {
promGauges
Client kubernetes.Interface
L5dClient l5dcrdclient.Interface
DynamicClient dynamic.Interface
cj batchv1informers.CronJobInformer
cm coreinformers.ConfigMapInformer
deploy appv1informers.DeploymentInformer
ds appv1informers.DaemonSetInformer
endpoint coreinformers.EndpointsInformer
es discoveryinformers.EndpointSliceInformer
ew ewinformers.ExternalWorkloadInformer
job batchv1informers.JobInformer
link linkinformers.LinkInformer
mwc arinformers.MutatingWebhookConfigurationInformer
ns coreinformers.NamespaceInformer
pod coreinformers.PodInformer
rc coreinformers.ReplicationControllerInformer
rs appv1informers.ReplicaSetInformer
sp spinformers.ServiceProfileInformer
ss appv1informers.StatefulSetInformer
svc coreinformers.ServiceInformer
node coreinformers.NodeInformer
secret coreinformers.SecretInformer
srv srvinformers.ServerInformer
syncChecks []cache.InformerSynced
sharedInformers informers.SharedInformerFactory
l5dCrdSharedInformers l5dcrdinformer.SharedInformerFactory
}
// InitializeAPI creates Kubernetes clients and returns an initialized API
// wrapper. This creates informers on each one of resources passed, registering
// metrics on each one; don't forget to call UnregisterGauges() on the returned
// API reference to clean them up!
func InitializeAPI(ctx context.Context, kubeConfig string, ensureClusterWideAccess bool, cluster string, resources ...APIResource) (*API, error) {
config, err := k8s.GetConfig(kubeConfig, "")
if err != nil {
return nil, fmt.Errorf("error configuring Kubernetes API client: %w", err)
}
dynamicClient, err := dynamic.NewForConfig(config)
if err != nil {
return nil, err
}
k8sClient, err := k8s.NewAPIForConfig(config, "", []string{}, 0, 0, 0)
if err != nil {
return nil, err
}
return initAPI(ctx, k8sClient, dynamicClient, config, ensureClusterWideAccess, cluster, resources...)
}
// InitializeAPIForConfig creates Kubernetes clients and returns an initialized
// API wrapper. This creates informers on each one of resources passed,
// registering metrics on each one; don't forget to call UnregisterGauges() on
// the returned API reference to clean them up!
func InitializeAPIForConfig(ctx context.Context, kubeConfig *rest.Config, ensureClusterWideAccess bool, cluster string, resources ...APIResource) (*API, error) {
k8sClient, err := k8s.NewAPIForConfig(kubeConfig, "", []string{}, 0, 0, 0)
if err != nil {
return nil, err
}
return initAPI(ctx, k8sClient, nil, kubeConfig, ensureClusterWideAccess, cluster, resources...)
}
func initAPI(ctx context.Context, k8sClient *k8s.KubernetesAPI, dynamicClient dynamic.Interface, kubeConfig *rest.Config, ensureClusterWideAccess bool, cluster string, resources ...APIResource) (*API, error) {
// check for cluster-wide access
var err error
if ensureClusterWideAccess {
err := k8s.ClusterAccess(ctx, k8sClient)
if err != nil {
return nil, err
}
}
// check for need and access to Linkerd CRD clients
var l5dCrdClient *l5dcrdclient.Clientset
for _, res := range resources {
switch {
case res == SP:
err := k8s.ServiceProfilesAccess(ctx, k8sClient)
if err != nil {
return nil, err
}
case res == Srv:
err := k8s.ServersAccess(ctx, k8sClient)
if err != nil {
return nil, err
}
case res == ExtWorkload:
err := k8s.ExtWorkloadAccess(ctx, k8sClient)
if err != nil {
return nil, err
}
default:
continue
}
l5dCrdClient, err = NewL5DCRDClient(kubeConfig)
if err != nil {
return nil, err
}
break
}
api := NewClusterScopedAPI(k8sClient, dynamicClient, l5dCrdClient, cluster, resources...)
for _, gauge := range api.gauges {
if err := prometheus.Register(gauge); err != nil {
log.Warnf("failed to register Prometheus gauge %s: %s", gauge.Desc().String(), err)
}
}
return api, nil
}
// NewClusterScopedAPI takes a Kubernetes client and returns an initialized
// cluster-wide API. This creates informers on each one of resources passed,
// registering metrics on each one; don't forget to call UnregisterGauges() on
// the returned API reference to clean them up!
func NewClusterScopedAPI(
k8sClient kubernetes.Interface,
dynamicClient dynamic.Interface,
l5dCrdClient l5dcrdclient.Interface,
cluster string,
resources ...APIResource,
) *API {
sharedInformers := informers.NewSharedInformerFactory(k8sClient, ResyncTime)
return newAPI(k8sClient, dynamicClient, l5dCrdClient, sharedInformers, cluster, resources...)
}
// NewNamespacedAPI takes a Kubernetes client and returns an initialized API
// scoped to namespace. This creates informers on each one of resources passed,
// registering metrics on each one; don't forget to call UnregisterGauges() on
// the returned API reference to clean them up!
func NewNamespacedAPI(
k8sClient kubernetes.Interface,
dynamicClient dynamic.Interface,
l5dCrdClient l5dcrdclient.Interface,
namespace string,
cluster string,
resources ...APIResource,
) *API {
sharedInformers := informers.NewSharedInformerFactoryWithOptions(k8sClient, ResyncTime, informers.WithNamespace(namespace))
return newAPI(k8sClient, dynamicClient, l5dCrdClient, sharedInformers, cluster, resources...)
}
func NewL5dNamespacedAPI(
l5dCrdClient l5dcrdclient.Interface,
namespace string,
cluster string,
resources ...APIResource,
) *API {
l5dCrdSharedInformers := l5dcrdinformer.NewSharedInformerFactoryWithOptions(l5dCrdClient, ResyncTime, l5dcrdinformer.WithNamespace(namespace))
api := &API{
L5dClient: l5dCrdClient,
syncChecks: make([]cache.InformerSynced, 0),
l5dCrdSharedInformers: l5dCrdSharedInformers,
}
informerLabels := prometheus.Labels{
"cluster": cluster,
}
for _, resource := range resources {
switch resource {
case ExtWorkload:
api.ew = l5dCrdSharedInformers.Externalworkload().V1beta1().ExternalWorkloads()
api.syncChecks = append(api.syncChecks, api.ew.Informer().HasSynced)
api.promGauges.addInformerSize(k8s.ExtWorkload, informerLabels, api.ew.Informer())
case Link:
api.link = l5dCrdSharedInformers.Link().V1alpha3().Links()
api.syncChecks = append(api.syncChecks, api.link.Informer().HasSynced)
api.promGauges.addInformerSize(k8s.Link, informerLabels, api.link.Informer())
case SP:
api.sp = l5dCrdSharedInformers.Linkerd().V1alpha2().ServiceProfiles()
api.syncChecks = append(api.syncChecks, api.sp.Informer().HasSynced)
api.promGauges.addInformerSize(k8s.ServiceProfile, informerLabels, api.sp.Informer())
case Srv:
api.srv = l5dCrdSharedInformers.Server().V1beta3().Servers()
api.syncChecks = append(api.syncChecks, api.srv.Informer().HasSynced)
api.promGauges.addInformerSize(k8s.Server, informerLabels, api.srv.Informer())
}
}
return api
}
// newAPI takes a Kubernetes client and returns an initialized API.
func newAPI(
k8sClient kubernetes.Interface,
dynamicClient dynamic.Interface,
l5dCrdClient l5dcrdclient.Interface,
sharedInformers informers.SharedInformerFactory,
cluster string,
resources ...APIResource,
) *API {
var l5dCrdSharedInformers l5dcrdinformer.SharedInformerFactory
if l5dCrdClient != nil {
l5dCrdSharedInformers = l5dcrdinformer.NewSharedInformerFactory(l5dCrdClient, ResyncTime)
}
api := &API{
Client: k8sClient,
L5dClient: l5dCrdClient,
DynamicClient: dynamicClient,
syncChecks: make([]cache.InformerSynced, 0),
sharedInformers: sharedInformers,
l5dCrdSharedInformers: l5dCrdSharedInformers,
}
informerLabels := prometheus.Labels{
"cluster": cluster,
}
for _, resource := range resources {
switch resource {
case CJ:
api.cj = sharedInformers.Batch().V1().CronJobs()
api.syncChecks = append(api.syncChecks, api.cj.Informer().HasSynced)
api.promGauges.addInformerSize(k8s.CronJob, informerLabels, api.cj.Informer())
case CM:
api.cm = sharedInformers.Core().V1().ConfigMaps()
api.syncChecks = append(api.syncChecks, api.cm.Informer().HasSynced)
api.promGauges.addInformerSize(k8s.ConfigMap, informerLabels, api.cm.Informer())
case Deploy:
api.deploy = sharedInformers.Apps().V1().Deployments()
api.syncChecks = append(api.syncChecks, api.deploy.Informer().HasSynced)
api.promGauges.addInformerSize(k8s.Deployment, informerLabels, api.deploy.Informer())
case DS:
api.ds = sharedInformers.Apps().V1().DaemonSets()
api.syncChecks = append(api.syncChecks, api.ds.Informer().HasSynced)
api.promGauges.addInformerSize(k8s.DaemonSet, informerLabels, api.ds.Informer())
case Endpoint:
api.endpoint = sharedInformers.Core().V1().Endpoints()
api.syncChecks = append(api.syncChecks, api.endpoint.Informer().HasSynced)
api.promGauges.addInformerSize(k8s.Endpoints, informerLabels, api.endpoint.Informer())
case ES:
api.es = sharedInformers.Discovery().V1().EndpointSlices()
api.syncChecks = append(api.syncChecks, api.es.Informer().HasSynced)
api.promGauges.addInformerSize(k8s.EndpointSlices, informerLabels, api.es.Informer())
case ExtWorkload:
if l5dCrdSharedInformers == nil {
panic("Linkerd CRD shared informer not configured")
}
api.ew = l5dCrdSharedInformers.Externalworkload().V1beta1().ExternalWorkloads()
api.syncChecks = append(api.syncChecks, api.ew.Informer().HasSynced)
api.promGauges.addInformerSize(k8s.ExtWorkload, informerLabels, api.ew.Informer())
case Job:
api.job = sharedInformers.Batch().V1().Jobs()
api.syncChecks = append(api.syncChecks, api.job.Informer().HasSynced)
api.promGauges.addInformerSize(k8s.Job, informerLabels, api.job.Informer())
case Link:
if l5dCrdSharedInformers == nil {
panic("Linkerd CRD shared informer not configured")
}
api.link = l5dCrdSharedInformers.Link().V1alpha3().Links()
api.syncChecks = append(api.syncChecks, api.link.Informer().HasSynced)
api.promGauges.addInformerSize(k8s.Link, informerLabels, api.link.Informer())
case MWC:
api.mwc = sharedInformers.Admissionregistration().V1().MutatingWebhookConfigurations()
api.syncChecks = append(api.syncChecks, api.mwc.Informer().HasSynced)
api.promGauges.addInformerSize(k8s.MutatingWebhookConfig, informerLabels, api.mwc.Informer())
case NS:
api.ns = sharedInformers.Core().V1().Namespaces()
api.syncChecks = append(api.syncChecks, api.ns.Informer().HasSynced)
api.promGauges.addInformerSize(k8s.Namespace, informerLabels, api.ns.Informer())
case Pod:
api.pod = sharedInformers.Core().V1().Pods()
api.syncChecks = append(api.syncChecks, api.pod.Informer().HasSynced)
api.promGauges.addInformerSize(k8s.Pod, informerLabels, api.pod.Informer())
case RC:
api.rc = sharedInformers.Core().V1().ReplicationControllers()
api.syncChecks = append(api.syncChecks, api.rc.Informer().HasSynced)
api.promGauges.addInformerSize(k8s.ReplicationController, informerLabels, api.rc.Informer())
case RS:
api.rs = sharedInformers.Apps().V1().ReplicaSets()
api.syncChecks = append(api.syncChecks, api.rs.Informer().HasSynced)
api.promGauges.addInformerSize(k8s.ReplicaSet, informerLabels, api.rs.Informer())
case SP:
if l5dCrdSharedInformers == nil {
panic("Linkerd CRD shared informer not configured")
}
api.sp = l5dCrdSharedInformers.Linkerd().V1alpha2().ServiceProfiles()
api.syncChecks = append(api.syncChecks, api.sp.Informer().HasSynced)
api.promGauges.addInformerSize(k8s.ServiceProfile, informerLabels, api.sp.Informer())
case Srv:
if l5dCrdSharedInformers == nil {
panic("Linkerd CRD shared informer not configured")
}
api.srv = l5dCrdSharedInformers.Server().V1beta3().Servers()
api.syncChecks = append(api.syncChecks, api.srv.Informer().HasSynced)
api.promGauges.addInformerSize(k8s.Server, informerLabels, api.srv.Informer())
case SS:
api.ss = sharedInformers.Apps().V1().StatefulSets()
api.syncChecks = append(api.syncChecks, api.ss.Informer().HasSynced)
api.promGauges.addInformerSize(k8s.StatefulSet, informerLabels, api.ss.Informer())
case Svc:
api.svc = sharedInformers.Core().V1().Services()
api.syncChecks = append(api.syncChecks, api.svc.Informer().HasSynced)
api.promGauges.addInformerSize(k8s.Service, informerLabels, api.svc.Informer())
case Node:
api.node = sharedInformers.Core().V1().Nodes()
api.syncChecks = append(api.syncChecks, api.node.Informer().HasSynced)
api.promGauges.addInformerSize(k8s.Node, informerLabels, api.node.Informer())
case Secret:
api.secret = sharedInformers.Core().V1().Secrets()
api.syncChecks = append(api.syncChecks, api.secret.Informer().HasSynced)
api.promGauges.addInformerSize(k8s.Secret, informerLabels, api.secret.Informer())
}
}
return api
}
// Sync waits for all informers to be synced.
func (api *API) Sync(stopCh <-chan struct{}) {
if api.sharedInformers != nil {
api.sharedInformers.Start(stopCh)
}
if api.l5dCrdSharedInformers != nil {
api.l5dCrdSharedInformers.Start(stopCh)
}
waitForCacheSync(api.syncChecks)
}
// UnregisterGauges unregisters all the prometheus cache gauges associated to this API
func (api *API) UnregisterGauges() {
api.promGauges.unregister()
}
// NS provides access to a shared informer and lister for Namespaces.
func (api *API) NS() coreinformers.NamespaceInformer {
if api.ns == nil {
panic("NS informer not configured")
}
return api.ns
}
// Deploy provides access to a shared informer and lister for Deployments.
func (api *API) Deploy() appv1informers.DeploymentInformer {
if api.deploy == nil {
panic("Deploy informer not configured")
}
return api.deploy
}
// DS provides access to a shared informer and lister for Daemonsets.
func (api *API) DS() appv1informers.DaemonSetInformer {
if api.ds == nil {
panic("DS informer not configured")
}
return api.ds
}
// SS provides access to a shared informer and lister for Statefulsets.
func (api *API) SS() appv1informers.StatefulSetInformer {
if api.ss == nil {
panic("SS informer not configured")
}
return api.ss
}
// RS provides access to a shared informer and lister for ReplicaSets.
func (api *API) RS() appv1informers.ReplicaSetInformer {
if api.rs == nil {
panic("RS informer not configured")
}
return api.rs
}
// Pod provides access to a shared informer and lister for Pods.
func (api *API) Pod() coreinformers.PodInformer {
if api.pod == nil {
panic("Pod informer not configured")
}
return api.pod
}
// RC provides access to a shared informer and lister for
// ReplicationControllers.
func (api *API) RC() coreinformers.ReplicationControllerInformer {
if api.rc == nil {
panic("RC informer not configured")
}
return api.rc
}
// Svc provides access to a shared informer and lister for Services.
func (api *API) Svc() coreinformers.ServiceInformer {
if api.svc == nil {
panic("Svc informer not configured")
}
return api.svc
}
// Endpoint provides access to a shared informer and lister for Endpoints.
func (api *API) Endpoint() coreinformers.EndpointsInformer {
if api.endpoint == nil {
panic("Endpoint informer not configured")
}
return api.endpoint
}
// ES provides access to a shared informer and lister for EndpointSlices
func (api *API) ES() discoveryinformers.EndpointSliceInformer {
if api.es == nil {
panic("EndpointSlices informer not configured")
}
return api.es
}
// ExtWorkload() provides access to a shared informer and lister for
// ExternalWorkload CRDs
func (api *API) ExtWorkload() ewinformers.ExternalWorkloadInformer {
if api.ew == nil {
panic("ExternalWorkload informer not configured")
}
return api.ew
}
// CM provides access to a shared informer and lister for ConfigMaps.
func (api *API) CM() coreinformers.ConfigMapInformer {
if api.cm == nil {
panic("CM informer not configured")
}
return api.cm
}
// SP provides access to a shared informer and lister for ServiceProfiles.
func (api *API) SP() spinformers.ServiceProfileInformer {
if api.sp == nil {
panic("SP informer not configured")
}
return api.sp
}
// Srv provides access to a shared informer and lister for Servers.
func (api *API) Srv() srvinformers.ServerInformer {
if api.srv == nil {
panic("Srv informer not configured")
}
return api.srv
}
// MWC provides access to a shared informer and lister for MutatingWebhookConfigurations.
func (api *API) MWC() arinformers.MutatingWebhookConfigurationInformer {
if api.mwc == nil {
panic("MWC informer not configured")
}
return api.mwc
}
// Job provides access to a shared informer and lister for Jobs.
func (api *API) Job() batchv1informers.JobInformer {
if api.job == nil {
panic("Job informer not configured")
}
return api.job
}
func (api *API) Link() linkinformers.LinkInformer {
if api.link == nil {
panic("Link informer not configured")
}
return api.link
}
// SPAvailable informs the caller whether this API is configured to retrieve
// ServiceProfiles
func (api *API) SPAvailable() bool {
return api.sp != nil
}
// Node provides access to a shared informer and lister for Nodes.
func (api *API) Node() coreinformers.NodeInformer {
if api.node == nil {
panic("Node informer not configured")
}
return api.node
}
// Secret provides access to a shared informer and lister for Secrets.
func (api *API) Secret() coreinformers.SecretInformer {
if api.secret == nil {
panic("Secret informer not configured")
}
return api.secret
}
// CJ provides access to a shared informer and lister for CronJobs.
func (api *API) CJ() batchv1informers.CronJobInformer {
if api.cj == nil {
panic("CJ informer not configured")
}
return api.cj
}
// GetObjects returns a list of Kubernetes objects, given a namespace, type, name and label selector.
// If namespace is an empty string, match objects in all namespaces.
// If name is an empty string, match all objects of the given type.
// If label selector is an empty string, match all labels.
func (api *API) GetObjects(namespace, restype, name string, label labels.Selector) ([]runtime.Object, error) {
switch restype {
case k8s.Namespace:
return api.getNamespaces(name, label)
case k8s.CronJob:
return api.getCronjobs(namespace, name, label)
case k8s.DaemonSet:
return api.getDaemonsets(namespace, name, label)
case k8s.Deployment:
return api.getDeployments(namespace, name, label)
case k8s.Job:
return api.getJobs(namespace, name, label)
case k8s.Pod:
return api.getPods(namespace, name, label)
case k8s.ReplicationController:
return api.getRCs(namespace, name, label)
case k8s.ReplicaSet:
return api.getReplicasets(namespace, name, label)
case k8s.Service:
return api.getServices(namespace, name)
case k8s.StatefulSet:
return api.getStatefulsets(namespace, name, label)
default:
return nil, status.Errorf(codes.Unimplemented, "unimplemented resource type: %s", restype)
}
}
// KindSupported returns true if there is an informer configured for the
// specified resource type.
func (api *API) KindSupported(restype string) bool {
switch restype {
case k8s.Namespace:
return api.ns != nil
case k8s.CronJob:
return api.cj != nil
case k8s.DaemonSet:
return api.ds != nil
case k8s.Deployment:
return api.deploy != nil
case k8s.Job:
return api.job != nil
case k8s.Pod:
return api.pod != nil
case k8s.ReplicationController:
return api.rc != nil
case k8s.ReplicaSet:
return api.rs != nil
case k8s.Service:
return api.svc != nil
case k8s.StatefulSet:
return api.ss != nil
default:
return false
}
}
// GetOwnerKindAndName returns the pod owner's kind and name, using owner
// references from the Kubernetes API. The kind is represented as the Kubernetes
// singular resource type (e.g. deployment, daemonset, job, etc.).
// If retry is true, when the shared informer cache doesn't return anything
// we try again with a direct Kubernetes API call.
func (api *API) GetOwnerKindAndName(ctx context.Context, pod *corev1.Pod, retry bool) (string, string) {
ownerRefs := pod.GetOwnerReferences()
if len(ownerRefs) == 0 {
// pod without a parent
return k8s.Pod, pod.Name
} else if len(ownerRefs) > 1 {
log.Debugf("unexpected owner reference count (%d): %+v", len(ownerRefs), ownerRefs)
return k8s.Pod, pod.Name
}
parent := ownerRefs[0]
var parentObj metav1.Object
var err error
switch parent.Kind {
case "Job":
parentObj, err = api.Job().Lister().Jobs(pod.Namespace).Get(parent.Name)
if err != nil {
log.Warnf("failed to retrieve job from indexer %s/%s: %s", pod.Namespace, parent.Name, err)
if retry {
parentObj, err = api.Client.BatchV1().Jobs(pod.Namespace).Get(ctx, parent.Name, metav1.GetOptions{})
if err != nil {
log.Warnf("failed to retrieve job from direct API call %s/%s: %s", pod.Namespace, parent.Name, err)
}
}
}
case "ReplicaSet":
rsObj, err := api.RS().Lister().ReplicaSets(pod.Namespace).Get(parent.Name)
if err != nil {
log.Warnf("failed to retrieve replicaset from indexer %s/%s: %s", pod.Namespace, parent.Name, err)
if retry {
rsObj, err = api.Client.AppsV1().ReplicaSets(pod.Namespace).Get(ctx, parent.Name, metav1.GetOptions{})
if err != nil {
log.Warnf("failed to retrieve replicaset from direct API call %s/%s: %s", pod.Namespace, parent.Name, err)
}
}
}
if rsObj == nil || !isValidRSParent(rsObj.GetObjectMeta()) {
return strings.ToLower(parent.Kind), parent.Name
}
parentObj = rsObj
default:
return strings.ToLower(parent.Kind), parent.Name
}
if err == nil && len(parentObj.GetOwnerReferences()) == 1 {
grandParent := parentObj.GetOwnerReferences()[0]
return strings.ToLower(grandParent.Kind), grandParent.Name
}
return strings.ToLower(parent.Kind), parent.Name
}
// GetPodsFor returns all running and pending Pods associated with a given
// Kubernetes object. Use includeFailed to also get failed Pods
func (api *API) GetPodsFor(obj runtime.Object, includeFailed bool) ([]*corev1.Pod, error) {
var namespace string
var selector labels.Selector
var ownerUID types.UID
var err error
pods := []*corev1.Pod{}
switch typed := obj.(type) {
case *corev1.Namespace:
namespace = typed.Name
selector = labels.Everything()
case *batchv1.CronJob:
namespace = typed.Namespace
selector = labels.Everything()
jobs, err := api.Job().Lister().Jobs(namespace).List(selector)
if err != nil {
return nil, err
}
for _, job := range jobs {
if isOwner(typed.UID, job.GetOwnerReferences()) {
jobPods, err := api.GetPodsFor(job, includeFailed)
if err != nil {
return nil, err
}
pods = append(pods, jobPods...)
}
}
return pods, nil
case *appsv1.DaemonSet:
namespace = typed.Namespace
selector = labels.Set(typed.Spec.Selector.MatchLabels).AsSelector()
ownerUID = typed.UID
case *appsv1.Deployment:
namespace = typed.Namespace
selector = labels.Set(typed.Spec.Selector.MatchLabels).AsSelector()
ret, err := api.RS().Lister().ReplicaSets(namespace).List(selector)
if err != nil {
return nil, err
}
for _, rs := range ret {
if isOwner(typed.UID, rs.GetOwnerReferences()) {
podsRS, err := api.GetPodsFor(rs, includeFailed)
if err != nil {
return nil, err
}
pods = append(pods, podsRS...)
}
}
return pods, nil
case *appsv1.ReplicaSet:
namespace = typed.Namespace
selector = labels.Set(typed.Spec.Selector.MatchLabels).AsSelector()
ownerUID = typed.UID
case *batchv1.Job:
namespace = typed.Namespace
selector = labels.Set(typed.Spec.Selector.MatchLabels).AsSelector()
ownerUID = typed.UID
case *corev1.ReplicationController:
namespace = typed.Namespace
selector = labels.Set(typed.Spec.Selector).AsSelector()
ownerUID = typed.UID
case *corev1.Service:
if typed.Spec.Type == corev1.ServiceTypeExternalName {
return []*corev1.Pod{}, nil
}
namespace = typed.Namespace
if typed.Spec.Selector == nil {
selector = labels.Nothing()
} else {
selector = labels.Set(typed.Spec.Selector).AsSelector()
}
case *appsv1.StatefulSet:
namespace = typed.Namespace
selector = labels.Set(typed.Spec.Selector.MatchLabels).AsSelector()
ownerUID = typed.UID
case *corev1.Pod:
// Special case for pods:
// GetPodsFor a pod should just return the pod itself
namespace = typed.Namespace
pod, err := api.Pod().Lister().Pods(typed.Namespace).Get(typed.Name)
if err != nil {
return nil, err
}
pods = []*corev1.Pod{pod}
default:
return nil, fmt.Errorf("Cannot get object selector: %v", obj)
}
// if obj.(type) is Pod, we've already retrieved it and put it in pods
// for the other types, pods will still be empty
if len(pods) == 0 {
pods, err = api.Pod().Lister().Pods(namespace).List(selector)
if err != nil {
return nil, err
}
}
allPods := []*corev1.Pod{}
for _, pod := range pods {
if isPendingOrRunning(pod) || (includeFailed && isFailed(pod)) {
if ownerUID == "" || isOwner(ownerUID, pod.GetOwnerReferences()) {
allPods = append(allPods, pod)
}
}
}
return allPods, nil
}
func isOwner(u types.UID, ownerRefs []metav1.OwnerReference) bool {
for _, or := range ownerRefs {
if u == or.UID {
return true
}
}
return false
}
// GetNameAndNamespaceOf returns the name and namespace of the given object.
func GetNameAndNamespaceOf(obj runtime.Object) (string, string, error) {
switch typed := obj.(type) {
case *corev1.Namespace:
return typed.Name, typed.Name, nil
case *batchv1.CronJob:
return typed.Name, typed.Namespace, nil
case *appsv1.DaemonSet:
return typed.Name, typed.Namespace, nil
case *appsv1.Deployment:
return typed.Name, typed.Namespace, nil
case *batchv1.Job:
return typed.Name, typed.Namespace, nil
case *appsv1.ReplicaSet:
return typed.Name, typed.Namespace, nil
case *corev1.ReplicationController:
return typed.Name, typed.Namespace, nil
case *corev1.Service:
return typed.Name, typed.Namespace, nil
case *appsv1.StatefulSet:
return typed.Name, typed.Namespace, nil
case *corev1.Pod:
return typed.Name, typed.Namespace, nil
default:
return "", "", fmt.Errorf("Cannot determine object type: %v", obj)
}
}
// GetNameOf returns the name of the given object.
func GetNameOf(obj runtime.Object) (string, error) {
name, _, err := GetNameAndNamespaceOf(obj)
if err != nil {
return "", err
}
return name, nil
}
// GetNamespaceOf returns the namespace of the given object.
func GetNamespaceOf(obj runtime.Object) (string, error) {
_, namespace, err := GetNameAndNamespaceOf(obj)
if err != nil {
return "", err
}
return namespace, nil
}
// getNamespaces returns the namespace matching the specified name. If no name
// is given, it returns all namespaces.
func (api *API) getNamespaces(name string, labelSelector labels.Selector) ([]runtime.Object, error) {
var namespaces []*corev1.Namespace
if name == "" {
var err error
namespaces, err = api.NS().Lister().List(labelSelector)
if err != nil {
return nil, err
}
} else {
namespace, err := api.NS().Lister().Get(name)
if err != nil {
return nil, err
}
namespaces = []*corev1.Namespace{namespace}
}
objects := []runtime.Object{}
for _, ns := range namespaces {
objects = append(objects, ns)
}
return objects, nil
}
func (api *API) getDeployments(namespace, name string, labelSelector labels.Selector) ([]runtime.Object, error) {
var err error
var deploys []*appsv1.Deployment
if namespace == "" {
deploys, err = api.Deploy().Lister().List(labelSelector)
} else if name == "" {
deploys, err = api.Deploy().Lister().Deployments(namespace).List(labelSelector)
} else {
var deploy *appsv1.Deployment
deploy, err = api.Deploy().Lister().Deployments(namespace).Get(name)
deploys = []*appsv1.Deployment{deploy}
}
if err != nil {
return nil, err
}
objects := []runtime.Object{}
for _, deploy := range deploys {
objects = append(objects, deploy)
}
return objects, nil
}
func (api *API) getPods(namespace, name string, labelSelector labels.Selector) ([]runtime.Object, error) {
var err error
var pods []*corev1.Pod
if namespace == "" {
pods, err = api.Pod().Lister().List(labelSelector)
} else if name == "" {
pods, err = api.Pod().Lister().Pods(namespace).List(labelSelector)
} else {
var pod *corev1.Pod
pod, err = api.Pod().Lister().Pods(namespace).Get(name)
pods = []*corev1.Pod{pod}
}
if err != nil {
return nil, err
}
objects := []runtime.Object{}
for _, pod := range pods {
if !isPendingOrRunning(pod) {
continue
}
objects = append(objects, pod)
}
return objects, nil
}
func (api *API) getRCs(namespace, name string, labelSelector labels.Selector) ([]runtime.Object, error) {
var err error
var rcs []*corev1.ReplicationController
if namespace == "" {
rcs, err = api.RC().Lister().List(labelSelector)
} else if name == "" {
rcs, err = api.RC().Lister().ReplicationControllers(namespace).List(labelSelector)
} else {
var rc *corev1.ReplicationController
rc, err = api.RC().Lister().ReplicationControllers(namespace).Get(name)
rcs = []*corev1.ReplicationController{rc}
}
if err != nil {
return nil, err
}
objects := []runtime.Object{}
for _, rc := range rcs {
objects = append(objects, rc)
}
return objects, nil
}
func (api *API) getDaemonsets(namespace, name string, labelSelector labels.Selector) ([]runtime.Object, error) {
var err error
var daemonsets []*appsv1.DaemonSet
if namespace == "" {
daemonsets, err = api.DS().Lister().List(labelSelector)
} else if name == "" {
daemonsets, err = api.DS().Lister().DaemonSets(namespace).List(labelSelector)
} else {
var ds *appsv1.DaemonSet
ds, err = api.DS().Lister().DaemonSets(namespace).Get(name)
daemonsets = []*appsv1.DaemonSet{ds}
}
if err != nil {
return nil, err
}
objects := []runtime.Object{}
for _, ds := range daemonsets {
objects = append(objects, ds)
}
return objects, nil
}
func (api *API) getStatefulsets(namespace, name string, labelSelector labels.Selector) ([]runtime.Object, error) {
var err error
var statefulsets []*appsv1.StatefulSet
if namespace == "" {
statefulsets, err = api.SS().Lister().List(labelSelector)
} else if name == "" {
statefulsets, err = api.SS().Lister().StatefulSets(namespace).List(labelSelector)
} else {
var ss *appsv1.StatefulSet
ss, err = api.SS().Lister().StatefulSets(namespace).Get(name)
statefulsets = []*appsv1.StatefulSet{ss}
}
if err != nil {
return nil, err
}
objects := []runtime.Object{}
for _, ss := range statefulsets {
objects = append(objects, ss)
}
return objects, nil
}
func (api *API) getJobs(namespace, name string, labelSelector labels.Selector) ([]runtime.Object, error) {
var err error
var jobs []*batchv1.Job
if namespace == "" {
jobs, err = api.Job().Lister().List(labelSelector)
} else if name == "" {
jobs, err = api.Job().Lister().Jobs(namespace).List(labelSelector)
} else {
var job *batchv1.Job
job, err = api.Job().Lister().Jobs(namespace).Get(name)
jobs = []*batchv1.Job{job}
}
if err != nil {
return nil, err
}
objects := []runtime.Object{}
for _, job := range jobs {
objects = append(objects, job)
}
return objects, nil
}
func (api *API) getServices(namespace, name string) ([]runtime.Object, error) {
services, err := api.GetServices(namespace, name)
if err != nil {
return nil, err
}
objects := []runtime.Object{}
for _, svc := range services {
objects = append(objects, svc)
}
return objects, nil
}
func (api *API) getCronjobs(namespace, name string, labelSelector labels.Selector) ([]runtime.Object, error) {
var err error
var cronjobs []*batchv1.CronJob
if namespace == "" {
cronjobs, err = api.CJ().Lister().List(labelSelector)
} else if name == "" {
cronjobs, err = api.CJ().Lister().CronJobs(namespace).List(labelSelector)
} else {
var cronjob *batchv1.CronJob
cronjob, err = api.CJ().Lister().CronJobs(namespace).Get(name)
cronjobs = []*batchv1.CronJob{cronjob}
}
if err != nil {
return nil, err
}
objects := []runtime.Object{}
for _, cronjob := range cronjobs {
objects = append(objects, cronjob)
}
return objects, nil
}
func (api *API) getReplicasets(namespace, name string, labelSelector labels.Selector) ([]runtime.Object, error) {
var err error
var replicasets []*appsv1.ReplicaSet
if namespace == "" {
replicasets, err = api.RS().Lister().List(labelSelector)
} else if name == "" {
replicasets, err = api.RS().Lister().ReplicaSets(namespace).List(labelSelector)
} else {
var replicaset *appsv1.ReplicaSet
replicaset, err = api.RS().Lister().ReplicaSets(namespace).Get(name)
replicasets = []*appsv1.ReplicaSet{replicaset}
}
if err != nil {
return nil, err
}
objects := []runtime.Object{}
for _, replicaset := range replicasets {
objects = append(objects, replicaset)
}
return objects, nil
}
// GetServices returns a list of Service resources, based on input namespace and
// name.
func (api *API) GetServices(namespace, name string) ([]*corev1.Service, error) {
var err error
var services []*corev1.Service
if namespace == "" {
services, err = api.Svc().Lister().List(labels.Everything())
} else if name == "" {
services, err = api.Svc().Lister().Services(namespace).List(labels.Everything())
} else {
var svc *corev1.Service
svc, err = api.Svc().Lister().Services(namespace).Get(name)
services = []*corev1.Service{svc}
}
return services, err
}
// GetServicesFor returns all Service resources which include a pod of the given
// resource object. In other words, it returns all Services of which the given
// resource object is a part of.
func (api *API) GetServicesFor(obj runtime.Object, includeFailed bool) ([]*corev1.Service, error) {
if svc, ok := obj.(*corev1.Service); ok {
return []*corev1.Service{svc}, nil
}
pods, err := api.GetPodsFor(obj, includeFailed)
if err != nil {
return nil, err
}
namespace, err := GetNamespaceOf(obj)
if err != nil {
return nil, err
}
allServices, err := api.GetServices(namespace, "")
if err != nil {
return nil, err
}
services := make([]*corev1.Service, 0)
for _, svc := range allServices {
svcPods, err := api.GetPodsFor(svc, includeFailed)
if err != nil {
return nil, err
}
if hasOverlap(pods, svcPods) {
services = append(services, svc)
}
}
return services, nil
}
// GetServiceProfileFor returns the service profile for a given service. We
// first look for a matching service profile in the client's namespace. If not
// found, we then look in the service's namespace. If no service profile is
// found, we return the default service profile.
func (api *API) GetServiceProfileFor(svc *corev1.Service, clientNs, clusterDomain string) *spv1alpha2.ServiceProfile {
dst := fmt.Sprintf("%s.%s.svc.%s", svc.Name, svc.Namespace, clusterDomain)
// First attempt to lookup profile in client namespace
if clientNs != "" {
p, err := api.SP().Lister().ServiceProfiles(clientNs).Get(dst)
if err == nil {
return p
}
if !apierrors.IsNotFound(err) {
log.Errorf("error getting service profile for %s in %s namespace: %s", dst, clientNs, err)
}
}
// Second, attempt to lookup profile in server namespace
if svc.Namespace != clientNs {
p, err := api.SP().Lister().ServiceProfiles(svc.Namespace).Get(dst)
if err == nil {
return p
}
if !apierrors.IsNotFound(err) {
log.Errorf("error getting service profile for %s in %s namespace: %s", dst, svc.Namespace, err)
}
}
// Not found; return default.
log.Debugf("no Service Profile found for '%s' -- using default", dst)
return &spv1alpha2.ServiceProfile{
ObjectMeta: metav1.ObjectMeta{
Name: dst,
},
Spec: spv1alpha2.ServiceProfileSpec{
Routes: []*spv1alpha2.RouteSpec{},
},
}
}
func hasOverlap(as, bs []*corev1.Pod) bool {
for _, a := range as {
for _, b := range bs {
if a.Name == b.Name {
return true
}
}
}
return false
}
func isPendingOrRunning(pod *corev1.Pod) bool {
pending := pod.Status.Phase == corev1.PodPending
running := pod.Status.Phase == corev1.PodRunning
terminating := pod.DeletionTimestamp != nil
return (pending || running) && !terminating
}
func isFailed(pod *corev1.Pod) bool {
return pod.Status.Phase == corev1.PodFailed
}