From 1fe19bf3ced627e05f03bec849434aac6d653732 Mon Sep 17 00:00:00 2001 From: Alex Leong Date: Fri, 12 Oct 2018 09:35:11 -0700 Subject: [PATCH] Add ServiceProfile support to k8s utilities (#1758) Updates to the Kubernetes utility code in `/controller/k8s` to support interacting with ServiceProfiles. This makes use of the code generated client added in #1752 Signed-off-by: Alex Leong --- controller/api/public/grpc_server_test.go | 2 +- controller/api/public/stat_summary_test.go | 6 +-- controller/ca/controller_test.go | 2 +- controller/cmd/ca/main.go | 4 +- controller/cmd/destination/main.go | 7 +++ controller/cmd/public-api/main.go | 1 + controller/cmd/tap/main.go | 5 +- .../destination/endpoints_watcher_test.go | 2 +- controller/destination/server_test.go | 4 +- controller/k8s/api.go | 52 ++++++++++++++----- controller/k8s/api_test.go | 32 ++++++------ controller/k8s/clientset.go | 22 +++++++- controller/k8s/test_helper.go | 20 +++++-- controller/tap/server_test.go | 2 +- pkg/k8s/k8s.go | 1 + 15 files changed, 116 insertions(+), 46 deletions(-) diff --git a/controller/api/public/grpc_server_test.go b/controller/api/public/grpc_server_test.go index 013bed42e..1d87bd309 100644 --- a/controller/api/public/grpc_server_test.go +++ b/controller/api/public/grpc_server_test.go @@ -148,7 +148,7 @@ spec: } for _, exp := range expectations { - k8sAPI, err := k8s.NewFakeAPI(exp.k8sRes...) + k8sAPI, err := k8s.NewFakeAPI("", exp.k8sRes...) if err != nil { t.Fatalf("NewFakeAPI returned an error: %s", err) } diff --git a/controller/api/public/stat_summary_test.go b/controller/api/public/stat_summary_test.go index 7563171b9..904520a6e 100644 --- a/controller/api/public/stat_summary_test.go +++ b/controller/api/public/stat_summary_test.go @@ -69,7 +69,7 @@ func genEmptyResponse() pb.StatSummaryResponse { func testStatSummary(t *testing.T, expectations []statSumExpected) { for _, exp := range expectations { - k8sAPI, err := k8s.NewFakeAPI(exp.k8sConfigs...) + k8sAPI, err := k8s.NewFakeAPI("", exp.k8sConfigs...) if err != nil { t.Fatalf("NewFakeAPI returned an error: %s", err) } @@ -728,7 +728,7 @@ status: }) t.Run("Given an invalid resource type, returns error", func(t *testing.T) { - k8sAPI, err := k8s.NewFakeAPI() + k8sAPI, err := k8s.NewFakeAPI("") if err != nil { t.Fatalf("NewFakeAPI returned an error: %s", err) } @@ -787,7 +787,7 @@ status: }) t.Run("Validates service stat requests", func(t *testing.T) { - k8sAPI, err := k8s.NewFakeAPI() + k8sAPI, err := k8s.NewFakeAPI("") if err != nil { t.Fatalf("NewFakeAPI returned an error: %s", err) } diff --git a/controller/ca/controller_test.go b/controller/ca/controller_test.go index a5db29a1a..856ca059b 100644 --- a/controller/ca/controller_test.go +++ b/controller/ca/controller_test.go @@ -62,7 +62,7 @@ func TestCertificateController(t *testing.T) { } func new(fixtures ...string) (*CertificateController, chan bool, chan struct{}, error) { - k8sAPI, err := k8s.NewFakeAPI(fixtures...) + k8sAPI, err := k8s.NewFakeAPI("", fixtures...) if err != nil { return nil, nil, nil, fmt.Errorf("NewFakeAPI returned an error: %s", err) } diff --git a/controller/cmd/ca/main.go b/controller/cmd/ca/main.go index 18e68966a..d91ad6042 100644 --- a/controller/cmd/ca/main.go +++ b/controller/cmd/ca/main.go @@ -36,9 +36,9 @@ func main() { var k8sAPI *k8s.API if *proxyAutoInject { - k8sAPI = k8s.NewAPI(k8sClient, restrictToNamespace, k8s.Pod, k8s.RS, k8s.MWC) + k8sAPI = k8s.NewAPI(k8sClient, nil, restrictToNamespace, k8s.Pod, k8s.RS, k8s.MWC) } else { - k8sAPI = k8s.NewAPI(k8sClient, restrictToNamespace, k8s.Pod, k8s.RS) + k8sAPI = k8s.NewAPI(k8sClient, nil, restrictToNamespace, k8s.Pod, k8s.RS) } controller, err := ca.NewCertificateController(*controllerNamespace, k8sAPI, *proxyAutoInject) diff --git a/controller/cmd/destination/main.go b/controller/cmd/destination/main.go index 4bb2aa05f..8ea99a5cc 100644 --- a/controller/cmd/destination/main.go +++ b/controller/cmd/destination/main.go @@ -30,17 +30,24 @@ func main() { if err != nil { log.Fatal(err.Error()) } + spClient, err := k8s.NewSpClientSet(*kubeConfigPath) + if err != nil { + log.Fatal(err.Error()) + } + restrictToNamespace := "" if *singleNamespace { restrictToNamespace = *controllerNamespace } k8sAPI := k8s.NewAPI( k8sClient, + spClient, restrictToNamespace, k8s.Endpoint, k8s.Pod, k8s.RS, k8s.Svc, + k8s.SP, ) done := make(chan struct{}) diff --git a/controller/cmd/public-api/main.go b/controller/cmd/public-api/main.go index 092feec97..b24623196 100644 --- a/controller/cmd/public-api/main.go +++ b/controller/cmd/public-api/main.go @@ -47,6 +47,7 @@ func main() { } k8sAPI := k8s.NewAPI( k8sClient, + nil, restrictToNamespace, k8s.Deploy, k8s.Pod, diff --git a/controller/cmd/tap/main.go b/controller/cmd/tap/main.go index 64690880c..68a6fb556 100644 --- a/controller/cmd/tap/main.go +++ b/controller/cmd/tap/main.go @@ -25,7 +25,7 @@ func main() { stop := make(chan os.Signal, 1) signal.Notify(stop, os.Interrupt, syscall.SIGTERM) - clientSet, err := k8s.NewClientSet(*kubeConfigPath) + k8sClient, err := k8s.NewClientSet(*kubeConfigPath) if err != nil { log.Fatalf("failed to create Kubernetes client: %s", err) } @@ -34,7 +34,8 @@ func main() { restrictToNamespace = *controllerNamespace } k8sAPI := k8s.NewAPI( - clientSet, + k8sClient, + nil, restrictToNamespace, k8s.Deploy, k8s.Pod, diff --git a/controller/destination/endpoints_watcher_test.go b/controller/destination/endpoints_watcher_test.go index fe33180aa..b33866bd5 100644 --- a/controller/destination/endpoints_watcher_test.go +++ b/controller/destination/endpoints_watcher_test.go @@ -260,7 +260,7 @@ spec: }, } { t.Run("subscribes listener to "+tt.serviceType, func(t *testing.T) { - k8sAPI, err := k8s.NewFakeAPI(tt.k8sConfigs...) + k8sAPI, err := k8s.NewFakeAPI("", tt.k8sConfigs...) if err != nil { t.Fatalf("NewFakeAPI returned an error: %s", err) } diff --git a/controller/destination/server_test.go b/controller/destination/server_test.go index c4e7aa27e..d74a8234c 100644 --- a/controller/destination/server_test.go +++ b/controller/destination/server_test.go @@ -29,7 +29,7 @@ func (m *mockDestination_GetServer) SendMsg(x interface{}) error { return m.err func (m *mockDestination_GetServer) RecvMsg(x interface{}) error { return m.errorToReturn } func TestBuildResolversList(t *testing.T) { - k8sAPI, err := k8s.NewFakeAPI() + k8sAPI, err := k8s.NewFakeAPI("") if err != nil { t.Fatalf("NewFakeAPI returned an error: %s", err) } @@ -89,7 +89,7 @@ func TestStreamResolutionUsingCorrectResolverFor(t *testing.T) { stream := &mockDestination_GetServer{} host := "something" port := 666 - k8sAPI, err := k8s.NewFakeAPI() + k8sAPI, err := k8s.NewFakeAPI("") if err != nil { t.Fatalf("NewFakeAPI returned an error: %s", err) } diff --git a/controller/k8s/api.go b/controller/k8s/api.go index 99cd3644a..a122b5b14 100644 --- a/controller/k8s/api.go +++ b/controller/k8s/api.go @@ -6,6 +6,9 @@ import ( "strings" "time" + spclient "github.com/linkerd/linkerd2/controller/gen/client/clientset/versioned" + sp "github.com/linkerd/linkerd2/controller/gen/client/informers/externalversions" + spinformers "github.com/linkerd/linkerd2/controller/gen/client/informers/externalversions/serviceprofile/v1alpha1" "github.com/linkerd/linkerd2/pkg/k8s" log "github.com/sirupsen/logrus" "google.golang.org/grpc/codes" @@ -29,11 +32,12 @@ const ( CM ApiResource = iota Deploy Endpoint + MWC // mutating webhook configuration Pod RC RS + SP Svc - MWC // mutating webhook configuration ) // API provides shared informers for all Kubernetes objects @@ -43,22 +47,26 @@ type API struct { cm coreinformers.ConfigMapInformer deploy appinformers.DeploymentInformer endpoint coreinformers.EndpointsInformer + mwc arinformers.MutatingWebhookConfigurationInformer pod coreinformers.PodInformer rc coreinformers.ReplicationControllerInformer rs appinformers.ReplicaSetInformer + sp spinformers.ServiceProfileInformer svc coreinformers.ServiceInformer - mwc arinformers.MutatingWebhookConfigurationInformer - syncChecks []cache.InformerSynced - sharedInformers informers.SharedInformerFactory - namespace string + syncChecks []cache.InformerSynced + sharedInformers informers.SharedInformerFactory + spSharedInformers sp.SharedInformerFactory + namespace string } // NewAPI takes a Kubernetes client and returns an initialized API -func NewAPI(k8sClient kubernetes.Interface, namespace string, resources ...ApiResource) *API { +func NewAPI(k8sClient kubernetes.Interface, spClient spclient.Interface, namespace string, resources ...ApiResource) *API { var sharedInformers informers.SharedInformerFactory + var spSharedInformers sp.SharedInformerFactory if namespace == "" { sharedInformers = informers.NewSharedInformerFactory(k8sClient, 10*time.Minute) + spSharedInformers = sp.NewSharedInformerFactory(spClient, 10*time.Minute) } else { sharedInformers = informers.NewFilteredSharedInformerFactory( k8sClient, @@ -66,13 +74,20 @@ func NewAPI(k8sClient kubernetes.Interface, namespace string, resources ...ApiRe namespace, nil, ) + spSharedInformers = sp.NewFilteredSharedInformerFactory( + spClient, + 10*time.Minute, + namespace, + nil, + ) } api := &API{ - Client: k8sClient, - syncChecks: make([]cache.InformerSynced, 0), - sharedInformers: sharedInformers, - namespace: namespace, + Client: k8sClient, + syncChecks: make([]cache.InformerSynced, 0), + sharedInformers: sharedInformers, + spSharedInformers: spSharedInformers, + namespace: namespace, } for _, resource := range resources { @@ -86,6 +101,9 @@ func NewAPI(k8sClient kubernetes.Interface, namespace string, resources ...ApiRe case Endpoint: api.endpoint = sharedInformers.Core().V1().Endpoints() api.syncChecks = append(api.syncChecks, api.endpoint.Informer().HasSynced) + case MWC: + api.mwc = sharedInformers.Admissionregistration().V1beta1().MutatingWebhookConfigurations() + api.syncChecks = append(api.syncChecks, api.mwc.Informer().HasSynced) case Pod: api.pod = sharedInformers.Core().V1().Pods() api.syncChecks = append(api.syncChecks, api.pod.Informer().HasSynced) @@ -95,12 +113,12 @@ func NewAPI(k8sClient kubernetes.Interface, namespace string, resources ...ApiRe case RS: api.rs = sharedInformers.Apps().V1beta2().ReplicaSets() api.syncChecks = append(api.syncChecks, api.rs.Informer().HasSynced) + case SP: + api.sp = spSharedInformers.Linkerd().V1alpha1().ServiceProfiles() + api.syncChecks = append(api.syncChecks, api.sp.Informer().HasSynced) case Svc: api.svc = sharedInformers.Core().V1().Services() api.syncChecks = append(api.syncChecks, api.svc.Informer().HasSynced) - case MWC: - api.mwc = sharedInformers.Admissionregistration().V1beta1().MutatingWebhookConfigurations() - api.syncChecks = append(api.syncChecks, api.mwc.Informer().HasSynced) } } @@ -112,6 +130,7 @@ func NewAPI(k8sClient kubernetes.Interface, namespace string, resources ...ApiRe // For testing, call this synchronously. func (api *API) Sync(readyCh chan<- struct{}) { api.sharedInformers.Start(nil) + api.spSharedInformers.Start(nil) ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() @@ -176,6 +195,13 @@ func (api *API) CM() coreinformers.ConfigMapInformer { return api.cm } +func (api *API) SP() spinformers.ServiceProfileInformer { + if api.sp == nil { + panic("SP informer not configured") + } + return api.sp +} + func (api *API) MWC() arinformers.MutatingWebhookConfigurationInformer { if api.mwc == nil { panic("MWC informer not configured") diff --git a/controller/k8s/api_test.go b/controller/k8s/api_test.go index 9b7c89952..827b67371 100644 --- a/controller/k8s/api_test.go +++ b/controller/k8s/api_test.go @@ -10,9 +10,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/status" apiv1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/client-go/kubernetes/fake" ) func newAPI(resourceConfigs []string, extraConfigs ...string) (*API, []runtime.Object, error) { @@ -32,7 +30,7 @@ func newAPI(resourceConfigs []string, extraConfigs ...string) (*API, []runtime.O k8sConfigs = append(k8sConfigs, config) } - api, err := NewFakeAPI(k8sConfigs...) + api, err := NewFakeAPI("", k8sConfigs...) if err != nil { return nil, nil, fmt.Errorf("NewFakeAPI returned an error: %s", err) } @@ -172,19 +170,23 @@ metadata: t.Run("In single-namespace mode", func(t *testing.T) { t.Run("Returns only the configured namespace", func(t *testing.T) { - ns1 := &apiv1.Namespace{ - ObjectMeta: metav1.ObjectMeta{ - Name: "namespace1", - }, - } - ns2 := &apiv1.Namespace{ - ObjectMeta: metav1.ObjectMeta{ - Name: "namespace2", - }, - } - clientSet := fake.NewSimpleClientset(ns1, ns2) - api := NewAPI(clientSet, "namespace1") + ns1 := ` +apiVersion: v1 +kind: Namespace +metadata: + name: namespace1` + + ns2 := ` +apiVersion: v1 +kind: Namespace +metadata: + name: namespace2` + + api, err := NewFakeAPI("namespace1", ns1, ns2) + if err != nil { + t.Fatalf("NewFakeAPI returned an error: %s", err) + } namespaces, err := api.GetObjects("", k8s.Namespace, "") if err != nil { diff --git a/controller/k8s/clientset.go b/controller/k8s/clientset.go index feec1bb3e..0d8d72b19 100644 --- a/controller/k8s/clientset.go +++ b/controller/k8s/clientset.go @@ -1,6 +1,7 @@ package k8s import ( + spclient "github.com/linkerd/linkerd2/controller/gen/client/clientset/versioned" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" @@ -9,6 +10,24 @@ import ( ) func NewClientSet(kubeConfig string) (*kubernetes.Clientset, error) { + config, err := parseConfig(kubeConfig) + if err != nil { + return nil, err + } + + return kubernetes.NewForConfig(config) +} + +func NewSpClientSet(kubeConfig string) (*spclient.Clientset, error) { + config, err := parseConfig(kubeConfig) + if err != nil { + return nil, err + } + + return spclient.NewForConfig(config) +} + +func parseConfig(kubeConfig string) (*rest.Config, error) { var config *rest.Config var err error @@ -23,6 +42,5 @@ func NewClientSet(kubeConfig string) (*kubernetes.Clientset, error) { if err != nil { return nil, err } - - return kubernetes.NewForConfig(config) + return config, nil } diff --git a/controller/k8s/test_helper.go b/controller/k8s/test_helper.go index ad5e753c7..ce1d76e8c 100644 --- a/controller/k8s/test_helper.go +++ b/controller/k8s/test_helper.go @@ -1,31 +1,44 @@ package k8s import ( + "strings" + + spfake "github.com/linkerd/linkerd2/controller/gen/client/clientset/versioned/fake" + spscheme "github.com/linkerd/linkerd2/controller/gen/client/clientset/versioned/scheme" + "github.com/linkerd/linkerd2/pkg/k8s" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/scheme" ) func toRuntimeObject(config string) (runtime.Object, error) { + spscheme.AddToScheme(scheme.Scheme) decode := scheme.Codecs.UniversalDeserializer().Decode obj, _, err := decode([]byte(config), nil, nil) return obj, err } -func NewFakeAPI(configs ...string) (*API, error) { +func NewFakeAPI(namespace string, configs ...string) (*API, error) { objs := []runtime.Object{} + spObjs := []runtime.Object{} for _, config := range configs { obj, err := toRuntimeObject(config) if err != nil { return nil, err } - objs = append(objs, obj) + if strings.ToLower(obj.GetObjectKind().GroupVersionKind().Kind) == k8s.ServiceProfile { + spObjs = append(spObjs, obj) + } else { + objs = append(objs, obj) + } } clientSet := fake.NewSimpleClientset(objs...) + spClientSet := spfake.NewSimpleClientset(spObjs...) return NewAPI( clientSet, - "", + spClientSet, + namespace, CM, Deploy, Endpoint, @@ -33,6 +46,7 @@ func NewFakeAPI(configs ...string) (*API, error) { RC, RS, Svc, + SP, MWC, ), nil } diff --git a/controller/tap/server_test.go b/controller/tap/server_test.go index 651a6a615..b241ca0a5 100644 --- a/controller/tap/server_test.go +++ b/controller/tap/server_test.go @@ -184,7 +184,7 @@ status: } for _, exp := range expectations { - k8sAPI, err := k8s.NewFakeAPI(exp.k8sRes...) + k8sAPI, err := k8s.NewFakeAPI("", exp.k8sRes...) if err != nil { t.Fatalf("NewFakeAPI returned an error: %s", err) } diff --git a/pkg/k8s/k8s.go b/pkg/k8s/k8s.go index 6f16a2a0a..f2001aa1a 100644 --- a/pkg/k8s/k8s.go +++ b/pkg/k8s/k8s.go @@ -18,6 +18,7 @@ const ( ReplicationController = "replicationcontroller" ReplicaSet = "replicaset" Service = "service" + ServiceProfile = "serviceprofile" StatefulSet = "statefulset" )