Merge pull request #3016 from justinsb/upload_taskify_2

Automatic merge from submit-queue

Copy docker images when they are redirected
This commit is contained in:
Kubernetes Submit Queue 2017-07-21 12:10:47 -07:00 committed by GitHub
commit ac887ce290
10 changed files with 468 additions and 7 deletions

View File

@ -136,6 +136,7 @@ codegen: kops-gobindata
go install k8s.io/kops/upup/tools/generators/...
PATH=${GOPATH_1ST}/bin:${PATH} go generate k8s.io/kops/upup/pkg/fi/cloudup/awstasks
PATH=${GOPATH_1ST}/bin:${PATH} go generate k8s.io/kops/upup/pkg/fi/cloudup/gcetasks
PATH=${GOPATH_1ST}/bin:${PATH} go generate k8s.io/kops/upup/pkg/fi/dockertasks
PATH=${GOPATH_1ST}/bin:${PATH} go generate k8s.io/kops/upup/pkg/fi/fitasks
.PHONY: protobuf

View File

@ -105,6 +105,7 @@ k8s.io/kops/upup/pkg/fi/cloudup/gcetasks
k8s.io/kops/upup/pkg/fi/cloudup/terraform
k8s.io/kops/upup/pkg/fi/cloudup/vsphere
k8s.io/kops/upup/pkg/fi/cloudup/vspheretasks
k8s.io/kops/upup/pkg/fi/dockertasks
k8s.io/kops/upup/pkg/fi/fitasks
k8s.io/kops/upup/pkg/fi/k8sapi
k8s.io/kops/upup/pkg/fi/loader

View File

