Take node labels from cloud tags on AWS

This commit is contained in:
John Gardiner Myers 2020-08-30 15:30:30 -07:00
parent 24ff622d8e
commit 7069aaabf6
8 changed files with 209 additions and 75 deletions

View File

@ -2,7 +2,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library( go_library(
name = "go_default_library", name = "go_default_library",
srcs = ["legacy_node_controller.go"], srcs = [
"legacy_node_controller.go",
"node_controller.go",
],
importpath = "k8s.io/kops/cmd/kops-controller/controllers", importpath = "k8s.io/kops/cmd/kops-controller/controllers",
visibility = ["//visibility:public"], visibility = ["//visibility:public"],
deps = [ deps = [

View File

@ -18,15 +18,12 @@ package controllers
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"time" "time"
"github.com/go-logr/logr" "github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1" corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/klog/v2" "k8s.io/klog/v2"
"k8s.io/kops/pkg/apis/kops" "k8s.io/kops/pkg/apis/kops"
@ -87,7 +84,7 @@ type LegacyNodeReconciler struct {
} }
// +kubebuilder:rbac:groups=,resources=nodes,verbs=get;list;watch;patch // +kubebuilder:rbac:groups=,resources=nodes,verbs=get;list;watch;patch
// Reconciler is the main reconciler function that observes node changes // Reconcile is the main reconciler function that observes node changes.
func (r *LegacyNodeReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { func (r *LegacyNodeReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
ctx := context.Background() ctx := context.Background()
_ = r.log.WithValues("nodecontroller", req.NamespacedName) _ = r.log.WithValues("nodecontroller", req.NamespacedName)
@ -138,7 +135,7 @@ func (r *LegacyNodeReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error)
return ctrl.Result{}, nil return ctrl.Result{}, nil
} }
if err := r.patchNodeLabels(ctx, node, updateLabels); err != nil { if err := patchNodeLabels(r.coreV1Client, ctx, node, updateLabels); err != nil {
klog.Warningf("failed to patch node labels on %s: %v", node.Name, err) klog.Warningf("failed to patch node labels on %s: %v", node.Name, err)
return ctrl.Result{}, err return ctrl.Result{}, err
} }
@ -152,37 +149,6 @@ func (r *LegacyNodeReconciler) SetupWithManager(mgr ctrl.Manager) error {
Complete(r) Complete(r)
} }
type nodePatch struct {
Metadata *nodePatchMetadata `json:"metadata,omitempty"`
}
type nodePatchMetadata struct {
Labels map[string]string `json:"labels,omitempty"`
}
// patchNodeLabels patches the node labels to set the specified labels
func (r *LegacyNodeReconciler) patchNodeLabels(ctx context.Context, node *corev1.Node, setLabels map[string]string) error {
nodePatchMetadata := &nodePatchMetadata{
Labels: setLabels,
}
nodePatch := &nodePatch{
Metadata: nodePatchMetadata,
}
nodePatchJson, err := json.Marshal(nodePatch)
if err != nil {
return fmt.Errorf("error building node patch: %v", err)
}
klog.V(2).Infof("sending patch for node %q: %q", node.Name, string(nodePatchJson))
_, err = r.coreV1Client.Nodes().Patch(ctx, node.Name, types.StrategicMergePatchType, nodePatchJson, metav1.PatchOptions{})
if err != nil {
return fmt.Errorf("error applying patch to node: %v", err)
}
return nil
}
// getClusterForNode returns the kops.Cluster object for the node // getClusterForNode returns the kops.Cluster object for the node
// The cluster is actually loaded when we first start // The cluster is actually loaded when we first start
func (r *LegacyNodeReconciler) getClusterForNode(node *corev1.Node) (*kops.Cluster, error) { func (r *LegacyNodeReconciler) getClusterForNode(node *corev1.Node) (*kops.Cluster, error) {

View File

@ -0,0 +1,151 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllers
import (
"context"
"encoding/json"
"fmt"
"github.com/go-logr/logr"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/klog/v2"
"k8s.io/kops/pkg/nodeidentity"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/manager"
)
// NewNodeReconciler is the constructor for a NodeReconciler.
//
// It builds a client-go CoreV1 client from the manager's REST config and
// combines it with the manager's controller-runtime client, a named logger
// and the supplied identifier. An error is returned if the CoreV1 client
// cannot be built.
func NewNodeReconciler(mgr manager.Manager, identifier nodeidentity.Identifier) (*NodeReconciler, error) {
	coreV1Client, err := corev1client.NewForConfig(mgr.GetConfig())
	if err != nil {
		return nil, fmt.Errorf("error building corev1 client: %v", err)
	}

	return &NodeReconciler{
		client:       mgr.GetClient(),
		log:          ctrl.Log.WithName("controllers").WithName("Node"),
		coreV1Client: coreV1Client,
		identifier:   identifier,
	}, nil
}
// NodeReconciler observes Node objects, and labels them with the labels that
// its identifier reports for each node.
// This used to be done by the kubelet, but is moving to a central controller for greater security in 1.16
type NodeReconciler struct {
// client is the controller-runtime client; Reconcile uses it to fetch the Node named in a request
client client.Client
// log is a logr
log logr.Logger
// coreV1Client is a client-go client for patching nodes
coreV1Client *corev1client.CoreV1Client
// identifier is a provider that can securely map node ProviderIDs to labels;
// Reconcile applies the Labels of the Info it returns
identifier nodeidentity.Identifier
}
// +kubebuilder:rbac:groups=,resources=nodes,verbs=get;list;watch;patch

// Reconcile is the main reconciler function that observes node changes.
//
// It fetches the Node named in req, asks the identifier for the labels the
// node should carry, and patches the node with every label that is missing
// or has a different value. Labels already present with the expected value
// are not sent, and labels the identifier does not mention are left alone.
//
// A not-found error while fetching the node is swallowed (nil error, no
// requeue); any other failure is returned to the caller.
func (r *NodeReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
	ctx := context.Background()
	_ = r.log.WithValues("nodecontroller", req.NamespacedName)

	node := &corev1.Node{}
	if err := r.client.Get(ctx, req.NamespacedName, node); err != nil {
		// node is still the zero value when Get fails, so node.Name would be
		// empty here; take the name from the request instead.
		klog.Warningf("unable to fetch node %s: %v", req.Name, err)
		if apierrors.IsNotFound(err) {
			// we'll ignore not-found errors, since they can't be fixed by an immediate
			// requeue (we'll need to wait for a new notification), and we can get them
			// on deleted requests.
			return ctrl.Result{}, nil
		}
		return ctrl.Result{}, err
	}

	info, err := r.identifier.IdentifyNode(ctx, node)
	if err != nil {
		return ctrl.Result{}, fmt.Errorf("error identifying node %q: %v", node.Name, err)
	}

	// Collect only the labels that are absent or differ, so the patch is minimal
	// and a node that is already correct triggers no API write.
	updateLabels := make(map[string]string)
	for k, v := range info.Labels {
		actual, found := node.Labels[k]
		if !found || actual != v {
			updateLabels[k] = v
		}
	}

	if len(updateLabels) == 0 {
		klog.V(4).Infof("no label changes needed for %s", node.Name)
		return ctrl.Result{}, nil
	}

	if err := patchNodeLabels(r.coreV1Client, ctx, node, updateLabels); err != nil {
		klog.Warningf("failed to patch node labels on %s: %v", node.Name, err)
		return ctrl.Result{}, err
	}

	return ctrl.Result{}, nil
}
// SetupWithManager registers the reconciler with mgr as a controller for
// core/v1 Node objects and returns any error from building that controller.
func (r *NodeReconciler) SetupWithManager(mgr ctrl.Manager) error {
	nodeController := ctrl.NewControllerManagedBy(mgr).For(&corev1.Node{})
	return nodeController.Complete(r)
}
// nodePatch is the JSON document that patchNodeLabels marshals and sends as
// the patch body. Metadata is omitted from the JSON when nil.
type nodePatch struct {
Metadata *nodePatchMetadata `json:"metadata,omitempty"`
}
// nodePatchMetadata is the "metadata" section of a nodePatch. It carries only
// labels; the field is omitted from the JSON when the map is empty.
type nodePatchMetadata struct {
Labels map[string]string `json:"labels,omitempty"`
}
// patchNodeLabels patches the node labels to set the specified labels.
//
// The patch body contains only metadata.labels with the entries of setLabels
// and is submitted through client as a strategic merge patch against
// node.Name. An error is returned if the body cannot be marshalled or the
// API call fails.
func patchNodeLabels(client *corev1client.CoreV1Client, ctx context.Context, node *corev1.Node, setLabels map[string]string) error {
	body, err := json.Marshal(&nodePatch{
		Metadata: &nodePatchMetadata{Labels: setLabels},
	})
	if err != nil {
		return fmt.Errorf("error building node patch: %v", err)
	}

	klog.V(2).Infof("sending patch for node %q: %q", node.Name, string(body))

	if _, err := client.Nodes().Patch(ctx, node.Name, types.StrategicMergePatchType, body, metav1.PatchOptions{}); err != nil {
		return fmt.Errorf("error applying patch to node: %v", err)
	}
	return nil
}

View File

@ -146,7 +146,8 @@ func buildScheme() error {
} }
func addNodeController(mgr manager.Manager, opt *config.Options) error { func addNodeController(mgr manager.Manager, opt *config.Options) error {
var identifier nodeidentity.LegacyIdentifier var legacyIdentifier nodeidentity.LegacyIdentifier
var identifier nodeidentity.Identifier
var err error var err error
switch opt.Cloud { switch opt.Cloud {
case "aws": case "aws":
@ -154,20 +155,21 @@ func addNodeController(mgr manager.Manager, opt *config.Options) error {
if err != nil { if err != nil {
return fmt.Errorf("error building identifier: %v", err) return fmt.Errorf("error building identifier: %v", err)
} }
case "gce": case "gce":
identifier, err = nodeidentitygce.New() legacyIdentifier, err = nodeidentitygce.New()
if err != nil { if err != nil {
return fmt.Errorf("error building identifier: %v", err) return fmt.Errorf("error building identifier: %v", err)
} }
case "openstack": case "openstack":
identifier, err = nodeidentityos.New() legacyIdentifier, err = nodeidentityos.New()
if err != nil { if err != nil {
return fmt.Errorf("error building identifier: %v", err) return fmt.Errorf("error building identifier: %v", err)
} }
case "digitalocean": case "digitalocean":
identifier, err = nodeidentitydo.New() legacyIdentifier, err = nodeidentitydo.New()
if err != nil { if err != nil {
return fmt.Errorf("error building identifier: %v", err) return fmt.Errorf("error building identifier: %v", err)
} }
@ -179,16 +181,26 @@ func addNodeController(mgr manager.Manager, opt *config.Options) error {
return fmt.Errorf("identifier for cloud %q not implemented", opt.Cloud) return fmt.Errorf("identifier for cloud %q not implemented", opt.Cloud)
} }
if opt.ConfigBase == "" { if identifier != nil {
return fmt.Errorf("must specify configBase") nodeController, err := controllers.NewNodeReconciler(mgr, identifier)
} if err != nil {
return err
}
if err := nodeController.SetupWithManager(mgr); err != nil {
return err
}
} else {
if opt.ConfigBase == "" {
return fmt.Errorf("must specify configBase")
}
nodeController, err := controllers.NewLegacyNodeReconciler(mgr, opt.ConfigBase, identifier) nodeController, err := controllers.NewLegacyNodeReconciler(mgr, opt.ConfigBase, legacyIdentifier)
if err != nil { if err != nil {
return err return err
} }
if err := nodeController.SetupWithManager(mgr); err != nil { if err := nodeController.SetupWithManager(mgr); err != nil {
return err return err
}
} }
return nil return nil

View File

@ -32,6 +32,7 @@ go_library(
"//pkg/model/iam:go_default_library", "//pkg/model/iam:go_default_library",
"//pkg/model/resources:go_default_library", "//pkg/model/resources:go_default_library",
"//pkg/nodeidentity/aws:go_default_library", "//pkg/nodeidentity/aws:go_default_library",
"//pkg/nodelabels:go_default_library",
"//pkg/pki:go_default_library", "//pkg/pki:go_default_library",
"//pkg/rbac:go_default_library", "//pkg/rbac:go_default_library",
"//pkg/tokens:go_default_library", "//pkg/tokens:go_default_library",

View File

@ -30,6 +30,7 @@ import (
"k8s.io/kops/pkg/model/components" "k8s.io/kops/pkg/model/components"
"k8s.io/kops/pkg/model/iam" "k8s.io/kops/pkg/model/iam"
nodeidentityaws "k8s.io/kops/pkg/nodeidentity/aws" nodeidentityaws "k8s.io/kops/pkg/nodeidentity/aws"
"k8s.io/kops/pkg/nodelabels"
"k8s.io/kops/upup/pkg/fi" "k8s.io/kops/upup/pkg/fi"
"k8s.io/kops/upup/pkg/fi/cloudup/awstasks" "k8s.io/kops/upup/pkg/fi/cloudup/awstasks"
"k8s.io/kops/upup/pkg/fi/cloudup/awsup" "k8s.io/kops/upup/pkg/fi/cloudup/awsup"
@ -40,7 +41,6 @@ import (
) )
const ( const (
clusterAutoscalerNodeTemplateLabel = "k8s.io/cluster-autoscaler/node-template/label/"
clusterAutoscalerNodeTemplateTaint = "k8s.io/cluster-autoscaler/node-template/taint/" clusterAutoscalerNodeTemplateTaint = "k8s.io/cluster-autoscaler/node-template/taint/"
) )
@ -192,8 +192,8 @@ func (m *KopsModelContext) CloudTagsForInstanceGroup(ig *kops.InstanceGroup) (ma
} }
// Apply labels for cluster autoscaler node labels // Apply labels for cluster autoscaler node labels
for k, v := range ig.Spec.NodeLabels { for k, v := range nodelabels.BuildNodeLabels(m.Cluster, ig) {
labels[clusterAutoscalerNodeTemplateLabel+k] = v labels[nodeidentityaws.ClusterAutoscalerNodeTemplateLabel+k] = v
} }
// Apply labels for cluster autoscaler node taints // Apply labels for cluster autoscaler node taints

View File

@ -38,7 +38,8 @@ const (
// CloudTagInstanceGroupName is a cloud tag that defines the instance group name // CloudTagInstanceGroupName is a cloud tag that defines the instance group name
// This is used by the aws nodeidentifier to securely identify the node instancegroup // This is used by the aws nodeidentifier to securely identify the node instancegroup
CloudTagInstanceGroupName = "kops.k8s.io/instancegroup" CloudTagInstanceGroupName = "kops.k8s.io/instancegroup"
// ClusterAutoscalerNodeTemplateLabel is the prefix used on node labels when copying to cloud tags.
ClusterAutoscalerNodeTemplateLabel = "k8s.io/cluster-autoscaler/node-template/label/"
// The expiration time of nodeidentity.Info cache. // The expiration time of nodeidentity.Info cache.
cacheTTL = 60 * time.Minute cacheTTL = 60 * time.Minute
) )
@ -55,7 +56,7 @@ type nodeIdentifier struct {
} }
// New creates and returns a nodeidentity.Identifier for Nodes running on AWS // New creates and returns a nodeidentity.Identifier for Nodes running on AWS
func New(CacheNodeidentityInfo bool) (nodeidentity.LegacyIdentifier, error) { func New(CacheNodeidentityInfo bool) (nodeidentity.Identifier, error) {
config := aws.NewConfig() config := aws.NewConfig()
config = config.WithCredentialsChainVerboseErrors(true) config = config.WithCredentialsChainVerboseErrors(true)
@ -91,7 +92,7 @@ func stringKeyFunc(obj interface{}) (string, error) {
} }
// IdentifyNode queries AWS for the node identity information // IdentifyNode queries AWS for the node identity information
func (i *nodeIdentifier) IdentifyNode(ctx context.Context, node *corev1.Node) (*nodeidentity.LegacyInfo, error) { func (i *nodeIdentifier) IdentifyNode(ctx context.Context, node *corev1.Node) (*nodeidentity.Info, error) {
providerID := node.Spec.ProviderID providerID := node.Spec.ProviderID
if providerID == "" { if providerID == "" {
return nil, fmt.Errorf("providerID was not set for node %s", node.Name) return nil, fmt.Errorf("providerID was not set for node %s", node.Name)
@ -134,21 +135,21 @@ func (i *nodeIdentifier) IdentifyNode(ctx context.Context, node *corev1.Node) (*
return nil, fmt.Errorf("found instance %q, but state is %q", instanceID, instanceState) return nil, fmt.Errorf("found instance %q, but state is %q", instanceID, instanceState)
} }
lifecycle := "" labels := map[string]string{}
if instance.InstanceLifecycle != nil { if instance.InstanceLifecycle != nil {
lifecycle = *instance.InstanceLifecycle labels[fmt.Sprintf("node-role.kubernetes.io/%s-worker", *instance.InstanceLifecycle)] = "true"
} }
// TODO: Should we traverse to the ASG to confirm the tags there? info := &nodeidentity.Info{
igName := getTag(instance.Tags, CloudTagInstanceGroupName) InstanceID: instanceID,
if igName == "" { Labels: labels,
return nil, fmt.Errorf("%s tag not set on instance %s", CloudTagInstanceGroupName, aws.StringValue(instance.InstanceId))
} }
info := &nodeidentity.LegacyInfo{} for _, tag := range instance.Tags {
info.InstanceID = instanceID if strings.HasPrefix(aws.StringValue(tag.Key), ClusterAutoscalerNodeTemplateLabel) {
info.InstanceGroup = igName info.Labels[strings.TrimPrefix(aws.StringValue(tag.Key), ClusterAutoscalerNodeTemplateLabel)] = aws.StringValue(tag.Value)
info.InstanceLifecycle = lifecycle }
}
// If caching is enabled add the nodeidentity.Info to cache. // If caching is enabled add the nodeidentity.Info to cache.
if i.cacheEnabled { if i.cacheEnabled {
@ -182,12 +183,3 @@ func (i *nodeIdentifier) getInstance(instanceID string) (*ec2.Instance, error) {
instance := resp.Reservations[0].Instances[0] instance := resp.Reservations[0].Instances[0]
return instance, nil return instance, nil
} }
func getTag(tags []*ec2.Tag, key string) string {
for _, tag := range tags {
if key == aws.StringValue(tag.Key) {
return aws.StringValue(tag.Value)
}
}
return ""
}

View File

@ -22,6 +22,15 @@ import (
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
) )
type Identifier interface {
IdentifyNode(ctx context.Context, node *corev1.Node) (*Info, error)
}
type Info struct {
InstanceID string
Labels map[string]string
}
type LegacyIdentifier interface { type LegacyIdentifier interface {
IdentifyNode(ctx context.Context, node *corev1.Node) (*LegacyInfo, error) IdentifyNode(ctx context.Context, node *corev1.Node) (*LegacyInfo, error)
} }