Remove copy tasks dependency on pkg/fi

This commit is contained in:
John Gardiner Myers 2021-06-06 23:05:13 -07:00
parent c08479186e
commit 4b05805f9d
4 changed files with 51 additions and 68 deletions

View File

@ -15,7 +15,6 @@ go_library(
"//pkg/acls:go_default_library",
"//pkg/apis/kops:go_default_library",
"//pkg/assets:go_default_library",
"//upup/pkg/fi:go_default_library",
"//util/pkg/hashing:go_default_library",
"//util/pkg/vfs:go_default_library",
"//vendor/github.com/docker/docker/api/types:go_default_library",

View File

@ -23,76 +23,60 @@ import (
"k8s.io/klog/v2"
"k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/pkg/assets"
"k8s.io/kops/upup/pkg/fi"
)
type copyAssetsTarget struct {
}
func (c copyAssetsTarget) Finish(taskMap map[string]fi.Task) error {
return nil
}
func (c copyAssetsTarget) ProcessDeletions() bool {
return false
type assetTask interface {
Run() error
}
func Copy(imageAssets []*assets.ImageAsset, fileAssets []*assets.FileAsset, cluster *kops.Cluster) error {
tasks := map[string]fi.Task{}
tasks := map[string]assetTask{}
for _, imageAsset := range imageAssets {
if imageAsset.DownloadLocation != imageAsset.CanonicalLocation {
copyImageTask := &CopyImage{
Name: fi.String(imageAsset.DownloadLocation),
SourceImage: fi.String(imageAsset.CanonicalLocation),
TargetImage: fi.String(imageAsset.DownloadLocation),
Name: imageAsset.DownloadLocation,
SourceImage: imageAsset.CanonicalLocation,
TargetImage: imageAsset.DownloadLocation,
}
if existing, ok := tasks[*copyImageTask.Name]; ok {
if *existing.(*CopyImage).SourceImage != *copyImageTask.SourceImage {
return fmt.Errorf("different sources for same image target %s: %s vs %s", *copyImageTask.Name, *copyImageTask.SourceImage, *existing.(*CopyImage).SourceImage)
if existing, ok := tasks[copyImageTask.Name]; ok {
if existing.(*CopyImage).SourceImage != copyImageTask.SourceImage {
return fmt.Errorf("different sources for same image target %s: %s vs %s", copyImageTask.Name, copyImageTask.SourceImage, existing.(*CopyImage).SourceImage)
}
}
tasks[*copyImageTask.Name] = copyImageTask
tasks[copyImageTask.Name] = copyImageTask
}
}
for _, fileAsset := range fileAssets {
if fileAsset.DownloadURL.String() != fileAsset.CanonicalURL.String() {
copyFileTask := &CopyFile{
Name: fi.String(fileAsset.CanonicalURL.String()),
TargetFile: fi.String(fileAsset.DownloadURL.String()),
SourceFile: fi.String(fileAsset.CanonicalURL.String()),
SHA: fi.String(fileAsset.SHAValue),
Name: fileAsset.CanonicalURL.String(),
TargetFile: fileAsset.DownloadURL.String(),
SourceFile: fileAsset.CanonicalURL.String(),
SHA: fileAsset.SHAValue,
Cluster: cluster,
}
if existing, ok := tasks[*copyFileTask.Name]; ok {
if existing, ok := tasks[copyFileTask.Name]; ok {
e, ok := existing.(*CopyFile)
if !ok {
return fmt.Errorf("different types for copy target %s", *copyFileTask.Name)
return fmt.Errorf("different types for copy target %s", copyFileTask.Name)
}
if *e.TargetFile != *copyFileTask.TargetFile {
return fmt.Errorf("different targets for same file %s: %s vs %s", *copyFileTask.Name, *copyFileTask.TargetFile, *e.TargetFile)
if e.TargetFile != copyFileTask.TargetFile {
return fmt.Errorf("different targets for same file %s: %s vs %s", copyFileTask.Name, copyFileTask.TargetFile, e.TargetFile)
}
if *e.SHA != *copyFileTask.SHA {
return fmt.Errorf("different sha for same file %s: %s vs %s", *copyFileTask.Name, *copyFileTask.SHA, *e.SHA)
if e.SHA != copyFileTask.SHA {
return fmt.Errorf("different sha for same file %s: %s vs %s", copyFileTask.Name, copyFileTask.SHA, e.SHA)
}
}
tasks[*copyFileTask.Name] = copyFileTask
tasks[copyFileTask.Name] = copyFileTask
}
}
var options fi.RunTasksOptions
options.InitDefaults()
context, err := fi.NewContext(&copyAssetsTarget{}, cluster, nil, nil, nil, nil, true, tasks)
if err != nil {
return fmt.Errorf("error building context: %v", err)
}
defer context.Close()
ch := make(chan error, 5)
for i := 0; i < cap(ch); i++ {
ch <- nil
@ -111,8 +95,8 @@ func Copy(imageAssets []*assets.ImageAsset, fileAssets []*assets.FileAsset, clus
klog.Warning(err)
gotError = true
}
go func(n string, t fi.Task) {
err := t.Run(context)
go func(n string, t assetTask) {
err := t.Run()
if err != nil {
err = fmt.Errorf("%s: %v", n, err)
}
@ -121,7 +105,7 @@ func Copy(imageAssets []*assets.ImageAsset, fileAssets []*assets.FileAsset, clus
}
for i := 0; i < cap(ch); i++ {
err = <-ch
err := <-ch
if err != nil {
klog.Warning(err)
gotError = true

View File

@ -25,7 +25,7 @@ import (
"k8s.io/klog/v2"
"k8s.io/kops/pkg/acls"
"k8s.io/kops/upup/pkg/fi"
"k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/util/pkg/hashing"
"k8s.io/kops/util/pkg/vfs"
)
@ -33,10 +33,11 @@ import (
// CopyFile copies an from a source file repository, to a target repository,
// typically used for highly secure clusters.
type CopyFile struct {
Name *string
SourceFile *string
TargetFile *string
SHA *string
Name string
SourceFile string
TargetFile string
SHA string
Cluster *kops.Cluster
}
// fileExtensionForSHA returns the expected extension for the given hash
@ -52,15 +53,15 @@ func fileExtensionForSHA(sha string) (string, error) {
}
}
func (e *CopyFile) Run(c *fi.Context) error {
expectedSHA := strings.TrimSpace(fi.StringValue(e.SHA))
func (e *CopyFile) Run() error {
expectedSHA := strings.TrimSpace(e.SHA)
shaExtension, err := fileExtensionForSHA(expectedSHA)
if err != nil {
return err
}
targetSHAFile := fi.StringValue(e.TargetFile) + shaExtension
targetSHAFile := e.TargetFile + shaExtension
targetSHABytes, err := vfs.Context.ReadFile(targetSHAFile)
if err != nil {
@ -74,20 +75,20 @@ func (e *CopyFile) Run(c *fi.Context) error {
targetSHA := string(targetSHABytes)
if strings.TrimSpace(targetSHA) == expectedSHA {
klog.V(8).Infof("found matching target sha for file: %q", fi.StringValue(e.TargetFile))
klog.V(8).Infof("found matching target sha for file: %q", e.TargetFile)
return nil
}
klog.V(8).Infof("did not find same file, found mismatching target sha1 for file: %q", fi.StringValue(e.TargetFile))
klog.V(8).Infof("did not find same file, found mismatching target sha1 for file: %q", e.TargetFile)
}
source := fi.StringValue(e.SourceFile)
target := fi.StringValue(e.TargetFile)
sourceSha := fi.StringValue(e.SHA)
source := e.SourceFile
target := e.TargetFile
sourceSha := e.SHA
klog.V(2).Infof("copying bits from %q to %q", source, target)
if err := transferFile(c, source, target, sourceSha); err != nil {
if err := transferFile(e.Cluster, source, target, sourceSha); err != nil {
return fmt.Errorf("unable to transfer %q to %q: %v", source, target, err)
}
@ -96,7 +97,7 @@ func (e *CopyFile) Run(c *fi.Context) error {
// transferFile downloads a file from the source location, validates the file matches the SHA,
// and uploads the file to the target location.
func transferFile(c *fi.Context, source string, target string, sha string) error {
func transferFile(cluster *kops.Cluster, source string, target string, sha string) error {
// TODO drop file to disk, as vfs reads file into memory. We load kubelet into memory for instance.
// TODO in s3 can we do a copy file ... would need to test
@ -147,21 +148,21 @@ func transferFile(c *fi.Context, source string, target string, sha string) error
}
klog.Infof("uploading %q to %q", source, objectStore)
if err := writeFile(c, uploadVFS, data); err != nil {
if err := writeFile(cluster, uploadVFS, data); err != nil {
return err
}
b := []byte(shaHash.Hex())
if err := writeFile(c, shaVFS, b); err != nil {
if err := writeFile(cluster, shaVFS, b); err != nil {
return err
}
return nil
}
func writeFile(c *fi.Context, p vfs.Path, data []byte) error {
func writeFile(cluster *kops.Cluster, p vfs.Path, data []byte) error {
acl, err := acls.GetACL(p, c.Cluster)
acl, err := acls.GetACL(p, cluster)
if err != nil {
return err
}

View File

@ -20,18 +20,17 @@ import (
"fmt"
"k8s.io/klog/v2"
"k8s.io/kops/upup/pkg/fi"
)
// CopyImage copies a docker image from a source registry, to a target registry,
// typically used for highly secure clusters.
type CopyImage struct {
Name *string
SourceImage *string
TargetImage *string
Name string
SourceImage string
TargetImage string
}
func (e *CopyImage) Run(c *fi.Context) error {
func (e *CopyImage) Run() error {
api, err := newDockerAPI()
if err != nil {
return err
@ -43,8 +42,8 @@ func (e *CopyImage) Run(c *fi.Context) error {
}
source := fi.StringValue(e.SourceImage)
target := fi.StringValue(e.TargetImage)
source := e.SourceImage
target := e.TargetImage
klog.Infof("copying docker image from %q to %q", source, target)