@ -36,14 +36,21 @@ type AssetBuilder struct {
}
type Asset struct {
Origin string
Mirror string
// DockerImage will be the name of the docker image we should run, if this is a docker image
DockerImage string
// CanonicalLocation will be the source location of the image, if we should copy it to the actual location
CanonicalLocation string
}
func NewAssetBuilder() *AssetBuilder {
return &AssetBuilder{}
}
// RemapManifest transforms a kubernetes manifest.
// Whenever we are building a Task that includes a manifest, we should pass it through RemapManifest first.
// This will:
// * rewrite the images if they are being redirected to a mirror, and ensure the image is uploaded
func (a *AssetBuilder) RemapManifest(data []byte) ([]byte, error) {
if !RewriteManifests.Enabled() {
return data, nil
@ -74,7 +81,7 @@ func (a *AssetBuilder) RemapManifest(data []byte) ([]byte, error) {
func (a *AssetBuilder) remapImage(image string) (string, error) {
asset := &Asset{}
asset.Origin = image
asset.DockerImage = image
if strings.HasPrefix(image, "kope/dns-controller:") {
// To use user-defined DNS Controller:
@ -87,7 +94,24 @@ func (a *AssetBuilder) remapImage(image string) (string, error) {
}
}
asset.Mirror = image
registryMirror := os.Getenv("DEV_KOPS_REGISTRY_MIRROR")
registryMirror = strings.TrimSuffix(registryMirror, "/")
if registryMirror != "" {
normalized := image
// Remove the 'standard' kubernetes image prefix, just for sanity
normalized = strings.TrimPrefix(normalized, "gcr.io/google_containers/")
// We can't nest arbitrarily
// Some risk of collisions, but also -- and __ in the names appear to be blocked by docker hub
normalized = strings.Replace(normalized, "/", "-", -1)
asset.DockerImage = registryMirror + "/" + normalized
asset.CanonicalLocation = image
// Run the new image
image = asset.DockerImage
}
a.Assets = append(a.Assets, asset)

View File

@ -700,7 +700,7 @@ func (c *ApplyClusterCmd) Run() error {
tf.AddTo(l.TemplateFunctions)
taskMap, err := l.BuildTasks(modelStore, fileModels)
taskMap, err := l.BuildTasks(modelStore, fileModels, assetBuilder)
if err != nil {
return fmt.Errorf("error building tasks: %v", err)
}

View File

@ -24,7 +24,9 @@ import (
"io"
"k8s.io/apimachinery/pkg/util/sets"
api "k8s.io/kops/pkg/apis/kops"
"k8s.io/kops/pkg/assets"
"k8s.io/kops/upup/pkg/fi"
"k8s.io/kops/upup/pkg/fi/dockertasks"
"k8s.io/kops/upup/pkg/fi/loader"
"k8s.io/kops/upup/pkg/fi/utils"
"k8s.io/kops/util/pkg/vfs"
@ -146,7 +148,7 @@ func ignoreHandler(i *loader.TreeWalkItem) error {
return nil
}
func (l *Loader) BuildTasks(modelStore vfs.Path, models []string) (map[string]fi.Task, error) {
func (l *Loader) BuildTasks(modelStore vfs.Path, models []string, assetBuilder *assets.AssetBuilder) (map[string]fi.Task, error) {
// Second pass: load everything else
tw := &loader.TreeWalker{
DefaultHandler: l.objectHandler,
@ -178,6 +180,10 @@ func (l *Loader) BuildTasks(modelStore vfs.Path, models []string) (map[string]fi
l.tasks = context.Tasks
}
if err := l.addAssetCopyTasks(assetBuilder.Assets); err != nil {
return nil, err
}
err := l.processDeferrals()
if err != nil {
return nil, err
@ -185,6 +191,29 @@ func (l *Loader) BuildTasks(modelStore vfs.Path, models []string) (map[string]fi
return l.tasks, nil
}
func (l *Loader) addAssetCopyTasks(assets []*assets.Asset) error {
for _, asset := range assets {
if asset.CanonicalLocation != "" && asset.DockerImage != asset.CanonicalLocation {
context := &fi.ModelBuilderContext{
Tasks: l.tasks,
}
copyImageTask := &dockertasks.CopyDockerImage{
Name: fi.String(asset.DockerImage),
SourceImage: fi.String(asset.CanonicalLocation),
TargetImage: fi.String(asset.DockerImage),
}
if err := context.EnsureTask(copyImageTask); err != nil {
return fmt.Errorf("error adding asset-copy task: %v", err)
}
l.tasks = context.Tasks
}
}
return nil
}
func (l *Loader) processDeferrals() error {
for taskKey, task := range l.tasks {
taskValue := reflect.ValueOf(task)

View File

@ -0,0 +1,153 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dockertasks
import (
"fmt"
"github.com/golang/glog"
"k8s.io/kops/upup/pkg/fi"
)
// CopyDockerImage copies a docker image from a source registry, to a target registry,
// typically used for highly secure clusters.
//go:generate fitask -type=CopyDockerImage
type CopyDockerImage struct {
Name *string
SourceImage *string
TargetImage *string
}
var _ fi.CompareWithID = &CopyDockerImage{}
func (e *CopyDockerImage) CompareWithID() *string {
return e.Name
}
func (e *CopyDockerImage) Find(c *fi.Context) (*CopyDockerImage, error) {
return nil, nil
// The problem here is that we can tag a local image with the remote tag, but there is no way to know
// if that has actually been pushed to the remote registry without doing a docker push
// The solution is probably to query the registries directly, but that is a little bit more code...
// For now, we just always do the copy; it isn't _too_ slow when things have already been pushed
//d, err := newDocker()
//if err != nil {
// return nil, err
//}
//
//source := fi.StringValue(e.SourceImage)
//target := fi.StringValue(e.TargetImage)
//
//targetImage, err := d.findImage(target)
//if err != nil {
// return nil, err
//}
//if targetImage == nil {
// glog.V(4).Infof("target image %q not found", target)
// return nil, nil
//}
//
//// We want to verify that the target image matches
//if err := d.pullImage(source); err != nil {
// return nil, err
//}
//
//sourceImage, err := d.findImage(source)
//if err != nil {
// return nil, err
//}
//if sourceImage == nil {
// return nil, fmt.Errorf("source image %q not found", source)
//}
//
//if sourceImage.ID == targetImage.ID {
// actual := &CopyDockerImage{}
// actual.Name = e.Name
// actual.SourceImage = e.SourceImage
// actual.TargetImage = e.TargetImage
// glog.Infof("Found image %q = %s", target, sourceImage.ID)
// return actual, nil
//}
//
//glog.V(2).Infof("Target image %q does not match source %q: %q vs %q",
// target, source,
// targetImage.ID, sourceImage.ID)
//
//return nil, nil
}
func (e *CopyDockerImage) Run(c *fi.Context) error {
return fi.DefaultDeltaRunMethod(e, c)
}
func (s *CopyDockerImage) CheckChanges(a, e, changes *CopyDockerImage) error {
if fi.StringValue(e.Name) == "" {
return fi.RequiredField("Name")
}
if fi.StringValue(e.SourceImage) == "" {
return fi.RequiredField("SourceImage")
}
if fi.StringValue(e.TargetImage) == "" {
return fi.RequiredField("TargetImage")
}
return nil
}
func (_ *CopyDockerImage) Render(c *fi.Context, a, e, changes *CopyDockerImage) error {
api, err := newDockerAPI()
if err != nil {
return err
}
cli, err := newDockerCLI()
if err != nil {
return err
}
source := fi.StringValue(e.SourceImage)
target := fi.StringValue(e.TargetImage)
glog.Infof("copying docker image from %q to %q", source, target)
err = cli.pullImage(source)
if err != nil {
return fmt.Errorf("error pulling image %q: %v", source, err)
}
sourceImage, err := api.findImage(source)
if err != nil {
return err
}
if sourceImage == nil {
return fmt.Errorf("source image %q not found", source)
}
err = api.tagImage(sourceImage.ID, target)
if err != nil {
return fmt.Errorf("error tagging image %q: %v", source, err)
}
err = cli.pushImage(target)
if err != nil {
return fmt.Errorf("error pushing image %q: %v", target, err)
}
return nil
}

View File

@ -0,0 +1,63 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Code generated by ""fitask" -type=CopyDockerImage"; DO NOT EDIT
package dockertasks
import (
"encoding/json"
"k8s.io/kops/upup/pkg/fi"
)
// CopyDockerImage
// JSON marshalling boilerplate
type realCopyDockerImage CopyDockerImage
// UnmarshalJSON implements conversion to JSON, supporitng an alternate specification of the object as a string
func (o *CopyDockerImage) UnmarshalJSON(data []byte) error {
var jsonName string
if err := json.Unmarshal(data, &jsonName); err == nil {
o.Name = &jsonName
return nil
}
var r realCopyDockerImage
if err := json.Unmarshal(data, &r); err != nil {
return err
}
*o = CopyDockerImage(r)
return nil
}
var _ fi.HasName = &CopyDockerImage{}
// GetName returns the Name of the object, implementing fi.HasName
func (o *CopyDockerImage) GetName() *string {
return o.Name
}
// SetName sets the Name of the object, implementing fi.SetName
func (o *CopyDockerImage) SetName(name string) {
o.Name = &name
}
// String is the stringer function for the task, producing readable output using fi.TaskAsString
func (o *CopyDockerImage) String() string {
return fi.TaskAsString(o)
}

View File

@ -0,0 +1,132 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dockertasks
import (
"bufio"
"fmt"
"github.com/docker/engine-api/client"
"github.com/docker/engine-api/types"
"github.com/golang/glog"
"golang.org/x/net/context"
)
// dockerAPI encapsulates access to docker via the API
type dockerAPI struct {
client *client.Client
}
// newDockerAPI builds a dockerAPI object, for talking to docker via the API
func newDockerAPI() (*dockerAPI, error) {
c, err := client.NewEnvClient()
if err != nil {
return nil, fmt.Errorf("error building docker client: %v", err)
}
return &dockerAPI{
client: c,
}, nil
}
// findImage does a `docker images` via the API, and finds the specified image
func (d *dockerAPI) findImage(name string) (*types.Image, error) {
glog.V(4).Infof("docker query for image %q", name)
options := types.ImageListOptions{
MatchName: name,
}
ctx := context.Background()
images, err := d.client.ImageList(ctx, options)
if err != nil {
return nil, fmt.Errorf("error listing images: %v", err)
}
for i := range images {
for _, repoTag := range images[i].RepoTags {
if repoTag == name {
return &images[i], nil
}
}
}
return nil, nil
}
// pullImage does `docker pull`, via the API.
// Because it is non-trivial to get credentials, we tend to use the CLI
func (d *dockerAPI) pullImage(name string) error {
glog.V(4).Infof("docker pull for image %q", name)
ctx := context.Background()
pullOptions := types.ImagePullOptions{}
resp, err := d.client.ImagePull(ctx, name, pullOptions)
if resp != nil {
defer resp.Close()
}
if err != nil {
return fmt.Errorf("error pulling image %q: %v", name, err)
}
scanner := bufio.NewScanner(resp)
for scanner.Scan() {
// {"status":"Already exists","progressDetail":{},"id":"a3ed95caeb02"}
// {"status":"Status: Image is up to date for gcr.io/google_containers/cluster-proportional-autoscaler-amd64:1.0.0"}
glog.Infof("docker pull %s", scanner.Text())
}
if err := scanner.Err(); err != nil {
return fmt.Errorf("error pulling image %q: %v", name, err)
}
return nil
}
// pushImage does `docker push`, via the API.
// Because it is non-trivial to get credentials, we tend to use the CLI
func (d *dockerAPI) pushImage(name string) error {
glog.V(4).Infof("docker push for image %q", name)
ctx := context.Background()
options := types.ImagePushOptions{}
resp, err := d.client.ImagePush(ctx, name, options)
if resp != nil {
defer resp.Close()
}
if err != nil {
return fmt.Errorf("error pushing image %q: %v", name, err)
}
scanner := bufio.NewScanner(resp)
for scanner.Scan() {
glog.Infof("docker pushing %s", scanner.Text())
}
if err := scanner.Err(); err != nil {
return fmt.Errorf("error pushing image %q: %v", name, err)
}
return nil
}
// tagImage does a `docker tag`, via the API
func (d *dockerAPI) tagImage(imageID string, ref string) error {
glog.V(4).Infof("docker tag for image %q, tag %q", imageID, ref)
ctx := context.Background()
options := types.ImageTagOptions{}
err := d.client.ImageTag(ctx, imageID, ref, options)
if err != nil {
return fmt.Errorf("error tagging image %q with tag %q: %v", imageID, ref, err)
}
return nil
}

View File

@ -0,0 +1,58 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dockertasks
import (
"fmt"
"github.com/golang/glog"
"os/exec"
)
// dockerCLI encapsulates access to docker via the CLI
type dockerCLI struct {
}
// newDockerCLI builds a dockerCLI object, for talking to docker via the CLI
func newDockerCLI() (*dockerCLI, error) {
return &dockerCLI{}, nil
}
// pullImage does a `docker pull`, shelling out to the CLI
func (d *dockerCLI) pullImage(name string) error {
glog.V(4).Infof("docker pull for image %q", name)
cmd := exec.Command("docker", "pull", name)
err := cmd.Run()
if err != nil {
return fmt.Errorf("error pulling image %q: %v", name, err)
}
return nil
}
// pushImage does a docker push, shelling out to the CLI
func (d *dockerCLI) pushImage(name string) error {
glog.V(4).Infof("docker push for image %q", name)
cmd := exec.Command("docker", "push", name)
err := cmd.Run()
if err != nil {
return fmt.Errorf("error pushing image %q: %v", name, err)
}
return nil
}

View File

@ -250,7 +250,7 @@ func (t *DryRunTarget) PrintReport(taskMap map[string]Task, out io.Writer) error
if len(t.assetBuilder.Assets) != 0 {
glog.V(4).Infof("Assets:")
for _, a := range t.assetBuilder.Assets {
glog.V(4).Infof(" %s %s", a.Origin, a.Mirror)
glog.V(4).Infof(" %s %s", a.DockerImage, a.CanonicalLocation)
}
}