Merge pull request #13049 from olemarkus/channels-refactor

Preload channel versions from namespaces
This commit is contained in:
Kubernetes Prow Robot 2022-01-19 00:28:06 -08:00 committed by GitHub
commit 4ca5b6a0e6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 238 additions and 101 deletions

View File

@ -90,35 +90,34 @@ func (a *Addon) ChannelVersion() *ChannelVersion {
}
func (a *Addon) buildChannel() *Channel {
namespace := "kube-system"
if a.Spec.Namespace != nil {
namespace = *a.Spec.Namespace
}
channel := &Channel{
Namespace: namespace,
Namespace: a.GetNamespace(),
Name: a.Name,
}
return channel
}
func (a *Addon) GetRequiredUpdates(ctx context.Context, k8sClient kubernetes.Interface, cmClient certmanager.Interface) (*AddonUpdate, error) {
func (a *Addon) GetNamespace() string {
namespace := "kube-system"
if a.Spec.Namespace != nil {
namespace = *a.Spec.Namespace
}
return namespace
}
func (a *Addon) GetRequiredUpdates(ctx context.Context, k8sClient kubernetes.Interface, cmClient certmanager.Interface, existingVersion *ChannelVersion) (*AddonUpdate, error) {
newVersion := a.ChannelVersion()
channel := a.buildChannel()
existingVersion, err := channel.GetInstalledVersion(ctx, k8sClient)
if err != nil {
return nil, err
}
pkiInstalled := true
if a.Spec.NeedsPKI {
pkiInstalled, err = channel.IsPKIInstalled(ctx, k8sClient, cmClient)
needsPKI, err := channel.IsPKIInstalled(ctx, k8sClient, cmClient)
if err != nil {
return nil, err
}
pkiInstalled = needsPKI
}
if existingVersion != nil && !newVersion.replaces(a.Name, existingVersion) {
@ -153,8 +152,8 @@ func (a *Addon) GetManifestFullUrl() (*url.URL, error) {
return manifestURL, nil
}
func (a *Addon) EnsureUpdated(ctx context.Context, k8sClient kubernetes.Interface, cmClient certmanager.Interface, pruner *Pruner) (*AddonUpdate, error) {
required, err := a.GetRequiredUpdates(ctx, k8sClient, cmClient)
func (a *Addon) EnsureUpdated(ctx context.Context, k8sClient kubernetes.Interface, cmClient certmanager.Interface, pruner *Pruner, existingVersion *ChannelVersion) (*AddonUpdate, error) {
required, err := a.GetRequiredUpdates(ctx, k8sClient, cmClient, existingVersion)
if err != nil {
return nil, err
}

View File

@ -149,7 +149,7 @@ func Test_GetRequiredUpdates(t *testing.T) {
NeedsPKI: true,
},
}
addonUpdate, err := addon.GetRequiredUpdates(ctx, fakek8s, fakecm)
addonUpdate, err := addon.GetRequiredUpdates(ctx, fakek8s, fakecm, nil)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -242,13 +242,15 @@ func Test_NeedsRollingUpdate(t *testing.T) {
annotations = g.originalAnnotations
}
objects := []runtime.Object{
&corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: "kube-system",
Annotations: annotations,
},
kubeSystem := &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: "kube-system",
Annotations: annotations,
},
}
objects := []runtime.Object{
kubeSystem,
&corev1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "cp",
@ -266,11 +268,14 @@ func Test_NeedsRollingUpdate(t *testing.T) {
},
},
}
existingChannels := FindChannelVersions(kubeSystem)
fakek8s := fakekubernetes.NewSimpleClientset(objects...)
fakecm := fakecertmanager.NewSimpleClientset()
addon := g.newAddon
required, err := addon.GetRequiredUpdates(ctx, fakek8s, fakecm)
required, err := addon.GetRequiredUpdates(ctx, fakek8s, fakecm, existingChannels[addon.Name])
if err != nil {
t.Errorf("unexpected error: %v", err)
}

View File

@ -83,7 +83,7 @@ func ParseChannelVersion(s string) (*ChannelVersion, error) {
return v, nil
}
func FindAddons(ns *v1.Namespace) map[string]*ChannelVersion {
func FindChannelVersions(ns *v1.Namespace) map[string]*ChannelVersion {
addons := make(map[string]*ChannelVersion)
for k, v := range ns.Annotations {
if !strings.HasPrefix(k, AnnotationPrefix) {

View File

@ -1,4 +1,4 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
@ -31,3 +31,18 @@ go_library(
"//vendor/k8s.io/klog/v2:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["apply_channel_test.go"],
embed = [":go_default_library"],
deps = [
"//channels/pkg/api:go_default_library",
"//channels/pkg/channels:go_default_library",
"//upup/pkg/fi:go_default_library",
"//vendor/github.com/jetstack/cert-manager/pkg/client/clientset/versioned/fake:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/fake:go_default_library",
],
)

View File

@ -25,7 +25,12 @@ import (
"strings"
"github.com/blang/semver/v4"
"github.com/jetstack/cert-manager/pkg/client/clientset/versioned"
"github.com/spf13/cobra"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/restmapper"
"k8s.io/klog/v2"
"k8s.io/kops/channels/pkg/channels"
"k8s.io/kops/pkg/apis/kops/util"
@ -89,84 +94,32 @@ func RunApplyChannel(ctx context.Context, f Factory, out io.Writer, options *App
// Remove Pre and Patch, as they make semver comparisons impractical
kubernetesVersion.Pre = nil
menu := channels.NewAddonMenu()
for _, name := range args {
location, err := url.Parse(name)
if err != nil {
return fmt.Errorf("unable to parse argument %q as url", name)
}
if !location.IsAbs() {
// We recognize the following "well-known" format:
// <name> with no slashes ->
if strings.Contains(name, "/") {
return fmt.Errorf("channel format not recognized (did you mean to use `-f` to specify a local file?): %q", name)
}
expanded := "https://raw.githubusercontent.com/kubernetes/kops/master/addons/" + name + "/addon.yaml"
location, err = url.Parse(expanded)
if err != nil {
return fmt.Errorf("unable to parse expanded argument %q as url", expanded)
}
// Disallow the use of legacy addons from the "well-known" location starting Kubernetes 1.23:
// https://raw.githubusercontent.com/kubernetes/kops/master/addons/<name>/addon.yaml
if util.IsKubernetesGTE("1.23", kubernetesVersion) {
return fmt.Errorf("legacy addons are deprecated and unmaintained, use managed addons instead of %s", expanded)
} else {
klog.Warningf("Legacy addons are deprecated and unmaintained, use managed addons instead of %s", expanded)
}
}
o, err := channels.LoadAddons(name, location)
if err != nil {
return fmt.Errorf("error loading channel %q: %v", location, err)
}
current, err := o.GetCurrent(kubernetesVersion)
if err != nil {
return fmt.Errorf("error processing latest versions in %q: %v", location, err)
}
menu.MergeAddons(current)
// menu is the expected list of addons in the cluster and their configurations.
menu, err := buildMenu(kubernetesVersion, args, false)
if err != nil {
return fmt.Errorf("cannot build the addon menu from args: %w", err)
}
for _, f := range options.Files {
location, err := url.Parse(f)
if err != nil {
return fmt.Errorf("unable to parse argument %q as url", f)
}
if !location.IsAbs() {
cwd, err := os.Getwd()
if err != nil {
return fmt.Errorf("error getting current directory: %v", err)
}
baseURL, err := url.Parse(cwd + string(os.PathSeparator))
if err != nil {
return fmt.Errorf("error building url for current directory %q: %v", cwd, err)
}
location = baseURL.ResolveReference(location)
}
o, err := channels.LoadAddons(f, location)
if err != nil {
return fmt.Errorf("error loading file %q: %v", f, err)
}
filesMenu, err := buildMenu(kubernetesVersion, options.Files, true)
if err != nil {
return fmt.Errorf("cannot build the addon menu from files: %w", err)
}
menu.MergeAddons(filesMenu)
current, err := o.GetCurrent(kubernetesVersion)
if err != nil {
return fmt.Errorf("error processing latest versions in %q: %v", f, err)
}
menu.MergeAddons(current)
return applyMenu(ctx, menu, k8sClient, cmClient, dynamicClient, restMapper, options.Yes)
}
func applyMenu(ctx context.Context, menu *channels.AddonMenu, k8sClient kubernetes.Interface, cmClient versioned.Interface, dynamicClient dynamic.Interface, restMapper *restmapper.DeferredDiscoveryRESTMapper, apply bool) error {
// channelVersions is the list of installed addons in the cluster.
// It is keyed by <namespace>:<addon name>.
channelVersions, err := getChannelVersions(ctx, k8sClient)
if err != nil {
return fmt.Errorf("cannot fetch channel versions from namespaces: %w", err)
}
var updates []*channels.AddonUpdate
var needUpdates []*channels.Addon
for _, addon := range menu.Addons {
// TODO: Cache lookups to prevent repeated lookups?
update, err := addon.GetRequiredUpdates(ctx, k8sClient, cmClient)
if err != nil {
return fmt.Errorf("error checking for required update: %v", err)
}
if update != nil {
updates = append(updates, update)
needUpdates = append(needUpdates, addon)
}
updates, needUpdates, err := getUpdates(ctx, menu, k8sClient, cmClient, channelVersions)
if err != nil {
return fmt.Errorf("failed to get updates: %w", err)
}
if len(updates) == 0 {
@ -205,7 +158,7 @@ func RunApplyChannel(ctx context.Context, f Factory, out io.Writer, options *App
}
}
if !options.Yes {
if !apply {
fmt.Printf("\nMust specify --yes to update\n")
return nil
}
@ -216,7 +169,7 @@ func RunApplyChannel(ctx context.Context, f Factory, out io.Writer, options *App
}
for _, needUpdate := range needUpdates {
update, err := needUpdate.EnsureUpdated(ctx, k8sClient, cmClient, pruner)
update, err := needUpdate.EnsureUpdated(ctx, k8sClient, cmClient, pruner, channelVersions[needUpdate.GetNamespace()+":"+needUpdate.Name])
if err != nil {
fmt.Printf("error updating %q: %v", needUpdate.Name, err)
} else if update != nil {
@ -228,3 +181,89 @@ func RunApplyChannel(ctx context.Context, f Factory, out io.Writer, options *App
return nil
}
func getUpdates(ctx context.Context, menu *channels.AddonMenu, k8sClient kubernetes.Interface, cmClient versioned.Interface, channelVersions map[string]*channels.ChannelVersion) ([]*channels.AddonUpdate, []*channels.Addon, error) {
var updates []*channels.AddonUpdate
var needUpdates []*channels.Addon
for _, addon := range menu.Addons {
update, err := addon.GetRequiredUpdates(ctx, k8sClient, cmClient, channelVersions[addon.GetNamespace()+":"+addon.Name])
if err != nil {
return nil, nil, fmt.Errorf("error checking for required update: %v", err)
}
if update != nil {
updates = append(updates, update)
needUpdates = append(needUpdates, addon)
}
}
return updates, needUpdates, nil
}
func getChannelVersions(ctx context.Context, k8sClient kubernetes.Interface) (map[string]*channels.ChannelVersion, error) {
namespaces, err := k8sClient.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("error listing namespaces: %v", err)
}
channelVersions := make(map[string]*channels.ChannelVersion)
for i := range namespaces.Items {
ns := &namespaces.Items[i]
addons := channels.FindChannelVersions(ns)
for name, version := range addons {
channelVersions[ns.Name+":"+name] = version
}
}
return channelVersions, nil
}
func buildMenu(kubernetesVersion semver.Version, args []string, localFiles bool) (*channels.AddonMenu, error) {
menu := channels.NewAddonMenu()
for _, name := range args {
location, err := url.Parse(name)
if err != nil {
return nil, fmt.Errorf("unable to parse argument %q as url", name)
}
if !location.IsAbs() {
if !localFiles {
// We recognize the following "well-known" format:
// <name> with no slashes ->
if strings.Contains(name, "/") {
return nil, fmt.Errorf("channel format not recognized (did you mean to use `-f` to specify a local file?): %q", name)
}
expanded := "https://raw.githubusercontent.com/kubernetes/kops/master/addons/" + name + "/addon.yaml"
location, err = url.Parse(expanded)
if err != nil {
return nil, fmt.Errorf("unable to parse expanded argument %q as url", expanded)
}
// Disallow the use of legacy addons from the "well-known" location starting Kubernetes 1.23:
// https://raw.githubusercontent.com/kubernetes/kops/master/addons/<name>/addon.yaml
if util.IsKubernetesGTE("1.23", kubernetesVersion) {
return nil, fmt.Errorf("legacy addons are deprecated and unmaintained, use managed addons instead of %s", expanded)
} else {
klog.Warningf("Legacy addons are deprecated and unmaintained, use managed addons instead of %s", expanded)
}
} else {
cwd, err := os.Getwd()
if err != nil {
return nil, fmt.Errorf("error getting current directory: %v", err)
}
baseURL, err := url.Parse(cwd + string(os.PathSeparator))
if err != nil {
return nil, fmt.Errorf("error building url for current directory %q: %v", cwd, err)
}
location = baseURL.ResolveReference(location)
}
}
o, err := channels.LoadAddons(name, location)
if err != nil {
return nil, fmt.Errorf("error loading channel %q: %v", location, err)
}
current, err := o.GetCurrent(kubernetesVersion)
if err != nil {
return nil, fmt.Errorf("error processing latest versions in %q: %v", location, err)
}
menu.MergeAddons(current)
}
return menu, nil
}

View File

@ -0,0 +1,79 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cmd
import (
"context"
"testing"
cmfake "github.com/jetstack/cert-manager/pkg/client/clientset/versioned/fake"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
fakek8s "k8s.io/client-go/kubernetes/fake"
"k8s.io/kops/channels/pkg/api"
"k8s.io/kops/channels/pkg/channels"
"k8s.io/kops/upup/pkg/fi"
)
func TestGetUpdates(t *testing.T) {
// This test checks checks that the addon is applied to the correct namespace.
// It should be applied to kube-system even though the same addon has already been applied to default.
kubeSystemNS := corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: "kube-system",
},
}
defaultNS := corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: "default",
Annotations: map[string]string{
"addons.k8s.io/aws-ebs-csi-driver.addons.k8s.io": "{\"channel\":\"s3://mystatestore/cluster.example.com/addons/bootstrap-channel.yaml\",\"id\":\"k8s-1.17\",\"manifestHash\":\"abc\",\"systemGeneration\":1}",
},
},
}
k8sClient := fakek8s.NewSimpleClientset(&kubeSystemNS, &defaultNS)
ctx := context.Background()
channelVersions, err := getChannelVersions(ctx, k8sClient)
if err != nil {
t.Errorf("failed to get channel versions: %v", err)
}
menu := channels.NewAddonMenu()
menu.Addons = map[string]*channels.Addon{
"aws-ebs-csi-driver.addons.k8s.io": {
Name: "aws-ebs-csi-driver.addons.k8s.io",
Spec: &api.AddonSpec{
Name: fi.String("aws-ebs-csi-driver.addons.k8s.io"),
Id: "k8s-1.17",
ManifestHash: "abc",
},
},
}
_, needUpdates, err := getUpdates(ctx, menu, k8sClient, cmfake.NewSimpleClientset(), channelVersions)
if err != nil {
t.Errorf("failed to get updates: %v", err)
}
if len(needUpdates) != 1 {
t.Fatalf("expected 1 update, but got %d", len(needUpdates))
}
if needUpdates[0].GetNamespace() != "kube-system" {
t.Errorf("expected update in kube-system, but update applied to %q", needUpdates[0].GetNamespace())
}
}

View File

@ -85,7 +85,7 @@ func RunGetAddons(ctx context.Context, f Factory, out io.Writer, options *GetAdd
for i := range namespaces.Items {
ns := &namespaces.Items[i]
addons := channels.FindAddons(ns)
addons := channels.FindChannelVersions(ns)
for name, version := range addons {
i := &addonInfo{
Name: name,