diff --git a/api/README.md b/api/README.md
index 05b2045863..12b649c2fe 100644
--- a/api/README.md
+++ b/api/README.md
@@ -8,14 +8,6 @@ page_keywords: docker, swarm, clustering, api
 
 The Docker Swarm API is mostly compatible with the [Docker Remote API](https://docs.docker.com/reference/api/docker_remote_api/). This document is an overview of the differences between the Swarm API and the Docker Remote API.
 
-## Missing endpoints
-
-Some endpoints have not yet been implemented and will return a 404 error.
-
-```
-POST "/images/create" : "docker import" flow not implement
-```
-
 ## Endpoints which behave differently
 
 * `GET "/containers/{name:.*}/json"`: New field `Node` added:
diff --git a/api/handlers.go b/api/handlers.go
index 45af459690..22a8e67b87 100644
--- a/api/handlers.go
+++ b/api/handlers.go
@@ -378,6 +378,8 @@ func postImagesCreate(c *context, w http.ResponseWriter, r *http.Request) {
 	}
 
 	wf := NewWriteFlusher(w)
+	w.Header().Set("Content-Type", "application/json")
+	w.WriteHeader(http.StatusCreated)
 
 	if image := r.Form.Get("fromImage"); image != "" { //pull
 		authConfig := dockerclient.AuthConfig{}
@@ -385,8 +387,6 @@
 		if err == nil {
 			json.Unmarshal(buf, &authConfig)
 		}
-		w.Header().Set("Content-Type", "application/json")
-		w.WriteHeader(http.StatusCreated)
 
 		if tag := r.Form.Get("tag"); tag != "" {
 			image += ":" + tag
@@ -400,7 +400,14 @@ func postImagesCreate(c *context, w http.ResponseWriter, r *http.Request) {
 		}
 		c.cluster.Pull(image, &authConfig, callback)
 	} else { //import
-		httpError(w, "Not supported in clustering mode.", http.StatusNotImplemented)
+		source := r.Form.Get("fromSrc")
+		repo := r.Form.Get("repo")
+		tag := r.Form.Get("tag")
+
+		callback := func(what, status string) {
+			fmt.Fprintf(wf, "{%q:%q,%q:\"%s\"}", "id", what, "status", status)
+		}
+		c.cluster.Import(source, repo, tag, r.Body, callback)
 	}
 }
diff --git a/cluster/cluster.go b/cluster/cluster.go
index d6f838a1fc..bab478055d 100644
--- a/cluster/cluster.go
+++ b/cluster/cluster.go
@@ -37,6 +37,12 @@ type Cluster interface {
 	// `status` is the current status, like "", "in progress" or "downloaded
 	Pull(name string, authConfig *dockerclient.AuthConfig, callback func(what, status string))
 
+	// Import image
+	// `callback` can be called multiple time
+	// `what` is what is being imported
+	// `status` is the current status, like "", "in progress" or "imported"
+	Import(source string, repository string, tag string, imageReader io.Reader, callback func(what, status string))
+
 	// Load images
 	// `callback` can be called multiple time
 	// `what` is what is being loaded
diff --git a/cluster/engine.go b/cluster/engine.go
index 6f9608a22f..45632bbc8c 100644
--- a/cluster/engine.go
+++ b/cluster/engine.go
@@ -75,10 +75,11 @@ func (e *Engine) Connect(config *tls.Config) error {
 		return err
 	}
 
-	return e.connectClient(c)
+	return e.ConnectWithClient(c)
 }
 
-func (e *Engine) connectClient(client dockerclient.Client) error {
+// ConnectWithClient is exported
+func (e *Engine) ConnectWithClient(client dockerclient.Client) error {
 	e.client = client
 
 	// Fetch the engine labels.
@@ -443,6 +444,18 @@ func (e *Engine) Load(reader io.Reader) error {
 	return nil
 }
 
+// Import image
+func (e *Engine) Import(source string, repository string, tag string, imageReader io.Reader) error {
+	if _, err := e.client.ImportImage(source, repository, tag, imageReader); err != nil {
+		return err
+	}
+
+	// force fresh images
+	e.RefreshImages()
+
+	return nil
+}
+
 // RegisterEventHandler registers an event handler.
 func (e *Engine) RegisterEventHandler(h EventHandler) error {
 	if e.eventHandler != nil {
diff --git a/cluster/engine_test.go b/cluster/engine_test.go
index 1e68f22ff8..cbda0d007b 100644
--- a/cluster/engine_test.go
+++ b/cluster/engine_test.go
@@ -34,7 +34,7 @@ func TestEngineConnectionFailure(t *testing.T) {
 	client.On("Info").Return(&dockerclient.Info{}, errors.New("fail"))
 
 	// Connect() should fail and isConnected() return false.
-	assert.Error(t, engine.connectClient(client))
+	assert.Error(t, engine.ConnectWithClient(client))
 	assert.False(t, engine.isConnected())
 
 	client.Mock.AssertExpectations(t)
@@ -45,7 +45,7 @@ func TestOutdatedEngine(t *testing.T) {
 	client := mockclient.NewMockClient()
 	client.On("Info").Return(&dockerclient.Info{}, nil)
 
-	assert.Error(t, engine.connectClient(client))
+	assert.Error(t, engine.ConnectWithClient(client))
 	assert.False(t, engine.isConnected())
 
 	client.Mock.AssertExpectations(t)
@@ -61,7 +61,7 @@ func TestEngineCpusMemory(t *testing.T) {
 	client.On("ListImages").Return([]*dockerclient.Image{}, nil)
 	client.On("StartMonitorEvents", mock.Anything, mock.Anything, mock.Anything).Return()
 
-	assert.NoError(t, engine.connectClient(client))
+	assert.NoError(t, engine.ConnectWithClient(client))
 	assert.True(t, engine.isConnected())
 	assert.True(t, engine.IsHealthy())
 
@@ -81,7 +81,7 @@ func TestEngineSpecs(t *testing.T) {
 	client.On("ListImages").Return([]*dockerclient.Image{}, nil)
 	client.On("StartMonitorEvents", mock.Anything, mock.Anything, mock.Anything).Return()
 
-	assert.NoError(t, engine.connectClient(client))
+	assert.NoError(t, engine.ConnectWithClient(client))
 	assert.True(t, engine.isConnected())
 	assert.True(t, engine.IsHealthy())
 
@@ -111,7 +111,7 @@ func TestEngineState(t *testing.T) {
 	client.On("ListContainers", true, false, fmt.Sprintf("{%q:[%q]}", "id", "two")).Return([]dockerclient.Container{{Id: "two"}}, nil).Once()
 	client.On("InspectContainer", "two").Return(&dockerclient.ContainerInfo{Config: &dockerclient.ContainerConfig{CpuShares: 100}}, nil).Once()
 
-	assert.NoError(t, engine.connectClient(client))
+	assert.NoError(t, engine.ConnectWithClient(client))
 	assert.True(t, engine.isConnected())
 
 	// The engine should only have a single container at this point.
@@ -151,7 +151,7 @@ func TestCreateContainer(t *testing.T) {
 	client.On("StartMonitorEvents", mock.Anything, mock.Anything, mock.Anything).Return()
 	client.On("ListContainers", true, false, "").Return([]dockerclient.Container{}, nil).Once()
 	client.On("ListImages").Return([]*dockerclient.Image{}, nil).Once()
-	assert.NoError(t, engine.connectClient(client))
+	assert.NoError(t, engine.ConnectWithClient(client))
 	assert.True(t, engine.isConnected())
 
 	mockConfig := config.ContainerConfig
diff --git a/cluster/mesos/cluster.go b/cluster/mesos/cluster.go
index fb4f713048..2a0a9d0da5 100644
--- a/cluster/mesos/cluster.go
+++ b/cluster/mesos/cluster.go
@@ -245,6 +245,11 @@ func (c *Cluster) Load(imageReader io.Reader, callback func(what, status string)
 
 }
 
+// Import image
+func (c *Cluster) Import(source string, repository string, tag string, imageReader io.Reader, callback func(what, status string)) {
+
+}
+
 // RenameContainer Rename a container
 func (c *Cluster) RenameContainer(container *cluster.Container, newName string) error {
 	//FIXME this doesn't work as the next refreshcontainer will erase this change (this change is in-memory only)
diff --git a/cluster/swarm/cluster.go b/cluster/swarm/cluster.go
index 6d8201240b..32e0ed7946 100644
--- a/cluster/swarm/cluster.go
+++ b/cluster/swarm/cluster.go
@@ -377,6 +377,57 @@ func (c *Cluster) Load(imageReader io.Reader, callback func(what, status string)
 	wg.Wait()
 }
 
+// Import image
+func (c *Cluster) Import(source string, repository string, tag string, imageReader io.Reader, callback func(what, status string)) {
+	var wg sync.WaitGroup
+	c.RLock()
+	pipeWriters := []*io.PipeWriter{}
+
+	for _, n := range c.engines {
+		wg.Add(1)
+
+		pipeReader, pipeWriter := io.Pipe()
+		pipeWriters = append(pipeWriters, pipeWriter)
+
+		go func(reader *io.PipeReader, nn *cluster.Engine) {
+			defer wg.Done()
+			defer reader.Close()
+
+			// call engine import
+			err := nn.Import(source, repository, tag, reader)
+			if callback != nil {
+				if err != nil {
+					callback(nn.Name, err.Error())
+				} else {
+					callback(nn.Name, "Import success")
+				}
+			}
+
+		}(pipeReader, n)
+	}
+	c.RUnlock()
+
+	// create multi-writer
+	listWriter := []io.Writer{}
+	for _, pipeW := range pipeWriters {
+		listWriter = append(listWriter, pipeW)
+	}
+	multiWriter := io.MultiWriter(listWriter...)
+
+	// copy image-reader to multi-writer
+	_, err := io.Copy(multiWriter, imageReader)
+	if err != nil {
+		log.Error(err)
+	}
+
+	// close pipe writers
+	for _, pipeW := range pipeWriters {
+		pipeW.Close()
+	}
+
+	wg.Wait()
+}
+
 // Containers returns all the containers in the cluster.
 func (c *Cluster) Containers() cluster.Containers {
 	c.RLock()
diff --git a/cluster/swarm/cluster_test.go b/cluster/swarm/cluster_test.go
index b870908836..aaab11a471 100644
--- a/cluster/swarm/cluster_test.go
+++ b/cluster/swarm/cluster_test.go
@@ -1,11 +1,38 @@
 package swarm
 
 import (
-	"testing"
-
+	"bytes"
+	"fmt"
 	"github.com/docker/swarm/cluster"
 	"github.com/samalba/dockerclient"
+	"github.com/samalba/dockerclient/mockclient"
 	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/mock"
+	"io"
+	"testing"
+)
+
+type nopCloser struct {
+	io.Reader
+}
+
+// Close is a no-op so that nopCloser satisfies io.ReadCloser
+func (nopCloser) Close() error {
+	return nil
+}
+
+var (
+	mockInfo = &dockerclient.Info{
+		ID:              "id",
+		Name:            "name",
+		NCPU:            10,
+		MemTotal:        20,
+		Driver:          "driver-test",
+		ExecutionDriver: "execution-driver-test",
+		KernelVersion:   "1.2.3",
+		OperatingSystem: "golang",
+		Labels:          []string{"foo=bar"},
+	}
 )
 
 func createEngine(t *testing.T, ID string, containers ...*cluster.Container) *cluster.Engine {
@@ -76,3 +103,50 @@ func TestContainerLookup(t *testing.T) {
 	assert.NotNil(t, cc)
 	assert.Equal(t, cc.Id, "container2-id")
 }
+
+func TestImportImage(t *testing.T) {
+	// create cluster
+	c := &Cluster{
+		engines: make(map[string]*cluster.Engine),
+	}
+
+	// create engine
+	id := "test-engine"
+	engine := cluster.NewEngine(id, 0)
+	engine.Name = id
+	engine.ID = id
+
+	// create mock client
+	client := mockclient.NewMockClient()
+	client.On("Info").Return(mockInfo, nil)
+	client.On("StartMonitorEvents", mock.Anything, mock.Anything, mock.Anything).Return()
+	client.On("ListContainers", true, false, "").Return([]dockerclient.Container{}, nil).Once()
+	client.On("ListImages").Return([]*dockerclient.Image{}, nil)
+
+	// connect client
+	engine.ConnectWithClient(client)
+
+	// add engine to cluster
+	c.engines[engine.ID] = engine
+
+	// import success
+	readCloser := nopCloser{bytes.NewBufferString("ok")}
+	client.On("ImportImage", mock.Anything, mock.Anything, mock.Anything, mock.AnythingOfType("*io.PipeReader")).Return(readCloser, nil).Once()
+
+	callback := func(what, status string) {
+		// import success
+		assert.Equal(t, status, "Import success")
+	}
+	c.Import("-", "testImageOK", "latest", bytes.NewReader(nil), callback)
+
+	// import error
+	readCloser = nopCloser{bytes.NewBufferString("error")}
+	err := fmt.Errorf("Import error")
+	client.On("ImportImage", mock.Anything, mock.Anything, mock.Anything, mock.AnythingOfType("*io.PipeReader")).Return(readCloser, err).Once()
+
+	callback = func(what, status string) {
+		// import error
+		assert.Equal(t, status, "Import error")
+	}
+	c.Import("-", "testImageError", "latest", bytes.NewReader(nil), callback)
+}
diff --git a/test/integration/api/import.bats b/test/integration/api/import.bats
index 8981c55a9a..48e7508a07 100644
--- a/test/integration/api/import.bats
+++ b/test/integration/api/import.bats
@@ -7,7 +7,37 @@ function teardown() {
 	stop_docker
 }
 
-# FIXME
 @test "docker import" {
-	skip
+	start_docker_with_busybox 2
+	swarm_manage
+	# run a container to export
+	docker_swarm run -d --name test_container busybox sleep 500
+
+	temp_file_name=$(mktemp)
+	# make sure the container exists
+	run docker_swarm ps -l
+	[ "${#lines[@]}" -eq 2 ]
+	[[ "${lines[1]}" == *"test_container"* ]]
+
+	# export, container->tar
+	docker_swarm export test_container > $temp_file_name
+
+	# verify: the exported file exists, is not empty and is a tar archive
+	[ -s $temp_file_name ]
+	run file $temp_file_name
+	[ "$status" -eq 0 ]
+	[[ "$output" == *"tar archive"* ]]
+
+	# import
+	docker_swarm import - testbusybox < $temp_file_name
+
+	# verify on the nodes
+	for host in ${HOSTS[@]}; do
+		run docker -H $host images
+		[ "$status" -eq 0 ]
+		[[ "${output}" == *"testbusybox"* ]]
+	done
+
+	# clean up the exported tar file
+	rm -f $temp_file_name
 }
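Note on the fan-out in `Cluster.Import` above: the single request body has to reach every engine, so the patch copies it into an `io.MultiWriter` built from one `io.Pipe` per engine and lets each engine consume its own pipe reader concurrently. The sketch below is a minimal, standalone illustration of that pattern and is not part of the patch; the three fake consumers and the `io.Copy` into `io.Discard` are hypothetical stand-ins for `Engine.Import`.

```go
package main

import (
	"fmt"
	"io"
	"strings"
	"sync"
)

func main() {
	// Single source stream, e.g. the image tarball from the request body.
	source := strings.NewReader("image tarball bytes")

	var wg sync.WaitGroup
	pipeWriters := []*io.PipeWriter{}

	// One goroutine per "engine"; each one reads its own copy of the stream.
	for i := 0; i < 3; i++ {
		pr, pw := io.Pipe()
		pipeWriters = append(pipeWriters, pw)

		wg.Add(1)
		go func(id int, r *io.PipeReader) {
			defer wg.Done()
			defer r.Close()
			// Stand-in for Engine.Import(source, repository, tag, r).
			n, err := io.Copy(io.Discard, r)
			fmt.Printf("engine %d consumed %d bytes (err=%v)\n", id, n, err)
		}(i, pr)
	}

	// Fan the source out to every pipe writer in a single pass.
	writers := make([]io.Writer, len(pipeWriters))
	for i, pw := range pipeWriters {
		writers[i] = pw
	}
	if _, err := io.Copy(io.MultiWriter(writers...), source); err != nil {
		fmt.Println("copy error:", err)
	}

	// Closing the writer ends delivers io.EOF to the readers so the goroutines finish.
	for _, pw := range pipeWriters {
		pw.Close()
	}
	wg.Wait()
}
```

One likely caveat of this pattern: `io.MultiWriter` stops at the first write error, so an engine whose pipe reader is closed early (for example because its import failed) aborts the copy for all engines; the patch accepts that trade-off and simply logs the error.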