Put versioned API of cluster into state store

This commit is contained in:
John Gardiner Myers 2020-05-31 15:21:12 -07:00
parent 8d91f868b1
commit 53695fc183
20 changed files with 94 additions and 167 deletions

View File

@ -11,6 +11,7 @@ go_library(
deps = [
"//pkg/apis/kops:go_default_library",
"//pkg/apis/kops/registry:go_default_library",
"//pkg/kopscodecs:go_default_library",
"//pkg/nodeidentity:go_default_library",
"//pkg/nodelabels:go_default_library",
"//upup/pkg/fi/utils:go_default_library",

View File

@ -28,6 +28,7 @@ import (
"k8s.io/klog/v2"
"k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/pkg/apis/kops/registry"
"k8s.io/kops/pkg/kopscodecs"
"k8s.io/kops/pkg/nodeidentity"
"k8s.io/kops/pkg/nodelabels"
"k8s.io/kops/upup/pkg/fi/utils"
@ -205,12 +206,14 @@ func (r *LegacyNodeReconciler) loadCluster(p vfs.Path) (*kops.Cluster, error) {
return nil, fmt.Errorf("error loading Cluster %q: %v", p, err)
}
cluster := &kops.Cluster{}
if err := utils.YamlUnmarshal(b, cluster); err != nil {
o, _, err := kopscodecs.Decode(b, nil)
if err != nil {
return nil, fmt.Errorf("error parsing Cluster %q: %v", p, err)
}
return cluster, nil
if cluster, ok := o.(*kops.Cluster); ok {
return cluster, nil
}
return nil, fmt.Errorf("unexpected object type for Cluster %q: %T", p, o)
}
// loadInstanceGroup loads a kops.InstanceGroup object from the vfs backing store

View File

@ -611,15 +611,6 @@ func RunCreateCluster(ctx context.Context, f *util.Factory, out io.Writer, c *Cr
return fmt.Errorf("error writing updated configuration: %v", err)
}
configBase, err := clientset.ConfigBaseFor(cluster)
if err != nil {
return fmt.Errorf("error building ConfigBase for cluster: %v", err)
}
err = registry.WriteConfigDeprecated(cluster, configBase.Join(registry.PathClusterCompleted), fullCluster)
if err != nil {
return fmt.Errorf("error writing completed cluster spec: %v", err)
}
if len(c.SSHPublicKeys) == 0 {
autoloadSSHPublicKeys := true
switch c.CloudProvider {

View File

@ -29,7 +29,6 @@ import (
"github.com/spf13/cobra"
"k8s.io/kops/cmd/kops/util"
api "k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/pkg/apis/kops/registry"
"k8s.io/kops/pkg/apis/kops/validation"
"k8s.io/kops/pkg/assets"
"k8s.io/kops/pkg/commands"
@ -237,11 +236,6 @@ func RunEditCluster(ctx context.Context, f *util.Factory, cmd *cobra.Command, ar
continue
}
configBase, err := registry.ConfigBase(newCluster)
if err != nil {
return preservedFile(err, file, out)
}
// Retrieve the current status of the cluster. This will eventually be part of the cluster object.
status, err := cloud.FindClusterStatus(oldCluster)
if err != nil {
@ -254,11 +248,6 @@ func RunEditCluster(ctx context.Context, f *util.Factory, cmd *cobra.Command, ar
return preservedFile(err, file, out)
}
err = registry.WriteConfigDeprecated(newCluster, configBase.Join(registry.PathClusterCompleted), fullCluster)
if err != nil {
return preservedFile(fmt.Errorf("error writing completed cluster spec: %v", err), file, out)
}
return nil
}
}

View File

@ -31,6 +31,7 @@ import (
"k8s.io/kops/cmd/kops/util"
kopsapi "k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/pkg/apis/kops/registry"
"k8s.io/kops/pkg/kopscodecs"
"k8s.io/kops/util/pkg/tables"
"k8s.io/kubectl/pkg/util/i18n"
"k8s.io/kubectl/pkg/util/templates"
@ -283,12 +284,21 @@ func fullClusterSpecs(clusters []*kopsapi.Cluster) ([]*kopsapi.Cluster, error) {
if err != nil {
return nil, fmt.Errorf("error reading full cluster spec for %q: %v", cluster.ObjectMeta.Name, err)
}
fullSpec := &kopsapi.Cluster{}
err = registry.ReadConfigDeprecated(configBase.Join(registry.PathClusterCompleted), fullSpec)
configPath := configBase.Join(registry.PathClusterCompleted)
b, err := configPath.ReadFile()
if err != nil {
return nil, fmt.Errorf("error reading full cluster spec for %q: %v", cluster.ObjectMeta.Name, err)
return nil, fmt.Errorf("error loading Cluster %q: %v", configPath, err)
}
o, _, err := kopscodecs.Decode(b, nil)
if err != nil {
return nil, fmt.Errorf("error parsing Cluster %q: %v", configPath, err)
}
if fullSpec, ok := o.(*kopsapi.Cluster); ok {
fullSpecs = append(fullSpecs, fullSpec)
} else {
return nil, fmt.Errorf("unexpected object type for Cluster %q: %T", configPath, o)
}
fullSpecs = append(fullSpecs, fullSpec)
}
return fullSpecs, nil
}

View File

@ -5,16 +5,13 @@ go_library(
srcs = [
"helpers.go",
"registry.go",
"statestore.go",
],
importpath = "k8s.io/kops/pkg/apis/kops/registry",
visibility = ["//visibility:public"],
deps = [
"//pkg/acls:go_default_library",
"//pkg/apis/kops:go_default_library",
"//pkg/client/simple:go_default_library",
"//pkg/kubemanifest:go_default_library",
"//upup/pkg/fi/utils:go_default_library",
"//util/pkg/vfs:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/validation/field:go_default_library",

View File

@ -27,8 +27,8 @@ import (
const (
// Path for the user-specified cluster spec
PathCluster = "config"
// Path for completed cluster spec in the state store
PathClusterCompleted = "cluster.spec"
// PathClusterCompleted is the path for completed cluster spec in the state store.
PathClusterCompleted = "cluster-completed.spec"
// PathKopsVersionUpdated is the path for the version of kops last used to apply the cluster.
PathKopsVersionUpdated = "kops-version.txt"
)

View File

@ -1,95 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package registry
import (
"bytes"
"fmt"
"os"
"strings"
"k8s.io/kops/pkg/acls"
"k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/upup/pkg/fi/utils"
"k8s.io/kops/util/pkg/vfs"
)
func ReadConfigDeprecated(configPath vfs.Path, config interface{}) error {
data, err := configPath.ReadFile()
if err != nil {
if os.IsNotExist(err) {
return err
}
return fmt.Errorf("error reading configuration file %s: %v", configPath, err)
}
// Yaml can't parse empty strings
configString := string(data)
configString = strings.TrimSpace(configString)
if configString != "" {
err = utils.YamlUnmarshal([]byte(configString), config)
if err != nil {
return fmt.Errorf("error parsing configuration: %v", err)
}
}
return nil
}
// WriteConfigDeprecated writes a config file as yaml.
// It is deprecated because it is unversioned, but it is still used, in particular for writing the completed config.
func WriteConfigDeprecated(cluster *kops.Cluster, configPath vfs.Path, config interface{}, writeOptions ...vfs.WriteOption) error {
data, err := utils.YamlMarshal(config)
if err != nil {
return fmt.Errorf("error marshaling configuration: %v", err)
}
create := false
for _, writeOption := range writeOptions {
switch writeOption {
case vfs.WriteOptionCreate:
create = true
case vfs.WriteOptionOnlyIfExists:
_, err = configPath.ReadFile()
if err != nil {
if os.IsNotExist(err) {
return fmt.Errorf("cannot update configuration file %s: does not exist", configPath)
}
return fmt.Errorf("error checking if configuration file %s exists already: %v", configPath, err)
}
default:
return fmt.Errorf("unknown write option: %q", writeOption)
}
}
acl, err := acls.GetACL(configPath, cluster)
if err != nil {
return err
}
rs := bytes.NewReader(data)
if create {
err = configPath.CreateFile(rs, acl)
} else {
err = configPath.WriteFile(rs, acl)
}
if err != nil {
return fmt.Errorf("error writing configuration file %s: %v", configPath, err)
}
return nil
}

View File

@ -76,6 +76,12 @@ func (c *RESTClientset) UpdateCluster(ctx context.Context, cluster *kops.Cluster
return c.KopsClient.Clusters(namespace).Update(ctx, cluster, metav1.UpdateOptions{})
}
// UpdateCompletedCluster implements the UpdateCompletedCluster method of Clientset for a kubernetes-API state store
func (c *RESTClientset) UpdateCompletedCluster(ctx context.Context, cluster *kops.Cluster) error {
// Not implemented
return nil
}
// ConfigBaseFor implements the ConfigBaseFor method of Clientset for a kubernetes-API state store
func (c *RESTClientset) ConfigBaseFor(cluster *kops.Cluster) (vfs.Path, error) {
if cluster.Spec.ConfigBase != "" {

View File

@ -37,6 +37,9 @@ type Clientset interface {
// UpdateCluster updates a cluster
UpdateCluster(ctx context.Context, cluster *kops.Cluster, status *kops.ClusterStatus) (*kops.Cluster, error)
// UpdateCompletedCluster updates a completed cluster.
UpdateCompletedCluster(ctx context.Context, cluster *kops.Cluster) error
// ListClusters returns all clusters
ListClusters(ctx context.Context, options metav1.ListOptions) (*kops.ClusterList, error)

View File

@ -53,6 +53,11 @@ func (c *VFSClientset) UpdateCluster(ctx context.Context, cluster *kops.Cluster,
return c.clusters().Update(cluster, status)
}
// UpdateCompletedCluster implements the UpdateCluster method of simple.Clientset for a VFS-backed state store.
func (c *VFSClientset) UpdateCompletedCluster(ctx context.Context, cluster *kops.Cluster) error {
return c.clusters().UpdateCompleted(cluster)
}
// CreateCluster implements the CreateCluster method of simple.Clientset for a VFS-backed state store
func (c *VFSClientset) CreateCluster(ctx context.Context, cluster *kops.Cluster) (*kops.Cluster, error) {
return c.clusters().Create(cluster)
@ -143,7 +148,8 @@ func DeleteAllClusterState(basePath vfs.Path) error {
continue
}
if relativePath == "config" || relativePath == "cluster.spec" || relativePath == registry.PathKopsVersionUpdated {
// "cluster.spec" was written by kOps 1.21 and earlier.
if relativePath == "config" || relativePath == "cluster.spec" || relativePath == "cluster-completed.spec" || relativePath == registry.PathKopsVersionUpdated {
continue
}
if strings.HasPrefix(relativePath, "addons/") {

View File

@ -152,6 +152,22 @@ func (r *ClusterVFS) Update(c *api.Cluster, status *api.ClusterStatus) (*api.Clu
return c, nil
}
func (r *ClusterVFS) UpdateCompleted(c *api.Cluster) error {
clusterName := c.ObjectMeta.Name
if clusterName == "" {
return field.Required(field.NewPath("objectMeta", "name"), "clusterName is required")
}
if err := r.writeConfig(c, r.basePath.Join(clusterName, registry.PathClusterCompleted), c); err != nil {
if os.IsNotExist(err) {
return err
}
return fmt.Errorf("error writing Cluster: %v", err)
}
return nil
}
// List returns a slice containing all the cluster names
// It skips directories that don't look like clusters
func (r *ClusterVFS) listNames() ([]string, error) {

View File

@ -276,7 +276,7 @@ func (b *PolicyBuilder) AddOSSPermissions(p *Policy) (*Policy, error) {
} else if b.Role == kops.InstanceGroupRoleNode {
resources := []string{
strings.Join([]string{b.RAMPrefix(), ":oss:*:*:", ramOSSPath, "/addons/*"}, ""),
strings.Join([]string{b.RAMPrefix(), ":oss:*:*:", ramOSSPath, "/cluster.spec"}, ""),
strings.Join([]string{b.RAMPrefix(), ":oss:*:*:", ramOSSPath, "/cluster-completed.spec"}, ""),
strings.Join([]string{b.RAMPrefix(), ":oss:*:*:", ramOSSPath, "/config"}, ""),
strings.Join([]string{b.RAMPrefix(), ":oss:*:*:", ramOSSPath, "/instancegroup/*"}, ""),
strings.Join([]string{b.RAMPrefix(), ":oss:*:*:", ramOSSPath, "/pki/issued/*"}, ""),

View File

@ -586,7 +586,7 @@ func ReadableStatePaths(cluster *kops.Cluster, role Subject) ([]string, error) {
case *NodeRoleNode:
paths = append(paths,
"/addons/*",
"/cluster.spec",
"/cluster-completed.spec",
"/igconfig/node/*",
"/pki/issued/*",
"/pki/ssh/*",

View File

@ -24,7 +24,7 @@
"Effect": "Allow",
"Resource": [
"arn:aws:s3:::kops-tests/iam-builder-test.k8s.local/addons/*",
"arn:aws:s3:::kops-tests/iam-builder-test.k8s.local/cluster.spec",
"arn:aws:s3:::kops-tests/iam-builder-test.k8s.local/cluster-completed.spec",
"arn:aws:s3:::kops-tests/iam-builder-test.k8s.local/igconfig/node/*",
"arn:aws:s3:::kops-tests/iam-builder-test.k8s.local/pki/issued/*",
"arn:aws:s3:::kops-tests/iam-builder-test.k8s.local/pki/private/kube-proxy/*",

View File

@ -24,7 +24,7 @@
"Effect": "Allow",
"Resource": [
"arn:aws:s3:::kops-tests/iam-builder-test.k8s.local/addons/*",
"arn:aws:s3:::kops-tests/iam-builder-test.k8s.local/cluster.spec",
"arn:aws:s3:::kops-tests/iam-builder-test.k8s.local/cluster-completed.spec",
"arn:aws:s3:::kops-tests/iam-builder-test.k8s.local/igconfig/node/*",
"arn:aws:s3:::kops-tests/iam-builder-test.k8s.local/pki/issued/*",
"arn:aws:s3:::kops-tests/iam-builder-test.k8s.local/pki/private/kube-proxy/*",

View File

@ -765,7 +765,7 @@ func (c *ApplyClusterCmd) Run(ctx context.Context) error {
return fmt.Errorf("error writing kops version: %v", err)
}
err = registry.WriteConfigDeprecated(cluster, configBase.Join(registry.PathClusterCompleted), c.Cluster)
err = c.Clientset.UpdateCompletedCluster(ctx, c.Cluster)
if err != nil {
return fmt.Errorf("error writing completed cluster spec: %v", err)
}

View File

@ -16,6 +16,7 @@ go_library(
"//pkg/apis/nodeup:go_default_library",
"//pkg/assets:go_default_library",
"//pkg/configserver:go_default_library",
"//pkg/kopscodecs:go_default_library",
"//upup/pkg/fi:go_default_library",
"//upup/pkg/fi/cloudup/awsup:go_default_library",
"//upup/pkg/fi/nodeup/cloudinit:go_default_library",

View File

@ -38,6 +38,7 @@ import (
"k8s.io/kops/pkg/apis/nodeup"
"k8s.io/kops/pkg/assets"
"k8s.io/kops/pkg/configserver"
"k8s.io/kops/pkg/kopscodecs"
"k8s.io/kops/upup/pkg/fi"
"k8s.io/kops/upup/pkg/fi/cloudup/awsup"
"k8s.io/kops/upup/pkg/fi/nodeup/cloudinit"
@ -124,33 +125,41 @@ func (c *NodeUpCommand) Run(out io.Writer) error {
return fmt.Errorf("ConfigBase or ConfigServer is required")
}
c.cluster = &api.Cluster{}
if nodeConfig != nil {
if err := utils.YamlUnmarshal([]byte(nodeConfig.ClusterFullConfig), c.cluster); err != nil {
return fmt.Errorf("error parsing Cluster config response: %w", err)
}
} else {
clusterLocation := fi.StringValue(c.config.ClusterLocation)
var p vfs.Path
if clusterLocation != "" {
var err error
p, err = vfs.Context.BuildVfsPath(clusterLocation)
if err != nil {
return fmt.Errorf("error parsing ClusterLocation %q: %v", clusterLocation, err)
}
{
var b []byte
var clusterDescription string
if nodeConfig != nil {
b = []byte(nodeConfig.ClusterFullConfig)
clusterDescription = "config response"
} else {
p = configBase.Join(registry.PathClusterCompleted)
clusterLocation := fi.StringValue(c.config.ClusterLocation)
var p vfs.Path
if clusterLocation != "" {
var err error
p, err = vfs.Context.BuildVfsPath(clusterLocation)
if err != nil {
return fmt.Errorf("error parsing ClusterLocation %q: %v", clusterLocation, err)
}
} else {
p = configBase.Join(registry.PathClusterCompleted)
}
var err error
b, err = p.ReadFile()
if err != nil {
return fmt.Errorf("error loading Cluster %q: %v", p, err)
}
clusterDescription = fmt.Sprintf("%q", p)
}
b, err := p.ReadFile()
o, _, err := kopscodecs.Decode(b, nil)
if err != nil {
return fmt.Errorf("error loading Cluster %q: %v", p, err)
return fmt.Errorf("error parsing Cluster %s: %v", clusterDescription, err)
}
err = utils.YamlUnmarshal(b, c.cluster)
if err != nil {
return fmt.Errorf("error parsing Cluster %q: %v", p, err)
var ok bool
if c.cluster, ok = o.(*api.Cluster); !ok {
return fmt.Errorf("unexpected object type for Cluster %s: %T", clusterDescription, o)
}
}

View File

@ -108,7 +108,7 @@ func (x *ConvertKubeupCluster) Upgrade(ctx context.Context) error {
}
assetBuilder := assets.NewAssetBuilder(cluster, false)
fullCluster, err := cloudup.PopulateClusterSpec(x.Clientset, cluster, x.Cloud, assetBuilder)
_, err = cloudup.PopulateClusterSpec(x.Clientset, cluster, x.Cloud, assetBuilder)
if err != nil {
return err
}
@ -470,16 +470,6 @@ func (x *ConvertKubeupCluster) Upgrade(ctx context.Context) error {
return fmt.Errorf("error writing updated configuration: %v", err)
}
// TODO: No longer needed?
err = registry.WriteConfigDeprecated(cluster, newConfigBase.Join(registry.PathClusterCompleted), fullCluster)
if err != nil {
return fmt.Errorf("error writing completed cluster spec: %v", err)
}
if err != nil {
return fmt.Errorf("error writing completed cluster spec: %v", err)
}
oldCACertPool, err := oldKeyStore.FindCertificatePool(fi.CertificateIDCA)
if err != nil {
return fmt.Errorf("error reading old CA certs: %v", err)