From 1f20358b5d83bb057004cbcdd560ec1db613a750 Mon Sep 17 00:00:00 2001 From: Xian Chaobo Date: Tue, 12 May 2015 07:51:14 -0400 Subject: [PATCH 1/6] add support import Signed-off-by: Xian Chaobo --- api/handlers.go | 13 +++++++--- cluster/cluster.go | 6 +++++ cluster/engine.go | 12 +++++++++ cluster/swarm/cluster.go | 54 ++++++++++++++++++++++++++++++++++++++++ 4 files changed, 82 insertions(+), 3 deletions(-) diff --git a/api/handlers.go b/api/handlers.go index 45af459690..22a8e67b87 100644 --- a/api/handlers.go +++ b/api/handlers.go @@ -378,6 +378,8 @@ func postImagesCreate(c *context, w http.ResponseWriter, r *http.Request) { } wf := NewWriteFlusher(w) + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusCreated) if image := r.Form.Get("fromImage"); image != "" { //pull authConfig := dockerclient.AuthConfig{} @@ -385,8 +387,6 @@ func postImagesCreate(c *context, w http.ResponseWriter, r *http.Request) { if err == nil { json.Unmarshal(buf, &authConfig) } - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(http.StatusCreated) if tag := r.Form.Get("tag"); tag != "" { image += ":" + tag @@ -400,7 +400,14 @@ func postImagesCreate(c *context, w http.ResponseWriter, r *http.Request) { } c.cluster.Pull(image, &authConfig, callback) } else { //import - httpError(w, "Not supported in clustering mode.", http.StatusNotImplemented) + source := r.Form.Get("fromSrc") + repo := r.Form.Get("repo") + tag := r.Form.Get("tag") + + callback := func(what, status string) { + fmt.Fprintf(wf, "{%q:%q,%q:\"%s\"}", "id", what, "status", status) + } + c.cluster.Import(source, repo, tag, r.Body, callback) } } diff --git a/cluster/cluster.go b/cluster/cluster.go index d6f838a1fc..609f2a4fb0 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -37,6 +37,12 @@ type Cluster interface { // `status` is the current status, like "", "in progress" or "downloaded Pull(name string, authConfig *dockerclient.AuthConfig, callback func(what, status string)) + // Import image + // `callback` can be called multiple time + // `what` is what is being imported + // `status` is the current status, like "", "in progress" or "imported" + Import(source string, repository string, tar string, imageReader io.Reader, callback func(what, status string)) + // Load images // `callback` can be called multiple time // `what` is what is being loaded diff --git a/cluster/engine.go b/cluster/engine.go index 6f9608a22f..38fb875c28 100644 --- a/cluster/engine.go +++ b/cluster/engine.go @@ -443,6 +443,18 @@ func (e *Engine) Load(reader io.Reader) error { return nil } +// Import image +func (e *Engine) Import(source string, repository string, tar string, imageReader io.Reader) error { + if _, err := e.client.ImportImage(source, repository, tar, imageReader); err != nil { + return err + } + + // force fresh images + e.RefreshImages() + + return nil +} + // RegisterEventHandler registers an event handler. func (e *Engine) RegisterEventHandler(h EventHandler) error { if e.eventHandler != nil { diff --git a/cluster/swarm/cluster.go b/cluster/swarm/cluster.go index 6d8201240b..e94db6ea6d 100644 --- a/cluster/swarm/cluster.go +++ b/cluster/swarm/cluster.go @@ -377,6 +377,60 @@ func (c *Cluster) Load(imageReader io.Reader, callback func(what, status string) wg.Wait() } +// Import image +func (c *Cluster) Import(source string, repository string, tar string, imageReader io.Reader, callback func(what, status string)) { + var wg sync.WaitGroup + c.RLock() + pipeWriters := []*io.PipeWriter{} + pipeReaders := []*io.PipeReader{} + for _, n := range c.engines { + wg.Add(1) + + pipeReader, pipeWriter := io.Pipe() + pipeReaders = append(pipeReaders, pipeReader) + pipeWriters = append(pipeWriters, pipeWriter) + + go func(reader *io.PipeReader, nn *cluster.Engine) { + defer wg.Done() + defer reader.Close() + + // call engine import + err := nn.Import(source, repository, tar, reader) + if callback != nil { + if err != nil { + callback(nn.Name, err.Error()) + } else { + callback(nn.Name, "Import success") + } + } + + }(pipeReader, n) + } + + // create multi-writer + listWriter := []io.Writer{} + for _, pipeW := range pipeWriters { + listWriter = append(listWriter, pipeW) + } + mutiWriter := io.MultiWriter(listWriter...) + + // copy image-reader to muti-writer + _, err := io.Copy(mutiWriter, imageReader) + if err != nil { + log.Error(err) + } + + // close pipe writers + for _, pipeW := range pipeWriters { + pipeW.Close() + } + + c.RUnlock() + + wg.Wait() + +} + // Containers returns all the containers in the cluster. func (c *Cluster) Containers() cluster.Containers { c.RLock() From f63bafc7c5801f2889045c80047d20108c13dbe7 Mon Sep 17 00:00:00 2001 From: Xian Chaobo Date: Tue, 12 May 2015 07:51:43 -0400 Subject: [PATCH 2/6] add import integration test Signed-off-by: Xian Chaobo --- test/integration/api/import.bats | 34 ++++++++++++++++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) diff --git a/test/integration/api/import.bats b/test/integration/api/import.bats index 8981c55a9a..48e7508a07 100644 --- a/test/integration/api/import.bats +++ b/test/integration/api/import.bats @@ -7,7 +7,37 @@ function teardown() { stop_docker } -# FIXME @test "docker import" { - skip + start_docker_with_busybox 2 + swarm_manage + # run a container to export + docker_swarm run -d --name test_container busybox sleep 500 + + temp_file_name=$(mktemp) + # make sure container exists + run docker_swarm ps -l + [ "${#lines[@]}" -eq 2 ] + [[ "${lines[1]}" == *"test_container"* ]] + + # export, container->tar + docker_swarm export test_container > $temp_file_name + + # verify: exported file exists, not empty and is tar file + [ -s $temp_file_name ] + run file $temp_file_name + [ "$status" -eq 0 ] + [[ "$output" == *"tar archive"* ]] + + # import + docker_swarm import - testbusybox < $temp_file_name + + # verify on the nodes + for host in ${HOSTS[@]}; do + run docker -H $host images + [ "$status" -eq 0 ] + [[ "${output}" == *"testbusybox"* ]] + done + + # after ok, delete exported tar file + rm -f $temp_file_name } From 3bd2e5b5841dbaa7819a13784ac2673b72422db9 Mon Sep 17 00:00:00 2001 From: Xian Chaobo Date: Wed, 13 May 2015 21:52:53 -0400 Subject: [PATCH 3/6] update README.md Signed-off-by: Xian Chaobo --- api/README.md | 8 -------- 1 file changed, 8 deletions(-) diff --git a/api/README.md b/api/README.md index 05b2045863..12b649c2fe 100644 --- a/api/README.md +++ b/api/README.md @@ -8,14 +8,6 @@ page_keywords: docker, swarm, clustering, api The Docker Swarm API is mostly compatible with the [Docker Remote API](https://docs.docker.com/reference/api/docker_remote_api/). This document is an overview of the differences between the Swarm API and the Docker Remote API. -## Missing endpoints - -Some endpoints have not yet been implemented and will return a 404 error. - -``` -POST "/images/create" : "docker import" flow not implement -``` - ## Endpoints which behave differently * `GET "/containers/{name:.*}/json"`: New field `Node` added: From 8da3de3cc476bb867358b3b2261b6f85f25c1543 Mon Sep 17 00:00:00 2001 From: Xian Date: Tue, 19 May 2015 12:56:17 +0000 Subject: [PATCH 4/6] add import unit test Signed-off-by: Xian --- cluster/cluster.go | 2 +- cluster/engine.go | 9 ++-- cluster/engine_test.go | 12 +++--- cluster/swarm/cluster.go | 4 +- cluster/swarm/cluster_test.go | 78 ++++++++++++++++++++++++++++++++++- 5 files changed, 90 insertions(+), 15 deletions(-) diff --git a/cluster/cluster.go b/cluster/cluster.go index 609f2a4fb0..bab478055d 100644 --- a/cluster/cluster.go +++ b/cluster/cluster.go @@ -41,7 +41,7 @@ type Cluster interface { // `callback` can be called multiple time // `what` is what is being imported // `status` is the current status, like "", "in progress" or "imported" - Import(source string, repository string, tar string, imageReader io.Reader, callback func(what, status string)) + Import(source string, repository string, tag string, imageReader io.Reader, callback func(what, status string)) // Load images // `callback` can be called multiple time diff --git a/cluster/engine.go b/cluster/engine.go index 38fb875c28..45632bbc8c 100644 --- a/cluster/engine.go +++ b/cluster/engine.go @@ -75,10 +75,11 @@ func (e *Engine) Connect(config *tls.Config) error { return err } - return e.connectClient(c) + return e.ConnectWithClient(c) } -func (e *Engine) connectClient(client dockerclient.Client) error { +// ConnectWithClient is exported +func (e *Engine) ConnectWithClient(client dockerclient.Client) error { e.client = client // Fetch the engine labels. @@ -444,8 +445,8 @@ func (e *Engine) Load(reader io.Reader) error { } // Import image -func (e *Engine) Import(source string, repository string, tar string, imageReader io.Reader) error { - if _, err := e.client.ImportImage(source, repository, tar, imageReader); err != nil { +func (e *Engine) Import(source string, repository string, tag string, imageReader io.Reader) error { + if _, err := e.client.ImportImage(source, repository, tag, imageReader); err != nil { return err } diff --git a/cluster/engine_test.go b/cluster/engine_test.go index 1e68f22ff8..cbda0d007b 100644 --- a/cluster/engine_test.go +++ b/cluster/engine_test.go @@ -34,7 +34,7 @@ func TestEngineConnectionFailure(t *testing.T) { client.On("Info").Return(&dockerclient.Info{}, errors.New("fail")) // Connect() should fail and isConnected() return false. - assert.Error(t, engine.connectClient(client)) + assert.Error(t, engine.ConnectWithClient(client)) assert.False(t, engine.isConnected()) client.Mock.AssertExpectations(t) @@ -45,7 +45,7 @@ func TestOutdatedEngine(t *testing.T) { client := mockclient.NewMockClient() client.On("Info").Return(&dockerclient.Info{}, nil) - assert.Error(t, engine.connectClient(client)) + assert.Error(t, engine.ConnectWithClient(client)) assert.False(t, engine.isConnected()) client.Mock.AssertExpectations(t) @@ -61,7 +61,7 @@ func TestEngineCpusMemory(t *testing.T) { client.On("ListImages").Return([]*dockerclient.Image{}, nil) client.On("StartMonitorEvents", mock.Anything, mock.Anything, mock.Anything).Return() - assert.NoError(t, engine.connectClient(client)) + assert.NoError(t, engine.ConnectWithClient(client)) assert.True(t, engine.isConnected()) assert.True(t, engine.IsHealthy()) @@ -81,7 +81,7 @@ func TestEngineSpecs(t *testing.T) { client.On("ListImages").Return([]*dockerclient.Image{}, nil) client.On("StartMonitorEvents", mock.Anything, mock.Anything, mock.Anything).Return() - assert.NoError(t, engine.connectClient(client)) + assert.NoError(t, engine.ConnectWithClient(client)) assert.True(t, engine.isConnected()) assert.True(t, engine.IsHealthy()) @@ -111,7 +111,7 @@ func TestEngineState(t *testing.T) { client.On("ListContainers", true, false, fmt.Sprintf("{%q:[%q]}", "id", "two")).Return([]dockerclient.Container{{Id: "two"}}, nil).Once() client.On("InspectContainer", "two").Return(&dockerclient.ContainerInfo{Config: &dockerclient.ContainerConfig{CpuShares: 100}}, nil).Once() - assert.NoError(t, engine.connectClient(client)) + assert.NoError(t, engine.ConnectWithClient(client)) assert.True(t, engine.isConnected()) // The engine should only have a single container at this point. @@ -151,7 +151,7 @@ func TestCreateContainer(t *testing.T) { client.On("StartMonitorEvents", mock.Anything, mock.Anything, mock.Anything).Return() client.On("ListContainers", true, false, "").Return([]dockerclient.Container{}, nil).Once() client.On("ListImages").Return([]*dockerclient.Image{}, nil).Once() - assert.NoError(t, engine.connectClient(client)) + assert.NoError(t, engine.ConnectWithClient(client)) assert.True(t, engine.isConnected()) mockConfig := config.ContainerConfig diff --git a/cluster/swarm/cluster.go b/cluster/swarm/cluster.go index e94db6ea6d..f407d9e20a 100644 --- a/cluster/swarm/cluster.go +++ b/cluster/swarm/cluster.go @@ -378,7 +378,7 @@ func (c *Cluster) Load(imageReader io.Reader, callback func(what, status string) } // Import image -func (c *Cluster) Import(source string, repository string, tar string, imageReader io.Reader, callback func(what, status string)) { +func (c *Cluster) Import(source string, repository string, tag string, imageReader io.Reader, callback func(what, status string)) { var wg sync.WaitGroup c.RLock() pipeWriters := []*io.PipeWriter{} @@ -395,7 +395,7 @@ func (c *Cluster) Import(source string, repository string, tar string, imageRead defer reader.Close() // call engine import - err := nn.Import(source, repository, tar, reader) + err := nn.Import(source, repository, tag, reader) if callback != nil { if err != nil { callback(nn.Name, err.Error()) diff --git a/cluster/swarm/cluster_test.go b/cluster/swarm/cluster_test.go index b870908836..5efa80458e 100644 --- a/cluster/swarm/cluster_test.go +++ b/cluster/swarm/cluster_test.go @@ -1,11 +1,38 @@ package swarm import ( - "testing" - + "bytes" + "fmt" "github.com/docker/swarm/cluster" "github.com/samalba/dockerclient" + "github.com/samalba/dockerclient/mockclient" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "io" + "testing" +) + +type nopCloser struct { + io.Reader +} + +// Close +func (nopCloser) Close() error { + return nil +} + +var ( + mockInfo = &dockerclient.Info{ + ID: "id", + Name: "name", + NCPU: 10, + MemTotal: 20, + Driver: "driver-test", + ExecutionDriver: "execution-driver-test", + KernelVersion: "1.2.3", + OperatingSystem: "golang", + Labels: []string{"foo=bar"}, + } ) func createEngine(t *testing.T, ID string, containers ...*cluster.Container) *cluster.Engine { @@ -76,3 +103,50 @@ func TestContainerLookup(t *testing.T) { assert.NotNil(t, cc) assert.Equal(t, cc.Id, "container2-id") } + +func TestImportImage(t *testing.T) { + // create cluster + c := &Cluster{ + engines: make(map[string]*cluster.Engine), + } + + // create engione + id := "test-engine" + engine := cluster.NewEngine(id, 0) + engine.Name = id + engine.ID = id + + // create mock client + client := mockclient.NewMockClient() + client.On("Info").Return(mockInfo, nil) + client.On("StartMonitorEvents", mock.Anything, mock.Anything, mock.Anything).Return() + client.On("ListContainers", true, false, "").Return([]dockerclient.Container{}, nil).Once() + client.On("ListImages").Return([]*dockerclient.Image{}, nil) + + // connect client + engine.ConnectWithClient(client) + + // add engine to cluster + c.engines[engine.ID] = engine + + // import success + readCloser := nopCloser{bytes.NewBufferString("ok")} + client.On("ImportImage", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(readCloser, nil).Once() + + callback := func(what, status string) { + // import success + assert.Equal(t, status, "Import success") + } + c.Import("-", "testImageOK", "latest", bytes.NewReader(nil), callback) + + // import error + readCloser = nopCloser{bytes.NewBufferString("error")} + err := fmt.Errorf("Import error") + client.On("ImportImage", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(readCloser, err).Once() + + callback = func(what, status string) { + // import error + assert.Equal(t, status, "Import error") + } + c.Import("-", "testImageError", "latest", bytes.NewReader(nil), callback) +} From d7001cac978c522a5b886f84cb554b2dee15ae7d Mon Sep 17 00:00:00 2001 From: Xian Chaobo Date: Wed, 27 May 2015 22:06:15 +0800 Subject: [PATCH 5/6] fix import unit test Signed-off-by: Xian Chaobo --- cluster/swarm/cluster.go | 11 ++++------- cluster/swarm/cluster_test.go | 4 ++-- 2 files changed, 6 insertions(+), 9 deletions(-) diff --git a/cluster/swarm/cluster.go b/cluster/swarm/cluster.go index f407d9e20a..32e0ed7946 100644 --- a/cluster/swarm/cluster.go +++ b/cluster/swarm/cluster.go @@ -382,12 +382,11 @@ func (c *Cluster) Import(source string, repository string, tag string, imageRead var wg sync.WaitGroup c.RLock() pipeWriters := []*io.PipeWriter{} - pipeReaders := []*io.PipeReader{} + for _, n := range c.engines { wg.Add(1) pipeReader, pipeWriter := io.Pipe() - pipeReaders = append(pipeReaders, pipeReader) pipeWriters = append(pipeWriters, pipeWriter) go func(reader *io.PipeReader, nn *cluster.Engine) { @@ -406,16 +405,17 @@ func (c *Cluster) Import(source string, repository string, tag string, imageRead }(pipeReader, n) } + c.RUnlock() // create multi-writer listWriter := []io.Writer{} for _, pipeW := range pipeWriters { listWriter = append(listWriter, pipeW) } - mutiWriter := io.MultiWriter(listWriter...) + multiWriter := io.MultiWriter(listWriter...) // copy image-reader to muti-writer - _, err := io.Copy(mutiWriter, imageReader) + _, err := io.Copy(multiWriter, imageReader) if err != nil { log.Error(err) } @@ -425,10 +425,7 @@ func (c *Cluster) Import(source string, repository string, tag string, imageRead pipeW.Close() } - c.RUnlock() - wg.Wait() - } // Containers returns all the containers in the cluster. diff --git a/cluster/swarm/cluster_test.go b/cluster/swarm/cluster_test.go index 5efa80458e..aaab11a471 100644 --- a/cluster/swarm/cluster_test.go +++ b/cluster/swarm/cluster_test.go @@ -131,7 +131,7 @@ func TestImportImage(t *testing.T) { // import success readCloser := nopCloser{bytes.NewBufferString("ok")} - client.On("ImportImage", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(readCloser, nil).Once() + client.On("ImportImage", mock.Anything, mock.Anything, mock.Anything, mock.AnythingOfType("*io.PipeReader")).Return(readCloser, nil).Once() callback := func(what, status string) { // import success @@ -142,7 +142,7 @@ func TestImportImage(t *testing.T) { // import error readCloser = nopCloser{bytes.NewBufferString("error")} err := fmt.Errorf("Import error") - client.On("ImportImage", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(readCloser, err).Once() + client.On("ImportImage", mock.Anything, mock.Anything, mock.Anything, mock.AnythingOfType("*io.PipeReader")).Return(readCloser, err).Once() callback = func(what, status string) { // import error From 684b2802d70824d1e64173af7a3e5b8499af66f8 Mon Sep 17 00:00:00 2001 From: Xian Chaobo Date: Wed, 27 May 2015 22:12:56 +0800 Subject: [PATCH 6/6] add import in mesos Signed-off-by: Xian Chaobo --- cluster/mesos/cluster.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/cluster/mesos/cluster.go b/cluster/mesos/cluster.go index fb4f713048..2a0a9d0da5 100644 --- a/cluster/mesos/cluster.go +++ b/cluster/mesos/cluster.go @@ -245,6 +245,11 @@ func (c *Cluster) Load(imageReader io.Reader, callback func(what, status string) } +// Import image +func (c *Cluster) Import(source string, repository string, tag string, imageReader io.Reader, callback func(what, status string)) { + +} + // RenameContainer Rename a container func (c *Cluster) RenameContainer(container *cluster.Container, newName string) error { //FIXME this doesn't work as the next refreshcontainer will erase this change (this change is in-memory only)