mirror of https://github.com/kubernetes/kops.git
				
				
				
			Merge pull request #11708 from johngmyers/refactor-assets
Limit concurrency of asset copy tasks
This commit is contained in:
		
						commit
						364fe4ca86
					
				|  | @ -91,7 +91,6 @@ go_library( | |||
|         "//pkg/util/templater:go_default_library", | ||||
|         "//pkg/validation:go_default_library", | ||||
|         "//upup/pkg/fi:go_default_library", | ||||
|         "//upup/pkg/fi/assettasks:go_default_library", | ||||
|         "//upup/pkg/fi/cloudup:go_default_library", | ||||
|         "//upup/pkg/fi/cloudup/awsup:go_default_library", | ||||
|         "//upup/pkg/fi/utils:go_default_library", | ||||
|  |  | |||
|  | @ -1,5 +1,5 @@ | |||
| /* | ||||
| Copyright 2020 The Kubernetes Authors. | ||||
| Copyright 2021 The Kubernetes Authors. | ||||
| 
 | ||||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| you may not use this file except in compliance with the License. | ||||
|  | @ -22,8 +22,7 @@ import ( | |||
| 	"fmt" | ||||
| 	"io" | ||||
| 
 | ||||
| 	"k8s.io/kops/upup/pkg/fi" | ||||
| 	"k8s.io/kops/upup/pkg/fi/assettasks" | ||||
| 	"k8s.io/kops/pkg/assets" | ||||
| 	"k8s.io/kubectl/pkg/util/i18n" | ||||
| 	"k8s.io/kubectl/pkg/util/templates" | ||||
| 	"sigs.k8s.io/yaml" | ||||
|  | @ -58,9 +57,6 @@ type AssetResult struct { | |||
| 	Files []*File `json:"files,omitempty"` | ||||
| } | ||||
| 
 | ||||
| type copyAssetsTarget struct { | ||||
| } | ||||
| 
 | ||||
| func NewCmdGetAssets(f *util.Factory, out io.Writer, getOptions *GetOptions) *cobra.Command { | ||||
| 	options := GetAssetsOptions{ | ||||
| 		GetOptions: getOptions, | ||||
|  | @ -120,7 +116,6 @@ func RunGetAssets(ctx context.Context, f *util.Factory, out io.Writer, options * | |||
| 		Images: make([]*Image, 0, len(updateClusterResults.ImageAssets)), | ||||
| 		Files:  make([]*File, 0, len(updateClusterResults.FileAssets)), | ||||
| 	} | ||||
| 	tasks := map[string]fi.Task{} | ||||
| 
 | ||||
| 	seen := map[string]bool{} | ||||
| 	for _, imageAsset := range updateClusterResults.ImageAssets { | ||||
|  | @ -132,24 +127,6 @@ func RunGetAssets(ctx context.Context, f *util.Factory, out io.Writer, options * | |||
| 			result.Images = append(result.Images, &image) | ||||
| 			seen[image.Canonical] = true | ||||
| 		} | ||||
| 
 | ||||
| 		if options.Copy && imageAsset.DownloadLocation != imageAsset.CanonicalLocation { | ||||
| 			ctx := &fi.ModelBuilderContext{ | ||||
| 				Tasks: tasks, | ||||
| 			} | ||||
| 
 | ||||
| 			copyImageTask := &assettasks.CopyImage{ | ||||
| 				Name:        fi.String(imageAsset.DownloadLocation), | ||||
| 				SourceImage: fi.String(imageAsset.CanonicalLocation), | ||||
| 				TargetImage: fi.String(imageAsset.DownloadLocation), | ||||
| 				Lifecycle:   fi.LifecycleSync, | ||||
| 			} | ||||
| 
 | ||||
| 			if err := ctx.EnsureTask(copyImageTask); err != nil { | ||||
| 				return fmt.Errorf("error adding image-copy task: %v", err) | ||||
| 			} | ||||
| 			tasks = ctx.Tasks | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	seen = map[string]bool{} | ||||
|  | @ -163,41 +140,12 @@ func RunGetAssets(ctx context.Context, f *util.Factory, out io.Writer, options * | |||
| 			result.Files = append(result.Files, &file) | ||||
| 			seen[file.Canonical] = true | ||||
| 		} | ||||
| 
 | ||||
| 		// test if the asset needs to be copied
 | ||||
| 		if options.Copy && fileAsset.DownloadURL.String() != fileAsset.CanonicalURL.String() { | ||||
| 			ctx := &fi.ModelBuilderContext{ | ||||
| 				Tasks: tasks, | ||||
| 			} | ||||
| 
 | ||||
| 			copyFileTask := &assettasks.CopyFile{ | ||||
| 				Name:       fi.String(fileAsset.CanonicalURL.String()), | ||||
| 				TargetFile: fi.String(fileAsset.DownloadURL.String()), | ||||
| 				SourceFile: fi.String(fileAsset.CanonicalURL.String()), | ||||
| 				SHA:        fi.String(fileAsset.SHAValue), | ||||
| 				Lifecycle:  fi.LifecycleSync, | ||||
| 			} | ||||
| 
 | ||||
| 			if err := ctx.EnsureTask(copyFileTask); err != nil { | ||||
| 				return fmt.Errorf("error adding file-copy task: %v", err) | ||||
| 			} | ||||
| 			tasks = ctx.Tasks | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	if options.Copy { | ||||
| 		var options fi.RunTasksOptions | ||||
| 		options.InitDefaults() | ||||
| 
 | ||||
| 		context, err := fi.NewContext(©AssetsTarget{}, updateClusterResults.Cluster, nil, nil, nil, nil, true, tasks) | ||||
| 		err := assets.Copy(updateClusterResults.ImageAssets, updateClusterResults.FileAssets, updateClusterResults.Cluster) | ||||
| 		if err != nil { | ||||
| 			return fmt.Errorf("error building context: %v", err) | ||||
| 		} | ||||
| 		defer context.Close() | ||||
| 
 | ||||
| 		err = context.RunTasks(options) | ||||
| 		if err != nil { | ||||
| 			return fmt.Errorf("error running tasks: %v", err) | ||||
| 			return err | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
|  | @ -260,11 +208,3 @@ func fileOutputTable(files []*File, out io.Writer) error { | |||
| 	columns := []string{"CANONICAL", "DOWNLOAD", "SHA"} | ||||
| 	return t.Render(files, out, columns...) | ||||
| } | ||||
| 
 | ||||
| func (c copyAssetsTarget) Finish(taskMap map[string]fi.Task) error { | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (c copyAssetsTarget) ProcessDeletions() bool { | ||||
| 	return false | ||||
| } | ||||
|  |  | |||
|  | @ -2,10 +2,18 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") | |||
| 
 | ||||
| go_library( | ||||
|     name = "go_default_library", | ||||
|     srcs = ["builder.go"], | ||||
|     srcs = [ | ||||
|         "builder.go", | ||||
|         "copy.go", | ||||
|         "copyfile.go", | ||||
|         "copyimage.go", | ||||
|         "docker_api.go", | ||||
|         "docker_cli.go", | ||||
|     ], | ||||
|     importpath = "k8s.io/kops/pkg/assets", | ||||
|     visibility = ["//visibility:public"], | ||||
|     deps = [ | ||||
|         "//pkg/acls:go_default_library", | ||||
|         "//pkg/apis/kops:go_default_library", | ||||
|         "//pkg/apis/kops/util:go_default_library", | ||||
|         "//pkg/kubemanifest:go_default_library", | ||||
|  | @ -14,6 +22,9 @@ go_library( | |||
|         "//util/pkg/mirrors:go_default_library", | ||||
|         "//util/pkg/vfs:go_default_library", | ||||
|         "//vendor/github.com/blang/semver/v4:go_default_library", | ||||
|         "//vendor/github.com/docker/docker/api/types:go_default_library", | ||||
|         "//vendor/github.com/docker/docker/api/types/filters:go_default_library", | ||||
|         "//vendor/github.com/docker/docker/client:go_default_library", | ||||
|         "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", | ||||
|         "//vendor/k8s.io/klog/v2:go_default_library", | ||||
|     ], | ||||
|  | @ -21,7 +32,10 @@ go_library( | |||
| 
 | ||||
| go_test( | ||||
|     name = "go_default_test", | ||||
|     srcs = ["builder_test.go"], | ||||
|     srcs = [ | ||||
|         "builder_test.go", | ||||
|         "copyfile_test.go", | ||||
|     ], | ||||
|     data = glob(["testdata/**"]), | ||||
|     embed = [":go_default_library"], | ||||
|     deps = [ | ||||
|  |  | |||
|  | @ -0,0 +1,119 @@ | |||
| /* | ||||
| Copyright 2021 The Kubernetes Authors. | ||||
| 
 | ||||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| you may not use this file except in compliance with the License. | ||||
| You may obtain a copy of the License at | ||||
| 
 | ||||
|     http://www.apache.org/licenses/LICENSE-2.0
 | ||||
| 
 | ||||
| Unless required by applicable law or agreed to in writing, software | ||||
| distributed under the License is distributed on an "AS IS" BASIS, | ||||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| See the License for the specific language governing permissions and | ||||
| limitations under the License. | ||||
| */ | ||||
| 
 | ||||
| package assets | ||||
| 
 | ||||
| import ( | ||||
| 	"fmt" | ||||
| 	"sort" | ||||
| 
 | ||||
| 	"k8s.io/klog/v2" | ||||
| 	"k8s.io/kops/pkg/apis/kops" | ||||
| ) | ||||
| 
 | ||||
| type assetTask interface { | ||||
| 	Run() error | ||||
| } | ||||
| 
 | ||||
| func Copy(imageAssets []*ImageAsset, fileAssets []*FileAsset, cluster *kops.Cluster) error { | ||||
| 	tasks := map[string]assetTask{} | ||||
| 
 | ||||
| 	for _, imageAsset := range imageAssets { | ||||
| 		if imageAsset.DownloadLocation != imageAsset.CanonicalLocation { | ||||
| 			copyImageTask := &CopyImage{ | ||||
| 				Name:        imageAsset.DownloadLocation, | ||||
| 				SourceImage: imageAsset.CanonicalLocation, | ||||
| 				TargetImage: imageAsset.DownloadLocation, | ||||
| 			} | ||||
| 
 | ||||
| 			if existing, ok := tasks[copyImageTask.Name]; ok { | ||||
| 				if existing.(*CopyImage).SourceImage != copyImageTask.SourceImage { | ||||
| 					return fmt.Errorf("different sources for same image target %s: %s vs %s", copyImageTask.Name, copyImageTask.SourceImage, existing.(*CopyImage).SourceImage) | ||||
| 				} | ||||
| 			} | ||||
| 
 | ||||
| 			tasks[copyImageTask.Name] = copyImageTask | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	for _, fileAsset := range fileAssets { | ||||
| 		if fileAsset.DownloadURL.String() != fileAsset.CanonicalURL.String() { | ||||
| 			copyFileTask := &CopyFile{ | ||||
| 				Name:       fileAsset.CanonicalURL.String(), | ||||
| 				TargetFile: fileAsset.DownloadURL.String(), | ||||
| 				SourceFile: fileAsset.CanonicalURL.String(), | ||||
| 				SHA:        fileAsset.SHAValue, | ||||
| 				Cluster:    cluster, | ||||
| 			} | ||||
| 
 | ||||
| 			if existing, ok := tasks[copyFileTask.Name]; ok { | ||||
| 				e, ok := existing.(*CopyFile) | ||||
| 				if !ok { | ||||
| 					return fmt.Errorf("different types for copy target %s", copyFileTask.Name) | ||||
| 				} | ||||
| 				if e.TargetFile != copyFileTask.TargetFile { | ||||
| 					return fmt.Errorf("different targets for same file %s: %s vs %s", copyFileTask.Name, copyFileTask.TargetFile, e.TargetFile) | ||||
| 				} | ||||
| 				if e.SHA != copyFileTask.SHA { | ||||
| 					return fmt.Errorf("different sha for same file %s: %s vs %s", copyFileTask.Name, copyFileTask.SHA, e.SHA) | ||||
| 				} | ||||
| 			} | ||||
| 
 | ||||
| 			tasks[copyFileTask.Name] = copyFileTask | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	ch := make(chan error, 5) | ||||
| 	for i := 0; i < cap(ch); i++ { | ||||
| 		ch <- nil | ||||
| 	} | ||||
| 
 | ||||
| 	gotError := false | ||||
| 	names := make([]string, 0, len(tasks)) | ||||
| 	for name := range tasks { | ||||
| 		names = append(names, name) | ||||
| 	} | ||||
| 	sort.Strings(names) | ||||
| 	for _, name := range names { | ||||
| 		task := tasks[name] | ||||
| 		err := <-ch | ||||
| 		if err != nil { | ||||
| 			klog.Warning(err) | ||||
| 			gotError = true | ||||
| 		} | ||||
| 		go func(n string, t assetTask) { | ||||
| 			err := t.Run() | ||||
| 			if err != nil { | ||||
| 				err = fmt.Errorf("%s: %v", n, err) | ||||
| 			} | ||||
| 			ch <- err | ||||
| 		}(name, task) | ||||
| 	} | ||||
| 
 | ||||
| 	for i := 0; i < cap(ch); i++ { | ||||
| 		err := <-ch | ||||
| 		if err != nil { | ||||
| 			klog.Warning(err) | ||||
| 			gotError = true | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	close(ch) | ||||
| 	if gotError { | ||||
| 		return fmt.Errorf("not all assets copied successfully") | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
|  | @ -14,7 +14,7 @@ See the License for the specific language governing permissions and | |||
| limitations under the License. | ||||
| */ | ||||
| 
 | ||||
| package assettasks | ||||
| package assets | ||||
| 
 | ||||
| import ( | ||||
| 	"bytes" | ||||
|  | @ -25,27 +25,19 @@ import ( | |||
| 
 | ||||
| 	"k8s.io/klog/v2" | ||||
| 	"k8s.io/kops/pkg/acls" | ||||
| 	"k8s.io/kops/upup/pkg/fi" | ||||
| 	"k8s.io/kops/pkg/apis/kops" | ||||
| 	"k8s.io/kops/util/pkg/hashing" | ||||
| 	"k8s.io/kops/util/pkg/vfs" | ||||
| ) | ||||
| 
 | ||||
| // CopyFile copies an from a source file repository, to a target repository,
 | ||||
| // typically used for highly secure clusters.
 | ||||
| // +kops:fitask
 | ||||
| type CopyFile struct { | ||||
| 	Name       *string | ||||
| 	SourceFile *string | ||||
| 	TargetFile *string | ||||
| 	SHA        *string | ||||
| 	Lifecycle  fi.Lifecycle | ||||
| } | ||||
| 
 | ||||
| var _ fi.CompareWithID = &CopyFile{} | ||||
| 
 | ||||
| func (e *CopyFile) CompareWithID() *string { | ||||
| 	// or should this be the SHA?
 | ||||
| 	return e.Name | ||||
| 	Name       string | ||||
| 	SourceFile string | ||||
| 	TargetFile string | ||||
| 	SHA        string | ||||
| 	Cluster    *kops.Cluster | ||||
| } | ||||
| 
 | ||||
| // fileExtensionForSHA returns the expected extension for the given hash
 | ||||
|  | @ -61,74 +53,42 @@ func fileExtensionForSHA(sha string) (string, error) { | |||
| 	} | ||||
| } | ||||
| 
 | ||||
| // Find attempts to find a file.
 | ||||
| func (e *CopyFile) Find(c *fi.Context) (*CopyFile, error) { | ||||
| 	expectedSHA := strings.TrimSpace(fi.StringValue(e.SHA)) | ||||
| func (e *CopyFile) Run() error { | ||||
| 	expectedSHA := strings.TrimSpace(e.SHA) | ||||
| 
 | ||||
| 	shaExtension, err := fileExtensionForSHA(expectedSHA) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	targetSHAFile := fi.StringValue(e.TargetFile) + shaExtension | ||||
| 	targetSHAFile := e.TargetFile + shaExtension | ||||
| 
 | ||||
| 	targetSHABytes, err := vfs.Context.ReadFile(targetSHAFile) | ||||
| 	if err != nil { | ||||
| 		if os.IsNotExist(err) { | ||||
| 			klog.V(4).Infof("unable to download: %q, assuming target file is not present, and if not present may not be an error: %v", | ||||
| 				targetSHAFile, err) | ||||
| 			return nil, nil | ||||
| 		} else { | ||||
| 			klog.V(4).Infof("unable to download: %q, %v", targetSHAFile, err) | ||||
| 		} | ||||
| 		klog.V(4).Infof("unable to download: %q, %v", targetSHAFile, err) | ||||
| 		// TODO should we throw err here?
 | ||||
| 		return nil, nil | ||||
| 	} | ||||
| 	targetSHA := string(targetSHABytes) | ||||
| 	} else { | ||||
| 		targetSHA := string(targetSHABytes) | ||||
| 
 | ||||
| 	if strings.TrimSpace(targetSHA) == expectedSHA { | ||||
| 		actual := &CopyFile{ | ||||
| 			Name:       e.Name, | ||||
| 			TargetFile: e.TargetFile, | ||||
| 			SHA:        e.SHA, | ||||
| 			SourceFile: e.SourceFile, | ||||
| 			Lifecycle:  e.Lifecycle, | ||||
| 		if strings.TrimSpace(targetSHA) == expectedSHA { | ||||
| 			klog.V(8).Infof("found matching target sha for file: %q", e.TargetFile) | ||||
| 			return nil | ||||
| 		} | ||||
| 		klog.V(8).Infof("found matching target sha1 for file: %q", fi.StringValue(e.TargetFile)) | ||||
| 		return actual, nil | ||||
| 
 | ||||
| 		klog.V(8).Infof("did not find same file, found mismatching target sha1 for file: %q", e.TargetFile) | ||||
| 	} | ||||
| 
 | ||||
| 	klog.V(8).Infof("did not find same file, found mismatching target sha1 for file: %q", fi.StringValue(e.TargetFile)) | ||||
| 	return nil, nil | ||||
| 
 | ||||
| } | ||||
| 
 | ||||
| // Run is the default run method.
 | ||||
| func (e *CopyFile) Run(c *fi.Context) error { | ||||
| 	return fi.DefaultDeltaRunMethod(e, c) | ||||
| } | ||||
| 
 | ||||
| func (s *CopyFile) CheckChanges(a, e, changes *CopyFile) error { | ||||
| 	if fi.StringValue(e.Name) == "" { | ||||
| 		return fi.RequiredField("Name") | ||||
| 	} | ||||
| 	if fi.StringValue(e.SourceFile) == "" { | ||||
| 		return fi.RequiredField("SourceFile") | ||||
| 	} | ||||
| 	if fi.StringValue(e.TargetFile) == "" { | ||||
| 		return fi.RequiredField("TargetFile") | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (_ *CopyFile) Render(c *fi.Context, a, e, changes *CopyFile) error { | ||||
| 
 | ||||
| 	source := fi.StringValue(e.SourceFile) | ||||
| 	target := fi.StringValue(e.TargetFile) | ||||
| 	sourceSha := fi.StringValue(e.SHA) | ||||
| 	source := e.SourceFile | ||||
| 	target := e.TargetFile | ||||
| 	sourceSha := e.SHA | ||||
| 
 | ||||
| 	klog.V(2).Infof("copying bits from %q to %q", source, target) | ||||
| 
 | ||||
| 	if err := transferFile(c, source, target, sourceSha); err != nil { | ||||
| 	if err := transferFile(e.Cluster, source, target, sourceSha); err != nil { | ||||
| 		return fmt.Errorf("unable to transfer %q to %q: %v", source, target, err) | ||||
| 	} | ||||
| 
 | ||||
|  | @ -137,7 +97,7 @@ func (_ *CopyFile) Render(c *fi.Context, a, e, changes *CopyFile) error { | |||
| 
 | ||||
| // transferFile downloads a file from the source location, validates the file matches the SHA,
 | ||||
| // and uploads the file to the target location.
 | ||||
| func transferFile(c *fi.Context, source string, target string, sha string) error { | ||||
| func transferFile(cluster *kops.Cluster, source string, target string, sha string) error { | ||||
| 
 | ||||
| 	// TODO drop file to disk, as vfs reads file into memory.  We load kubelet into memory for instance.
 | ||||
| 	// TODO in s3 can we do a copy file ... would need to test
 | ||||
|  | @ -188,21 +148,21 @@ func transferFile(c *fi.Context, source string, target string, sha string) error | |||
| 	} | ||||
| 
 | ||||
| 	klog.Infof("uploading %q to %q", source, objectStore) | ||||
| 	if err := writeFile(c, uploadVFS, data); err != nil { | ||||
| 	if err := writeFile(cluster, uploadVFS, data); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	b := []byte(shaHash.Hex()) | ||||
| 	if err := writeFile(c, shaVFS, b); err != nil { | ||||
| 	if err := writeFile(cluster, shaVFS, b); err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func writeFile(c *fi.Context, p vfs.Path, data []byte) error { | ||||
| func writeFile(cluster *kops.Cluster, p vfs.Path, data []byte) error { | ||||
| 
 | ||||
| 	acl, err := acls.GetACL(p, c.Cluster) | ||||
| 	acl, err := acls.GetACL(p, cluster) | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
|  | @ -14,7 +14,7 @@ See the License for the specific language governing permissions and | |||
| limitations under the License. | ||||
| */ | ||||
| 
 | ||||
| package assettasks | ||||
| package assets | ||||
| 
 | ||||
| import ( | ||||
| 	"testing" | ||||
|  | @ -0,0 +1,73 @@ | |||
| /* | ||||
| Copyright 2017 The Kubernetes Authors. | ||||
| 
 | ||||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| you may not use this file except in compliance with the License. | ||||
| You may obtain a copy of the License at | ||||
| 
 | ||||
|     http://www.apache.org/licenses/LICENSE-2.0
 | ||||
| 
 | ||||
| Unless required by applicable law or agreed to in writing, software | ||||
| distributed under the License is distributed on an "AS IS" BASIS, | ||||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| See the License for the specific language governing permissions and | ||||
| limitations under the License. | ||||
| */ | ||||
| 
 | ||||
| package assets | ||||
| 
 | ||||
| import ( | ||||
| 	"fmt" | ||||
| 
 | ||||
| 	"k8s.io/klog/v2" | ||||
| ) | ||||
| 
 | ||||
| // CopyImage copies a docker image from a source registry, to a target registry,
 | ||||
| // typically used for highly secure clusters.
 | ||||
| type CopyImage struct { | ||||
| 	Name        string | ||||
| 	SourceImage string | ||||
| 	TargetImage string | ||||
| } | ||||
| 
 | ||||
| func (e *CopyImage) Run() error { | ||||
| 	api, err := newDockerAPI() | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	cli, err := newDockerCLI() | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 
 | ||||
| 	} | ||||
| 
 | ||||
| 	source := e.SourceImage | ||||
| 	target := e.TargetImage | ||||
| 
 | ||||
| 	klog.Infof("copying docker image from %q to %q", source, target) | ||||
| 
 | ||||
| 	err = cli.pullImage(source) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("error pulling image %q: %v", source, err) | ||||
| 	} | ||||
| 	sourceImage, err := api.findImage(source) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("error finding image %q: %v", source, err) | ||||
| 	} | ||||
| 	if sourceImage == nil { | ||||
| 		return fmt.Errorf("source image %q not found", source) | ||||
| 	} | ||||
| 
 | ||||
| 	err = api.tagImage(sourceImage.ID, target) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("error tagging image %q: %v", source, err) | ||||
| 	} | ||||
| 
 | ||||
| 	err = cli.pushImage(target) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("error pushing image %q: %v", target, err) | ||||
| 	} | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
|  | @ -14,7 +14,7 @@ See the License for the specific language governing permissions and | |||
| limitations under the License. | ||||
| */ | ||||
| 
 | ||||
| package assettasks | ||||
| package assets | ||||
| 
 | ||||
| import ( | ||||
| 	"context" | ||||
|  | @ -14,7 +14,7 @@ See the License for the specific language governing permissions and | |||
| limitations under the License. | ||||
| */ | ||||
| 
 | ||||
| package assettasks | ||||
| package assets | ||||
| 
 | ||||
| import ( | ||||
| 	"fmt" | ||||
|  | @ -1,31 +0,0 @@ | |||
| load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") | ||||
| 
 | ||||
| go_library( | ||||
|     name = "go_default_library", | ||||
|     srcs = [ | ||||
|         "copyfile.go", | ||||
|         "copyfile_fitask.go", | ||||
|         "copyimage.go", | ||||
|         "copyimage_fitask.go", | ||||
|         "docker_api.go", | ||||
|         "docker_cli.go", | ||||
|     ], | ||||
|     importpath = "k8s.io/kops/upup/pkg/fi/assettasks", | ||||
|     visibility = ["//visibility:public"], | ||||
|     deps = [ | ||||
|         "//pkg/acls:go_default_library", | ||||
|         "//upup/pkg/fi:go_default_library", | ||||
|         "//util/pkg/hashing:go_default_library", | ||||
|         "//util/pkg/vfs:go_default_library", | ||||
|         "//vendor/github.com/docker/docker/api/types:go_default_library", | ||||
|         "//vendor/github.com/docker/docker/api/types/filters:go_default_library", | ||||
|         "//vendor/github.com/docker/docker/client:go_default_library", | ||||
|         "//vendor/k8s.io/klog/v2:go_default_library", | ||||
|     ], | ||||
| ) | ||||
| 
 | ||||
| go_test( | ||||
|     name = "go_default_test", | ||||
|     srcs = ["copyfile_test.go"], | ||||
|     embed = [":go_default_library"], | ||||
| ) | ||||
|  | @ -1,51 +0,0 @@ | |||
| // +build !ignore_autogenerated
 | ||||
| 
 | ||||
| /* | ||||
| Copyright The Kubernetes Authors. | ||||
| 
 | ||||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| you may not use this file except in compliance with the License. | ||||
| You may obtain a copy of the License at | ||||
| 
 | ||||
|     http://www.apache.org/licenses/LICENSE-2.0
 | ||||
| 
 | ||||
| Unless required by applicable law or agreed to in writing, software | ||||
| distributed under the License is distributed on an "AS IS" BASIS, | ||||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| See the License for the specific language governing permissions and | ||||
| limitations under the License. | ||||
| */ | ||||
| 
 | ||||
| // Code generated by fitask. DO NOT EDIT.
 | ||||
| 
 | ||||
| package assettasks | ||||
| 
 | ||||
| import ( | ||||
| 	"k8s.io/kops/upup/pkg/fi" | ||||
| ) | ||||
| 
 | ||||
| // CopyFile
 | ||||
| 
 | ||||
| var _ fi.HasLifecycle = &CopyFile{} | ||||
| 
 | ||||
| // GetLifecycle returns the Lifecycle of the object, implementing fi.HasLifecycle
 | ||||
| func (o *CopyFile) GetLifecycle() fi.Lifecycle { | ||||
| 	return o.Lifecycle | ||||
| } | ||||
| 
 | ||||
| // SetLifecycle sets the Lifecycle of the object, implementing fi.SetLifecycle
 | ||||
| func (o *CopyFile) SetLifecycle(lifecycle fi.Lifecycle) { | ||||
| 	o.Lifecycle = lifecycle | ||||
| } | ||||
| 
 | ||||
| var _ fi.HasName = &CopyFile{} | ||||
| 
 | ||||
| // GetName returns the Name of the object, implementing fi.HasName
 | ||||
| func (o *CopyFile) GetName() *string { | ||||
| 	return o.Name | ||||
| } | ||||
| 
 | ||||
| // String is the stringer function for the task, producing readable output using fi.TaskAsString
 | ||||
| func (o *CopyFile) String() string { | ||||
| 	return fi.TaskAsString(o) | ||||
| } | ||||
|  | @ -1,155 +0,0 @@ | |||
| /* | ||||
| Copyright 2017 The Kubernetes Authors. | ||||
| 
 | ||||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| you may not use this file except in compliance with the License. | ||||
| You may obtain a copy of the License at | ||||
| 
 | ||||
|     http://www.apache.org/licenses/LICENSE-2.0
 | ||||
| 
 | ||||
| Unless required by applicable law or agreed to in writing, software | ||||
| distributed under the License is distributed on an "AS IS" BASIS, | ||||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| See the License for the specific language governing permissions and | ||||
| limitations under the License. | ||||
| */ | ||||
| 
 | ||||
| package assettasks | ||||
| 
 | ||||
| import ( | ||||
| 	"fmt" | ||||
| 
 | ||||
| 	"k8s.io/klog/v2" | ||||
| 	"k8s.io/kops/upup/pkg/fi" | ||||
| ) | ||||
| 
 | ||||
| // CopyImage copies a docker image from a source registry, to a target registry,
 | ||||
| // typically used for highly secure clusters.
 | ||||
| // +kops:fitask
 | ||||
| type CopyImage struct { | ||||
| 	Name        *string | ||||
| 	SourceImage *string | ||||
| 	TargetImage *string | ||||
| 	Lifecycle   fi.Lifecycle | ||||
| } | ||||
| 
 | ||||
| var _ fi.CompareWithID = &CopyImage{} | ||||
| 
 | ||||
| func (e *CopyImage) CompareWithID() *string { | ||||
| 	return e.Name | ||||
| } | ||||
| 
 | ||||
| func (e *CopyImage) Find(c *fi.Context) (*CopyImage, error) { | ||||
| 	return nil, nil | ||||
| 
 | ||||
| 	// The problem here is that we can tag a local image with the remote tag, but there is no way to know
 | ||||
| 	// if that has actually been pushed to the remote registry without doing a docker push
 | ||||
| 
 | ||||
| 	// The solution is probably to query the registries directly, but that is a little bit more code...
 | ||||
| 
 | ||||
| 	// For now, we just always do the copy; it isn't _too_ slow when things have already been pushed
 | ||||
| 
 | ||||
| 	//d, err := newDocker()
 | ||||
| 	//if err != nil {
 | ||||
| 	//	return nil, err
 | ||||
| 	//}
 | ||||
| 	//
 | ||||
| 	//source := fi.StringValue(e.SourceImage)
 | ||||
| 	//target := fi.StringValue(e.TargetImage)
 | ||||
| 	//
 | ||||
| 	//targetImage, err := d.findImage(target)
 | ||||
| 	//if err != nil {
 | ||||
| 	//	return nil, err
 | ||||
| 	//}
 | ||||
| 	//if targetImage == nil {
 | ||||
| 	//	klog.V(4).Infof("target image %q not found", target)
 | ||||
| 	//	return nil, nil
 | ||||
| 	//}
 | ||||
| 	//
 | ||||
| 	//// We want to verify that the target image matches
 | ||||
| 	//if err := d.pullImage(source); err != nil {
 | ||||
| 	//	return nil, err
 | ||||
| 	//}
 | ||||
| 	//
 | ||||
| 	//sourceImage, err := d.findImage(source)
 | ||||
| 	//if err != nil {
 | ||||
| 	//	return nil, err
 | ||||
| 	//}
 | ||||
| 	//if sourceImage == nil {
 | ||||
| 	//	return nil, fmt.Errorf("source image %q not found", source)
 | ||||
| 	//}
 | ||||
| 	//
 | ||||
| 	//if sourceImage.ID == targetImage.ID {
 | ||||
| 	//	actual := &CopyImage{}
 | ||||
| 	//	actual.Name = e.Name
 | ||||
| 	//	actual.SourceImage = e.SourceImage
 | ||||
| 	//	actual.TargetImage = e.TargetImage
 | ||||
| 	//	klog.Infof("Found image %q = %s", target, sourceImage.ID)
 | ||||
| 	//	return actual, nil
 | ||||
| 	//}
 | ||||
| 	//
 | ||||
| 	//klog.V(2).Infof("Target image %q does not match source %q: %q vs %q",
 | ||||
| 	//	target, source,
 | ||||
| 	//	targetImage.ID, sourceImage.ID)
 | ||||
| 	//
 | ||||
| 	//return nil, nil
 | ||||
| } | ||||
| 
 | ||||
| func (e *CopyImage) Run(c *fi.Context) error { | ||||
| 	return fi.DefaultDeltaRunMethod(e, c) | ||||
| } | ||||
| 
 | ||||
| func (s *CopyImage) CheckChanges(a, e, changes *CopyImage) error { | ||||
| 	if fi.StringValue(e.Name) == "" { | ||||
| 		return fi.RequiredField("Name") | ||||
| 	} | ||||
| 	if fi.StringValue(e.SourceImage) == "" { | ||||
| 		return fi.RequiredField("SourceImage") | ||||
| 	} | ||||
| 	if fi.StringValue(e.TargetImage) == "" { | ||||
| 		return fi.RequiredField("TargetImage") | ||||
| 	} | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (_ *CopyImage) Render(c *fi.Context, a, e, changes *CopyImage) error { | ||||
| 	api, err := newDockerAPI() | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	cli, err := newDockerCLI() | ||||
| 	if err != nil { | ||||
| 		return err | ||||
| 
 | ||||
| 	} | ||||
| 
 | ||||
| 	source := fi.StringValue(e.SourceImage) | ||||
| 	target := fi.StringValue(e.TargetImage) | ||||
| 
 | ||||
| 	klog.Infof("copying docker image from %q to %q", source, target) | ||||
| 
 | ||||
| 	err = cli.pullImage(source) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("error pulling image %q: %v", source, err) | ||||
| 	} | ||||
| 	sourceImage, err := api.findImage(source) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("error finding image %q: %v", source, err) | ||||
| 	} | ||||
| 	if sourceImage == nil { | ||||
| 		return fmt.Errorf("source image %q not found", source) | ||||
| 	} | ||||
| 
 | ||||
| 	err = api.tagImage(sourceImage.ID, target) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("error tagging image %q: %v", source, err) | ||||
| 	} | ||||
| 
 | ||||
| 	err = cli.pushImage(target) | ||||
| 	if err != nil { | ||||
| 		return fmt.Errorf("error pushing image %q: %v", target, err) | ||||
| 	} | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
|  | @ -1,51 +0,0 @@ | |||
| // +build !ignore_autogenerated
 | ||||
| 
 | ||||
| /* | ||||
| Copyright The Kubernetes Authors. | ||||
| 
 | ||||
| Licensed under the Apache License, Version 2.0 (the "License"); | ||||
| you may not use this file except in compliance with the License. | ||||
| You may obtain a copy of the License at | ||||
| 
 | ||||
|     http://www.apache.org/licenses/LICENSE-2.0
 | ||||
| 
 | ||||
| Unless required by applicable law or agreed to in writing, software | ||||
| distributed under the License is distributed on an "AS IS" BASIS, | ||||
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
| See the License for the specific language governing permissions and | ||||
| limitations under the License. | ||||
| */ | ||||
| 
 | ||||
| // Code generated by fitask. DO NOT EDIT.
 | ||||
| 
 | ||||
| package assettasks | ||||
| 
 | ||||
| import ( | ||||
| 	"k8s.io/kops/upup/pkg/fi" | ||||
| ) | ||||
| 
 | ||||
| // CopyImage
 | ||||
| 
 | ||||
| var _ fi.HasLifecycle = &CopyImage{} | ||||
| 
 | ||||
| // GetLifecycle returns the Lifecycle of the object, implementing fi.HasLifecycle
 | ||||
| func (o *CopyImage) GetLifecycle() fi.Lifecycle { | ||||
| 	return o.Lifecycle | ||||
| } | ||||
| 
 | ||||
| // SetLifecycle sets the Lifecycle of the object, implementing fi.SetLifecycle
 | ||||
| func (o *CopyImage) SetLifecycle(lifecycle fi.Lifecycle) { | ||||
| 	o.Lifecycle = lifecycle | ||||
| } | ||||
| 
 | ||||
| var _ fi.HasName = &CopyImage{} | ||||
| 
 | ||||
| // GetName returns the Name of the object, implementing fi.HasName
 | ||||
| func (o *CopyImage) GetName() *string { | ||||
| 	return o.Name | ||||
| } | ||||
| 
 | ||||
| // String is the stringer function for the task, producing readable output using fi.TaskAsString
 | ||||
| func (o *CopyImage) String() string { | ||||
| 	return fi.TaskAsString(o) | ||||
| } | ||||
		Loading…
	
		Reference in New Issue