diff --git a/cmd/kops-controller/controllers/BUILD.bazel b/cmd/kops-controller/controllers/BUILD.bazel index 0dc238d03b..31e26d2df3 100644 --- a/cmd/kops-controller/controllers/BUILD.bazel +++ b/cmd/kops-controller/controllers/BUILD.bazel @@ -2,7 +2,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library") go_library( name = "go_default_library", - srcs = ["legacy_node_controller.go"], + srcs = [ + "legacy_node_controller.go", + "node_controller.go", + ], importpath = "k8s.io/kops/cmd/kops-controller/controllers", visibility = ["//visibility:public"], deps = [ diff --git a/cmd/kops-controller/controllers/legacy_node_controller.go b/cmd/kops-controller/controllers/legacy_node_controller.go index 917ffaa9ee..c48a1055ed 100644 --- a/cmd/kops-controller/controllers/legacy_node_controller.go +++ b/cmd/kops-controller/controllers/legacy_node_controller.go @@ -18,15 +18,12 @@ package controllers import ( "context" - "encoding/json" "fmt" "time" "github.com/go-logr/logr" corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/types" corev1client "k8s.io/client-go/kubernetes/typed/core/v1" "k8s.io/klog/v2" "k8s.io/kops/pkg/apis/kops" @@ -87,7 +84,7 @@ type LegacyNodeReconciler struct { } // +kubebuilder:rbac:groups=,resources=nodes,verbs=get;list;watch;patch -// Reconciler is the main reconciler function that observes node changes +// Reconcile is the main reconciler function that observes node changes. func (r *LegacyNodeReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { ctx := context.Background() _ = r.log.WithValues("nodecontroller", req.NamespacedName) @@ -138,7 +135,7 @@ func (r *LegacyNodeReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) return ctrl.Result{}, nil } - if err := r.patchNodeLabels(ctx, node, updateLabels); err != nil { + if err := patchNodeLabels(r.coreV1Client, ctx, node, updateLabels); err != nil { klog.Warningf("failed to patch node labels on %s: %v", node.Name, err) return ctrl.Result{}, err } @@ -152,37 +149,6 @@ func (r *LegacyNodeReconciler) SetupWithManager(mgr ctrl.Manager) error { Complete(r) } -type nodePatch struct { - Metadata *nodePatchMetadata `json:"metadata,omitempty"` -} - -type nodePatchMetadata struct { - Labels map[string]string `json:"labels,omitempty"` -} - -// patchNodeLabels patches the node labels to set the specified labels -func (r *LegacyNodeReconciler) patchNodeLabels(ctx context.Context, node *corev1.Node, setLabels map[string]string) error { - nodePatchMetadata := &nodePatchMetadata{ - Labels: setLabels, - } - nodePatch := &nodePatch{ - Metadata: nodePatchMetadata, - } - nodePatchJson, err := json.Marshal(nodePatch) - if err != nil { - return fmt.Errorf("error building node patch: %v", err) - } - - klog.V(2).Infof("sending patch for node %q: %q", node.Name, string(nodePatchJson)) - - _, err = r.coreV1Client.Nodes().Patch(ctx, node.Name, types.StrategicMergePatchType, nodePatchJson, metav1.PatchOptions{}) - if err != nil { - return fmt.Errorf("error applying patch to node: %v", err) - } - - return nil -} - // getClusterForNode returns the kops.Cluster object for the node // The cluster is actually loaded when we first start func (r *LegacyNodeReconciler) getClusterForNode(node *corev1.Node) (*kops.Cluster, error) { diff --git a/cmd/kops-controller/controllers/node_controller.go b/cmd/kops-controller/controllers/node_controller.go new file mode 100644 index 0000000000..0a95e464ca --- /dev/null +++ b/cmd/kops-controller/controllers/node_controller.go @@ -0,0 +1,151 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package controllers + +import ( + "context" + "encoding/json" + "fmt" + + "github.com/go-logr/logr" + corev1 "k8s.io/api/core/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/types" + corev1client "k8s.io/client-go/kubernetes/typed/core/v1" + "k8s.io/klog/v2" + "k8s.io/kops/pkg/nodeidentity" + ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/manager" +) + +// NewNodeReconciler is the constructor for a NodeReconciler +func NewNodeReconciler(mgr manager.Manager, identifier nodeidentity.Identifier) (*NodeReconciler, error) { + r := &NodeReconciler{ + client: mgr.GetClient(), + log: ctrl.Log.WithName("controllers").WithName("Node"), + identifier: identifier, + } + + coreClient, err := corev1client.NewForConfig(mgr.GetConfig()) + if err != nil { + return nil, fmt.Errorf("error building corev1 client: %v", err) + } + r.coreV1Client = coreClient + + return r, nil +} + +// NodeReconciler observes Node objects, and labels them with the correct labels for the instancegroup +// This used to be done by the kubelet, but is moving to a central controller for greater security in 1.16 +type NodeReconciler struct { + // client is the controller-runtime client + client client.Client + + // log is a logr + log logr.Logger + + // coreV1Client is a client-go client for patching nodes + coreV1Client *corev1client.CoreV1Client + + // identifier is a provider that can securely map node ProviderIDs to labels + identifier nodeidentity.Identifier +} + +// +kubebuilder:rbac:groups=,resources=nodes,verbs=get;list;watch;patch +// Reconcile is the main reconciler function that observes node changes. +func (r *NodeReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { + ctx := context.Background() + _ = r.log.WithValues("nodecontroller", req.NamespacedName) + + node := &corev1.Node{} + if err := r.client.Get(ctx, req.NamespacedName, node); err != nil { + klog.Warningf("unable to fetch node %s: %v", node.Name, err) + if apierrors.IsNotFound(err) { + // we'll ignore not-found errors, since they can't be fixed by an immediate + // requeue (we'll need to wait for a new notification), and we can get them + // on deleted requests. + return ctrl.Result{}, nil + } + return ctrl.Result{}, err + } + + info, err := r.identifier.IdentifyNode(ctx, node) + if err != nil { + return ctrl.Result{}, fmt.Errorf("error identifying node %q: %v", node.Name, err) + } + + labels := info.Labels + + updateLabels := make(map[string]string) + for k, v := range labels { + actual, found := node.Labels[k] + if !found || actual != v { + updateLabels[k] = v + } + } + + if len(updateLabels) == 0 { + klog.V(4).Infof("no label changes needed for %s", node.Name) + return ctrl.Result{}, nil + } + + if err := patchNodeLabels(r.coreV1Client, ctx, node, updateLabels); err != nil { + klog.Warningf("failed to patch node labels on %s: %v", node.Name, err) + return ctrl.Result{}, err + } + + return ctrl.Result{}, nil +} + +func (r *NodeReconciler) SetupWithManager(mgr ctrl.Manager) error { + return ctrl.NewControllerManagedBy(mgr). + For(&corev1.Node{}). + Complete(r) +} + +type nodePatch struct { + Metadata *nodePatchMetadata `json:"metadata,omitempty"` +} + +type nodePatchMetadata struct { + Labels map[string]string `json:"labels,omitempty"` +} + +// patchNodeLabels patches the node labels to set the specified labels +func patchNodeLabels(client *corev1client.CoreV1Client, ctx context.Context, node *corev1.Node, setLabels map[string]string) error { + nodePatchMetadata := &nodePatchMetadata{ + Labels: setLabels, + } + nodePatch := &nodePatch{ + Metadata: nodePatchMetadata, + } + nodePatchJson, err := json.Marshal(nodePatch) + if err != nil { + return fmt.Errorf("error building node patch: %v", err) + } + + klog.V(2).Infof("sending patch for node %q: %q", node.Name, string(nodePatchJson)) + + _, err = client.Nodes().Patch(ctx, node.Name, types.StrategicMergePatchType, nodePatchJson, metav1.PatchOptions{}) + if err != nil { + return fmt.Errorf("error applying patch to node: %v", err) + } + + return nil +} diff --git a/cmd/kops-controller/main.go b/cmd/kops-controller/main.go index c4fea4c9b8..db3155265e 100644 --- a/cmd/kops-controller/main.go +++ b/cmd/kops-controller/main.go @@ -146,7 +146,8 @@ func buildScheme() error { } func addNodeController(mgr manager.Manager, opt *config.Options) error { - var identifier nodeidentity.LegacyIdentifier + var legacyIdentifier nodeidentity.LegacyIdentifier + var identifier nodeidentity.Identifier var err error switch opt.Cloud { case "aws": @@ -154,20 +155,21 @@ func addNodeController(mgr manager.Manager, opt *config.Options) error { if err != nil { return fmt.Errorf("error building identifier: %v", err) } + case "gce": - identifier, err = nodeidentitygce.New() + legacyIdentifier, err = nodeidentitygce.New() if err != nil { return fmt.Errorf("error building identifier: %v", err) } case "openstack": - identifier, err = nodeidentityos.New() + legacyIdentifier, err = nodeidentityos.New() if err != nil { return fmt.Errorf("error building identifier: %v", err) } case "digitalocean": - identifier, err = nodeidentitydo.New() + legacyIdentifier, err = nodeidentitydo.New() if err != nil { return fmt.Errorf("error building identifier: %v", err) } @@ -179,16 +181,26 @@ func addNodeController(mgr manager.Manager, opt *config.Options) error { return fmt.Errorf("identifier for cloud %q not implemented", opt.Cloud) } - if opt.ConfigBase == "" { - return fmt.Errorf("must specify configBase") - } + if identifier != nil { + nodeController, err := controllers.NewNodeReconciler(mgr, identifier) + if err != nil { + return err + } + if err := nodeController.SetupWithManager(mgr); err != nil { + return err + } + } else { + if opt.ConfigBase == "" { + return fmt.Errorf("must specify configBase") + } - nodeController, err := controllers.NewLegacyNodeReconciler(mgr, opt.ConfigBase, identifier) - if err != nil { - return err - } - if err := nodeController.SetupWithManager(mgr); err != nil { - return err + nodeController, err := controllers.NewLegacyNodeReconciler(mgr, opt.ConfigBase, legacyIdentifier) + if err != nil { + return err + } + if err := nodeController.SetupWithManager(mgr); err != nil { + return err + } } return nil diff --git a/pkg/model/BUILD.bazel b/pkg/model/BUILD.bazel index d2ce3e26c3..4b14ff2243 100644 --- a/pkg/model/BUILD.bazel +++ b/pkg/model/BUILD.bazel @@ -32,6 +32,7 @@ go_library( "//pkg/model/iam:go_default_library", "//pkg/model/resources:go_default_library", "//pkg/nodeidentity/aws:go_default_library", + "//pkg/nodelabels:go_default_library", "//pkg/pki:go_default_library", "//pkg/rbac:go_default_library", "//pkg/tokens:go_default_library", diff --git a/pkg/model/context.go b/pkg/model/context.go index 6eedce2a17..f946cddc3d 100644 --- a/pkg/model/context.go +++ b/pkg/model/context.go @@ -30,6 +30,7 @@ import ( "k8s.io/kops/pkg/model/components" "k8s.io/kops/pkg/model/iam" nodeidentityaws "k8s.io/kops/pkg/nodeidentity/aws" + "k8s.io/kops/pkg/nodelabels" "k8s.io/kops/upup/pkg/fi" "k8s.io/kops/upup/pkg/fi/cloudup/awstasks" "k8s.io/kops/upup/pkg/fi/cloudup/awsup" @@ -40,7 +41,6 @@ import ( ) const ( - clusterAutoscalerNodeTemplateLabel = "k8s.io/cluster-autoscaler/node-template/label/" clusterAutoscalerNodeTemplateTaint = "k8s.io/cluster-autoscaler/node-template/taint/" ) @@ -192,8 +192,8 @@ func (m *KopsModelContext) CloudTagsForInstanceGroup(ig *kops.InstanceGroup) (ma } // Apply labels for cluster autoscaler node labels - for k, v := range ig.Spec.NodeLabels { - labels[clusterAutoscalerNodeTemplateLabel+k] = v + for k, v := range nodelabels.BuildNodeLabels(m.Cluster, ig) { + labels[nodeidentityaws.ClusterAutoscalerNodeTemplateLabel+k] = v } // Apply labels for cluster autoscaler node taints diff --git a/pkg/nodeidentity/aws/identify.go b/pkg/nodeidentity/aws/identify.go index d376b12e29..c8878e9be1 100644 --- a/pkg/nodeidentity/aws/identify.go +++ b/pkg/nodeidentity/aws/identify.go @@ -38,7 +38,8 @@ const ( // CloudTagInstanceGroupName is a cloud tag that defines the instance group name // This is used by the aws nodeidentifier to securely identify the node instancegroup CloudTagInstanceGroupName = "kops.k8s.io/instancegroup" - + // ClusterAutoscalerNodeTemplateLabel is the prefix used on node labels when copying to cloud tags. + ClusterAutoscalerNodeTemplateLabel = "k8s.io/cluster-autoscaler/node-template/label/" // The expiration time of nodeidentity.Info cache. cacheTTL = 60 * time.Minute ) @@ -55,7 +56,7 @@ type nodeIdentifier struct { } // New creates and returns a nodeidentity.Identifier for Nodes running on AWS -func New(CacheNodeidentityInfo bool) (nodeidentity.LegacyIdentifier, error) { +func New(CacheNodeidentityInfo bool) (nodeidentity.Identifier, error) { config := aws.NewConfig() config = config.WithCredentialsChainVerboseErrors(true) @@ -91,7 +92,7 @@ func stringKeyFunc(obj interface{}) (string, error) { } // IdentifyNode queries AWS for the node identity information -func (i *nodeIdentifier) IdentifyNode(ctx context.Context, node *corev1.Node) (*nodeidentity.LegacyInfo, error) { +func (i *nodeIdentifier) IdentifyNode(ctx context.Context, node *corev1.Node) (*nodeidentity.Info, error) { providerID := node.Spec.ProviderID if providerID == "" { return nil, fmt.Errorf("providerID was not set for node %s", node.Name) @@ -134,21 +135,21 @@ func (i *nodeIdentifier) IdentifyNode(ctx context.Context, node *corev1.Node) (* return nil, fmt.Errorf("found instance %q, but state is %q", instanceID, instanceState) } - lifecycle := "" + labels := map[string]string{} if instance.InstanceLifecycle != nil { - lifecycle = *instance.InstanceLifecycle + labels[fmt.Sprintf("node-role.kubernetes.io/%s-worker", *instance.InstanceLifecycle)] = "true" } - // TODO: Should we traverse to the ASG to confirm the tags there? - igName := getTag(instance.Tags, CloudTagInstanceGroupName) - if igName == "" { - return nil, fmt.Errorf("%s tag not set on instance %s", CloudTagInstanceGroupName, aws.StringValue(instance.InstanceId)) + info := &nodeidentity.Info{ + InstanceID: instanceID, + Labels: labels, } - info := &nodeidentity.LegacyInfo{} - info.InstanceID = instanceID - info.InstanceGroup = igName - info.InstanceLifecycle = lifecycle + for _, tag := range instance.Tags { + if strings.HasPrefix(aws.StringValue(tag.Key), ClusterAutoscalerNodeTemplateLabel) { + info.Labels[strings.TrimPrefix(aws.StringValue(tag.Key), ClusterAutoscalerNodeTemplateLabel)] = aws.StringValue(tag.Value) + } + } // If caching is enabled add the nodeidentity.Info to cache. if i.cacheEnabled { @@ -182,12 +183,3 @@ func (i *nodeIdentifier) getInstance(instanceID string) (*ec2.Instance, error) { instance := resp.Reservations[0].Instances[0] return instance, nil } - -func getTag(tags []*ec2.Tag, key string) string { - for _, tag := range tags { - if key == aws.StringValue(tag.Key) { - return aws.StringValue(tag.Value) - } - } - return "" -} diff --git a/pkg/nodeidentity/interfaces.go b/pkg/nodeidentity/interfaces.go index f816d8b516..65d9bd8f8c 100644 --- a/pkg/nodeidentity/interfaces.go +++ b/pkg/nodeidentity/interfaces.go @@ -22,6 +22,15 @@ import ( corev1 "k8s.io/api/core/v1" ) +type Identifier interface { + IdentifyNode(ctx context.Context, node *corev1.Node) (*Info, error) +} + +type Info struct { + InstanceID string + Labels map[string]string +} + type LegacyIdentifier interface { IdentifyNode(ctx context.Context, node *corev1.Node) (*LegacyInfo, error) }