stream load and output nothing if load successfully

Signed-off-by: Xian Chaobo <xianchaobo@huawei.com>
This commit is contained in:
Xian Chaobo 2015-04-14 19:05:21 +08:00
parent b4dd3e32e4
commit 6ad2a7847c
4 changed files with 42 additions and 41 deletions

View File

@ -4,11 +4,8 @@ import (
"bytes"
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"os"
"path"
"runtime"
"sort"
"strconv"
@ -275,25 +272,6 @@ func postImagesCreate(c *context, w http.ResponseWriter, r *http.Request) {
// POST /images/load
func postImagesLoad(c *context, w http.ResponseWriter, r *http.Request) {
//cache tar file
tmpImageDir, err := ioutil.TempDir("", "docker-import-")
if err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
return
}
defer os.RemoveAll(tmpImageDir)
repoTarFile := path.Join(tmpImageDir, "repo.tar")
tarFile, err := os.Create(repoTarFile)
if err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
return
}
if _, err := io.Copy(tarFile, r.Body); err != nil {
httpError(w, err.Error(), http.StatusInternalServerError)
return
}
tarFile.Close()
// call cluster to load image on every node
wf := NewWriteFlusher(w)
@ -304,7 +282,7 @@ func postImagesLoad(c *context, w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(wf, "%s:Loading Image... %s\n", what, status)
}
}
c.cluster.Load(repoTarFile, callback)
c.cluster.Load(r.Body, callback)
}
// GET /events

View File

@ -2,6 +2,7 @@ package cluster
import (
"github.com/samalba/dockerclient"
"io"
)
// Cluster is exported
@ -37,7 +38,7 @@ type Cluster interface {
// `callback` can be called multiple time
// `what` is what is being loaded
// `status` is the current status, like "", "in progress" or "loaded"
Load(tarFile string, callback func(what, status string))
Load(imageReader io.Reader, callback func(what, status string))
// Return some info about the cluster, like nb or containers / images
// It is pretty open, so the implementation decides what to return.

View File

@ -4,8 +4,8 @@ import (
"crypto/tls"
"errors"
"fmt"
"io"
"net"
"os"
"strings"
"sync"
"time"
@ -413,13 +413,8 @@ func (e *Engine) Pull(image string) error {
}
// Load an image on the engine
func (e *Engine) Load(tarFile string) error {
file, err := os.Open(tarFile)
if err != nil {
return err
}
if err := e.client.LoadImage(file); err != nil {
func (e *Engine) Load(reader io.Reader) error {
if err := e.client.LoadImage(reader); err != nil {
return err
}
return nil

View File

@ -3,6 +3,7 @@ package swarm
import (
"errors"
"fmt"
"io"
"sort"
"sync"
@ -239,25 +240,51 @@ func (c *Cluster) Pull(name string, callback func(what, status string)) {
}
// Load image
func (c *Cluster) Load(tarFile string, callback func(what, status string)) {
func (c *Cluster) Load(imageReader io.Reader, callback func(what, status string)) {
size := len(c.engines)
done := make(chan bool, size)
pipeWriters := []*io.PipeWriter{}
pipeReaders := []*io.PipeReader{}
for _, n := range c.engines {
go func(nn *cluster.Engine) {
if callback != nil {
callback(nn.Name, "")
}
err := nn.Load(tarFile)
pipeReader, pipeWriter := io.Pipe()
pipeReaders = append(pipeReaders, pipeReader)
pipeWriters = append(pipeWriters, pipeWriter)
go func(reader *io.PipeReader, nn *cluster.Engine) {
// call engine load image
err := nn.Load(reader)
if callback != nil {
if err != nil {
callback(nn.Name, err.Error())
} else {
callback(nn.Name, "loaded")
}
}
// clean up
defer reader.Close()
done <- true
}(n)
}(pipeReader, n)
}
// create multi-writer
listWriter := []io.Writer{}
for _, pipeW := range pipeWriters {
listWriter = append(listWriter, pipeW)
}
mutiWriter := io.MultiWriter(listWriter...)
// copy image-reader to muti-writer
_, err := io.Copy(mutiWriter, imageReader)
if err != nil {
log.Error(err)
}
// close pipe writers
for _, pipeW := range pipeWriters {
pipeW.Close()
}
// wait all host done
for i := 0; i < size; i++ {
<-done
}