From a27b4b8cb8e838d03a99b6d2b30f76bdaf2f9e5d Mon Sep 17 00:00:00 2001 From: Andrea Luzzardi Date: Fri, 18 Jan 2013 16:13:39 -0800 Subject: [PATCH] Initial commit --- container.go | 203 +++++++++++++++++++++++++++++++++++++++++++++ container_test.go | 186 +++++++++++++++++++++++++++++++++++++++++ docker.go | 112 +++++++++++++++++++++++++ docker_test.go | 175 ++++++++++++++++++++++++++++++++++++++ filesystem.go | 52 ++++++++++++ filesystem_test.go | 35 ++++++++ lxc_template.go | 94 +++++++++++++++++++++ state.go | 48 +++++++++++ utils.go | 115 +++++++++++++++++++++++++ utils_test.go | 126 ++++++++++++++++++++++++++++ 10 files changed, 1146 insertions(+) create mode 100644 container.go create mode 100644 container_test.go create mode 100644 docker.go create mode 100644 docker_test.go create mode 100644 filesystem.go create mode 100644 filesystem_test.go create mode 100644 lxc_template.go create mode 100644 state.go create mode 100644 utils.go create mode 100644 utils_test.go diff --git a/container.go b/container.go new file mode 100644 index 000000000..77fd7c0fe --- /dev/null +++ b/container.go @@ -0,0 +1,203 @@ +package docker + +import ( + "encoding/json" + "errors" + "io" + "io/ioutil" + "os" + "os/exec" + "path" + "syscall" +) + +type Container struct { + Name string + Root string + Path string + Args []string + + *Config + *Filesystem + *State + + lxcConfigPath string + cmd *exec.Cmd + stdout *writeBroadcaster + stderr *writeBroadcaster +} + +type Config struct { + Hostname string + Ram int64 +} + +func createContainer(name string, root string, command string, args []string, layers []string, config *Config) (*Container, error) { + container := &Container{ + Name: name, + Root: root, + Path: command, + Args: args, + Config: config, + Filesystem: newFilesystem(path.Join(root, "rootfs"), path.Join(root, "rw"), layers), + State: newState(), + + lxcConfigPath: path.Join(root, "config.lxc"), + stdout: newWriteBroadcaster(), + stderr: newWriteBroadcaster(), + } + + if err := os.Mkdir(root, 0700); err != nil { + return nil, err + } + + if err := container.save(); err != nil { + return nil, err + } + if err := container.generateLXCConfig(); err != nil { + return nil, err + } + return container, nil +} + +func loadContainer(containerPath string) (*Container, error) { + configPath := path.Join(containerPath, "config.json") + fi, err := os.Open(configPath) + if err != nil { + return nil, err + } + defer fi.Close() + enc := json.NewDecoder(fi) + container := &Container{} + if err := enc.Decode(container); err != nil { + return nil, err + } + return container, nil +} + +func (container *Container) save() error { + configPath := path.Join(container.Root, "config.json") + fo, err := os.Create(configPath) + if err != nil { + return err + } + defer fo.Close() + enc := json.NewEncoder(fo) + if err := enc.Encode(container); err != nil { + return err + } + return nil +} + +func (container *Container) generateLXCConfig() error { + fo, err := os.Create(container.lxcConfigPath) + if err != nil { + return err + } + defer fo.Close() + + if err := LxcTemplateCompiled.Execute(fo, container); err != nil { + return err + } + return nil +} + +func (container *Container) Start() error { + if err := container.Filesystem.Mount(); err != nil { + return err + } + + params := []string{ + "-n", container.Name, + "-f", container.lxcConfigPath, + "--", + container.Path, + } + params = append(params, container.Args...) + + container.cmd = exec.Command("/usr/bin/lxc-start", params...) + container.cmd.Stdout = container.stdout + container.cmd.Stderr = container.stderr + + if err := container.cmd.Start(); err != nil { + return err + } + container.State.setRunning(container.cmd.Process.Pid) + go container.monitor() + + // Wait until we are out of the STARTING state before returning + // + // Even though lxc-wait blocks until the container reaches a given state, + // sometimes it returns an error code, which is why we have to retry. + // + // This is a rare race condition that happens for short lived programs + for retries := 0; retries < 3; retries++ { + err := exec.Command("/usr/bin/lxc-wait", "-n", container.Name, "-s", "RUNNING|STOPPED").Run() + if err == nil { + return nil + } + } + return errors.New("Container failed to start") +} + +func (container *Container) Run() error { + if err := container.Start(); err != nil { + return err + } + container.Wait() + return nil +} + +func (container *Container) Output() (output []byte, err error) { + pipe, err := container.StdoutPipe() + if err != nil { + return nil, err + } + defer pipe.Close() + if err := container.Start(); err != nil { + return nil, err + } + output, err = ioutil.ReadAll(pipe) + container.Wait() + return output, err +} + +func (container *Container) StdoutPipe() (io.ReadCloser, error) { + reader, writer := io.Pipe() + container.stdout.AddWriter(writer) + return newBufReader(reader), nil +} + +func (container *Container) StderrPipe() (io.ReadCloser, error) { + reader, writer := io.Pipe() + container.stderr.AddWriter(writer) + return newBufReader(reader), nil +} + +func (container *Container) monitor() { + container.cmd.Wait() + container.stdout.Close() + container.stderr.Close() + container.State.setStopped(container.cmd.ProcessState.Sys().(syscall.WaitStatus).ExitStatus()) +} + +func (container *Container) Stop() error { + if container.State.Running { + if err := exec.Command("/usr/bin/lxc-stop", "-n", container.Name).Run(); err != nil { + return err + } + //FIXME: We should lxc-wait for the container to stop + } + + if err := container.Filesystem.Umount(); err != nil { + // FIXME: Do not abort, probably already umounted? + return nil + } + return nil +} + +func (container *Container) Wait() { + for container.State.Running { + container.State.wait() + } +} diff --git a/container_test.go b/container_test.go new file mode 100644 index 000000000..650d2a07f --- /dev/null +++ b/container_test.go @@ -0,0 +1,186 @@ +package docker + +import ( + "testing" +) + +func TestStart(t *testing.T) { + docker, err := newTestDocker() + if err != nil { + t.Fatal(err) + } + container, err := docker.Create( + "start_test", + "ls", + []string{"-al"}, + []string{"/var/lib/docker/images/ubuntu"}, + &Config{ + Ram: 33554432, + }, + ) + if err != nil { + t.Fatal(err) + } + defer docker.Destroy(container) + + if container.State.Running { + t.Errorf("Container shouldn't be running") + } + if err := container.Start(); err != nil { + t.Fatal(err) + } + container.Wait() + if container.State.Running { + t.Errorf("Container shouldn't be running") + } + // We should be able to call Wait again + container.Wait() + if container.State.Running { + t.Errorf("Container shouldn't be running") + } +} + +func TestRun(t *testing.T) { + docker, err := newTestDocker() + if err != nil { + t.Fatal(err) + } + container, err := docker.Create( + "run_test", + "ls", + []string{"-al"}, + []string{"/var/lib/docker/images/ubuntu"}, + &Config{ + Ram: 33554432, + }, + ) + if err != nil { + t.Fatal(err) + } + defer docker.Destroy(container) + + if container.State.Running { + t.Errorf("Container shouldn't be running") + } + if err := container.Run(); err != nil { + t.Fatal(err) + } + if container.State.Running { + t.Errorf("Container shouldn't be running") + } +} + +func TestOutput(t *testing.T) { + docker, err := newTestDocker() + if err != nil { + t.Fatal(err) + } + container, err := docker.Create( + "output_test", + "echo", + []string{"-n", "foobar"}, + []string{"/var/lib/docker/images/ubuntu"}, + &Config{}, + ) + if err != nil { + t.Fatal(err) + } + defer docker.Destroy(container) + + pipe, err := container.StdoutPipe() + defer pipe.Close() + output, err := container.Output() + if err != nil { + t.Fatal(err) + } + if string(output) != "foobar" { + t.Error(string(output)) + } +} + +func TestStop(t *testing.T) { + docker, err := newTestDocker() + if err != nil { + t.Fatal(err) + } + container, err := docker.Create( + "stop_test", + "sleep", + []string{"300"}, + []string{"/var/lib/docker/images/ubuntu"}, + &Config{}, + ) + if err != nil { + t.Fatal(err) + } + defer docker.Destroy(container) + + if container.State.Running { + t.Errorf("Container shouldn't be running") + } + if err := container.Start(); err != nil { + t.Fatal(err) + } + if !container.State.Running { + t.Errorf("Container should be running") + } + if err := container.Stop(); err != nil { + t.Fatal(err) + } + if container.State.Running { + t.Errorf("Container shouldn't be running") + } + container.Wait() + if container.State.Running { + t.Errorf("Container shouldn't be running") + } + // Try stopping twice + if err := container.Stop(); err != nil { + t.Fatal(err) + } +} + +func TestExitCode(t *testing.T) { + docker, err := newTestDocker() + if err != nil { + t.Fatal(err) + } + + trueContainer, err := docker.Create( + "exit_test_1", + "/bin/true", + []string{""}, + []string{"/var/lib/docker/images/ubuntu"}, + &Config{}, + ) + if err != nil { + t.Fatal(err) + } + defer docker.Destroy(trueContainer) + if err := trueContainer.Run(); err != nil { + t.Fatal(err) + } + + falseContainer, err := docker.Create( + "exit_test_2", + "/bin/false", + []string{""}, + []string{"/var/lib/docker/images/ubuntu"}, + &Config{}, + ) + if err != nil { + t.Fatal(err) + } + defer docker.Destroy(falseContainer) + if err := falseContainer.Run(); err != nil { + t.Fatal(err) + } + + if trueContainer.State.ExitCode != 0 { + t.Errorf("Unexpected exit code %v", trueContainer.State.ExitCode) + } + + if falseContainer.State.ExitCode != 1 { + t.Errorf("Unexpected exit code %v", falseContainer.State.ExitCode) + } +} diff --git a/docker.go b/docker.go new file mode 100644 index 000000000..3bdf69466 --- /dev/null +++ b/docker.go @@ -0,0 +1,112 @@ +package docker + +import ( + "container/list" + "fmt" + "io/ioutil" + "os" + "path" +) + +type Docker struct { + root string + repository string + containers *list.List +} + +func (docker *Docker) List() []*Container { + containers := []*Container{} + for e := docker.containers.Front(); e != nil; e = e.Next() { + containers = append(containers, e.Value.(*Container)) + } + return containers +} + +func (docker *Docker) getContainerElement(name string) *list.Element { + for e := docker.containers.Front(); e != nil; e = e.Next() { + container := e.Value.(*Container) + if container.Name == name { + return e + } + } + return nil +} + +func (docker *Docker) Get(name string) *Container { + e := docker.getContainerElement(name) + if e == nil { + return nil + } + return e.Value.(*Container) +} + +func (docker *Docker) Exists(name string) bool { + return docker.Get(name) != nil +} + +func (docker *Docker) Create(name string, command string, args []string, layers []string, config *Config) (*Container, error) { + if docker.Exists(name) { + return nil, fmt.Errorf("Container %v already exists", name) + } + root := path.Join(docker.repository, name) + container, err := createContainer(name, root, command, args, layers, config) + if err != nil { + return nil, err + } + docker.containers.PushBack(container) + return container, nil +} + +func (docker *Docker) Destroy(container *Container) error { + element := docker.getContainerElement(container.Name) + if element == nil { + return fmt.Errorf("Container %v not found - maybe it was already destroyed?", container.Name) + } + + if err := container.Stop(); err != nil { + return err + } + if err := os.RemoveAll(container.Root); err != nil { + return err + } + + docker.containers.Remove(element) + return nil +} + +func (docker *Docker) restore() error { + dir, err := ioutil.ReadDir(docker.repository) + if err != nil { + return err + } + for _, v := range dir { + container, err := loadContainer(path.Join(docker.repository, v.Name())) + if err != nil { + fmt.Errorf("Failed to load %v: %v", v.Name(), err) + continue + } + docker.containers.PushBack(container) + } + return nil +} + +func New() (*Docker, error) { + return NewFromDirectory("/var/lib/docker") +} + +func NewFromDirectory(root string) (*Docker, error) { + docker := &Docker{ + root: root, + repository: path.Join(root, "containers"), + containers: list.New(), + } + + if err := os.Mkdir(docker.repository, 0700); err != nil && !os.IsExist(err) { + return nil, err + } + + if err := docker.restore(); err != nil { + return nil, err + } + return docker, nil +} diff --git a/docker_test.go b/docker_test.go new file mode 100644 index 000000000..3633ea5a3 --- /dev/null +++ b/docker_test.go @@ -0,0 +1,175 @@ +package docker + +import ( + "io/ioutil" + "os" + "testing" +) + +func newTestDocker() (*Docker, error) { + root, err := ioutil.TempDir("", "docker-test") + if err != nil { + return nil, err + } + docker, err := NewFromDirectory(root) + if err != nil { + return nil, err + } + return docker, nil +} + +func TestCreate(t *testing.T) { + docker, err := newTestDocker() + if err != nil { + t.Fatal(err) + } + + // Make sure we start we 0 containers + if len(docker.List()) != 0 { + t.Errorf("Expected 0 containers, %v found", len(docker.List())) + } + container, err := docker.Create( + "test_create", + "ls", + []string{"-al"}, + []string{"/var/lib/docker/images/ubuntu"}, + &Config{}, + ) + if err != nil { + t.Fatal(err) + } + + defer func() { + if err := docker.Destroy(container); err != nil { + t.Error(err) + } + }() + + // Make sure we can find the newly created container with List() + if len(docker.List()) != 1 { + t.Errorf("Expected 1 container, %v found", len(docker.List())) + } + + // Make sure the container List() returns is the right one + if docker.List()[0].Name != "test_create" { + t.Errorf("Unexpected container %v returned by List", docker.List()[0]) + } + + // Make sure we can get the container with Get() + if docker.Get("test_create") == nil { + t.Errorf("Unable to get newly created container") + } + + // Make sure it is the right container + if docker.Get("test_create") != container { + t.Errorf("Get() returned the wrong container") + } + + // Make sure Exists returns it as existing + if !docker.Exists("test_create") { + t.Errorf("Exists() returned false for a newly created container") + } +} + +func TestDestroy(t *testing.T) { + docker, err := newTestDocker() + if err != nil { + t.Fatal(err) + } + container, err := docker.Create( + "test_destroy", + "ls", + []string{"-al"}, + []string{"/var/lib/docker/images/ubuntu"}, + &Config{}, + ) + if err != nil { + t.Fatal(err) + } + // Destroy + if err := docker.Destroy(container); err != nil { + t.Error(err) + } + + // Make sure docker.Exists() behaves correctly + if docker.Exists("test_destroy") { + t.Errorf("Exists() returned true") + } + + // Make sure docker.List() doesn't list the destroyed container + if len(docker.List()) != 0 { + t.Errorf("Expected 0 container, %v found", len(docker.List())) + } + + // Make sure docker.Get() refuses to return the unexisting container + if docker.Get("test_destroy") != nil { + t.Errorf("Unable to get newly created container") + } + + // Make sure the container root directory does not exist anymore + _, err = os.Stat(container.Root) + if err == nil || !os.IsNotExist(err) { + t.Errorf("Container root directory still exists after destroy") + } + + // Test double destroy + if err := docker.Destroy(container); err == nil { + // It should have failed + t.Errorf("Double destroy did not fail") + } +} + +func TestGet(t *testing.T) { + docker, err := newTestDocker() + if err != nil { + t.Fatal(err) + } + container1, err := docker.Create( + "test1", + "ls", + []string{"-al"}, + []string{"/var/lib/docker/images/ubuntu"}, + &Config{}, + ) + if err != nil { + t.Fatal(err) + } + defer docker.Destroy(container1) + + container2, err := docker.Create( + "test2", + "ls", + []string{"-al"}, + []string{"/var/lib/docker/images/ubuntu"}, + &Config{}, + ) + if err != nil { + t.Fatal(err) + } + defer docker.Destroy(container2) + + container3, err := docker.Create( + "test3", + "ls", + []string{"-al"}, + []string{"/var/lib/docker/images/ubuntu"}, + &Config{}, + ) + if err != nil { + t.Fatal(err) + } + defer docker.Destroy(container3) + + if docker.Get("test1") != container1 { + t.Errorf("Get(test1) returned %v while expecting %v", docker.Get("test1"), container1) + } + + if docker.Get("test2") != container2 { + t.Errorf("Get(test2) returned %v while expecting %v", docker.Get("test2"), container2) + } + + if docker.Get("test3") != container3 { + t.Errorf("Get(test3) returned %v while expecting %v", docker.Get("test3"), container3) + } + +} diff --git a/filesystem.go b/filesystem.go new file mode 100644 index 000000000..5088590c3 --- /dev/null +++ b/filesystem.go @@ -0,0 +1,52 @@ +package docker + +import ( + "fmt" + "os" + "os/exec" +) + +type Filesystem struct { + RootFS string + RWPath string + Layers []string +} + +func (fs *Filesystem) createMountPoints() error { + if err := os.Mkdir(fs.RootFS, 0700); err != nil && !os.IsExist(err) { + return err + } + if err := os.Mkdir(fs.RWPath, 0700); err != nil && !os.IsExist(err) { + return err + } + return nil +} + +func (fs *Filesystem) Mount() error { + if err := fs.createMountPoints(); err != nil { + return err + } + rwBranch := fmt.Sprintf("%v=rw", fs.RWPath) + roBranches := "" + for _, layer := range fs.Layers { + roBranches += fmt.Sprintf("%v=ro:", layer) + } + branches := fmt.Sprintf("br:%v:%v", rwBranch, roBranches) + cmd := exec.Command("mount", "-t", "aufs", "-o", branches, "none", fs.RootFS) + if err := cmd.Run(); err != nil { + return err + } + return nil +} + +func (fs *Filesystem) Umount() error { + return exec.Command("umount", fs.RootFS).Run() +} + +func newFilesystem(rootfs string, rwpath string, layers []string) *Filesystem { + return &Filesystem{ + RootFS: rootfs, + RWPath: rwpath, + Layers: layers, + } +} diff --git a/filesystem_test.go b/filesystem_test.go new file mode 100644 index 000000000..18aa95185 --- /dev/null +++ b/filesystem_test.go @@ -0,0 +1,35 @@ +package docker + +import ( + "io/ioutil" + "testing" +) + +func TestFilesystem(t *testing.T) { + rootfs, err := ioutil.TempDir("", "docker-test-root") + if err != nil { + t.Fatal(err) + } + rwpath, err := ioutil.TempDir("", "docker-test-rw") + if err != nil { + t.Fatal(err) + } + + filesystem := newFilesystem(rootfs, rwpath, []string{"/var/lib/docker/images/ubuntu", "/var/lib/docker/images/test"}) + + if err := filesystem.Umount(); err == nil { + t.Errorf("Umount succeeded even though the filesystem was not mounted") + } + + if err := filesystem.Mount(); err != nil { + t.Fatal(err) + } + + if err := filesystem.Umount(); err != nil { + t.Fatal(err) + } + + if err := filesystem.Umount(); err == nil { + t.Errorf("Umount succeeded even though the filesystem was already umounted") + } +} diff --git a/lxc_template.go b/lxc_template.go new file mode 100644 index 000000000..af9855c66 --- /dev/null +++ b/lxc_template.go @@ -0,0 +1,94 @@ +package docker + +import ( + "text/template" +) + +const LxcTemplate = ` +# hostname +{{if .Config.Hostname}} +lxc.utsname = {{.Config.Hostname}} +{{else}} +lxc.utsname = {{.Name}} +{{end}} +#lxc.aa_profile = unconfined + +# network configuration +#lxc.network.type = veth +#lxc.network.flags = up +#lxc.network.link = br0 +#lxc.network.name = eth0 # Internal container network interface name +#lxc.network.mtu = 1500 +#lxc.network.ipv4 = {ip_address}/{ip_prefix_len} + +# root filesystem +lxc.rootfs = {{.Filesystem.RootFS}} + +# use a dedicated pts for the container (and limit the number of pseudo terminal +# available) +lxc.pts = 1024 + +# disable the main console +lxc.console = none + +# no controlling tty at all +lxc.tty = 1 + +# no implicit access to devices +lxc.cgroup.devices.deny = a + +# /dev/null and zero +lxc.cgroup.devices.allow = c 1:3 rwm +lxc.cgroup.devices.allow = c 1:5 rwm + +# consoles +lxc.cgroup.devices.allow = c 5:1 rwm +lxc.cgroup.devices.allow = c 5:0 rwm +lxc.cgroup.devices.allow = c 4:0 rwm +lxc.cgroup.devices.allow = c 4:1 rwm + +# /dev/urandom,/dev/random +lxc.cgroup.devices.allow = c 1:9 rwm +lxc.cgroup.devices.allow = c 1:8 rwm + +# /dev/pts/* - pts namespaces are "coming soon" +lxc.cgroup.devices.allow = c 136:* rwm +lxc.cgroup.devices.allow = c 5:2 rwm + +# tuntap +lxc.cgroup.devices.allow = c 10:200 rwm + +# fuse +#lxc.cgroup.devices.allow = c 10:229 rwm + +# rtc +#lxc.cgroup.devices.allow = c 254:0 rwm + + +# standard mount point +lxc.mount.entry = proc {{.Filesystem.RootFS}}/proc proc nosuid,nodev,noexec 0 0 +lxc.mount.entry = sysfs {{.Filesystem.RootFS}}/sys sysfs nosuid,nodev,noexec 0 0 +lxc.mount.entry = devpts {{.Filesystem.RootFS}}/dev/pts devpts newinstance,ptmxmode=0666,nosuid,noexec 0 0 +#lxc.mount.entry = varrun {{.Filesystem.RootFS}}/var/run tmpfs mode=755,size=4096k,nosuid,nodev,noexec 0 0 +#lxc.mount.entry = varlock {{.Filesystem.RootFS}}/var/lock tmpfs size=1024k,nosuid,nodev,noexec 0 0 +#lxc.mount.entry = shm {{.Filesystem.RootFS}}/dev/shm tmpfs size=65536k,nosuid,nodev,noexec 0 0 + + +# drop linux capabilities (apply mainly to the user root in the container) +lxc.cap.drop = audit_control audit_write mac_admin mac_override mknod net_raw setfcap setpcap sys_admin sys_boot sys_module sys_nice sys_pacct sys_rawio sys_resource sys_time sys_tty_config + +# limits +{{if .Config.Ram}} +lxc.cgroup.memory.limit_in_bytes = {{.Config.Ram}} +{{end}} +` + +var LxcTemplateCompiled *template.Template + +func init() { + var err error + LxcTemplateCompiled, err = template.New("lxc").Parse(LxcTemplate) + if err != nil { + panic(err) + } +} diff --git a/state.go b/state.go new file mode 100644 index 000000000..80da72485 --- /dev/null +++ b/state.go @@ -0,0 +1,48 @@ +package docker + +import ( + "sync" +) + +type State struct { + Running bool + Pid int + ExitCode int + + stateChangeLock *sync.Mutex + stateChangeCond *sync.Cond +} + +func newState() *State { + lock := new(sync.Mutex) + return &State{ + stateChangeLock: lock, + stateChangeCond: sync.NewCond(lock), + } +} + +func (s *State) setRunning(pid int) { + s.Running = true + s.ExitCode = 0 + s.Pid = pid + s.broadcast() +} + +func (s *State) setStopped(exitCode int) { + s.Running = false + s.Pid = 0 + s.ExitCode = exitCode + s.broadcast() +} + +func (s *State) broadcast() { + s.stateChangeLock.Lock() + s.stateChangeCond.Broadcast() + s.stateChangeLock.Unlock() +} + +func (s *State) wait() { + s.stateChangeLock.Lock() + s.stateChangeCond.Wait() + s.stateChangeLock.Unlock() +} diff --git a/utils.go b/utils.go new file mode 100644 index 000000000..63b88956c --- /dev/null +++ b/utils.go @@ -0,0 +1,115 @@ +package docker + +import ( + "bytes" + "container/list" + "io" + "sync" +) + +type bufReader struct { + buf *bytes.Buffer + reader io.Reader + err error + l sync.Mutex + wait sync.Cond +} + +func newBufReader(r io.Reader) *bufReader { + reader := &bufReader{ + buf: &bytes.Buffer{}, + reader: r, + } + reader.wait.L = &reader.l + go reader.drain() + return reader +} + +func (r *bufReader) drain() { + buf := make([]byte, 1024) + for { + n, err := r.reader.Read(buf) + if err != nil { + r.err = err + } else { + r.buf.Write(buf[0:n]) + } + r.l.Lock() + r.wait.Signal() + r.l.Unlock() + if err != nil { + break + } + } +} + +func (r *bufReader) Read(p []byte) (n int, err error) { + for { + n, err = r.buf.Read(p) + if n > 0 { + return n, err + } + if r.err != nil { + return 0, r.err + } + r.l.Lock() + r.wait.Wait() + r.l.Unlock() + } + return +} + +func (r *bufReader) Close() error { + closer, ok := r.reader.(io.ReadCloser) + if !ok { + return nil + } + return closer.Close() +} + +type writeBroadcaster struct { + writers *list.List +} + +func (w *writeBroadcaster) AddWriter(writer io.WriteCloser) { + w.writers.PushBack(writer) +} + +func (w *writeBroadcaster) RemoveWriter(writer io.WriteCloser) { + for e := w.writers.Front(); e != nil; e = e.Next() { + v := e.Value.(io.Writer) + if v == writer { + w.writers.Remove(e) + return + } + } +} + +func (w *writeBroadcaster) Write(p []byte) (n int, err error) { + failed := []*list.Element{} + for e := w.writers.Front(); e != nil; e = e.Next() { + writer := e.Value.(io.Writer) + if n, err := writer.Write(p); err != nil || n != len(p) { + // On error, evict the writer + failed = append(failed, e) + } + } + // We cannot remove while iterating, so it has to be done in + // a separate step + for _, e := range failed { + w.writers.Remove(e) + } + return len(p), nil +} + +func (w *writeBroadcaster) Close() error { + for e := w.writers.Front(); e != nil; e = e.Next() { + writer := e.Value.(io.WriteCloser) + writer.Close() + } + return nil +} + +func newWriteBroadcaster() *writeBroadcaster { + return &writeBroadcaster{list.New()} +} diff --git a/utils_test.go b/utils_test.go new file mode 100644 index 000000000..dbdcda434 --- /dev/null +++ b/utils_test.go @@ -0,0 +1,126 @@ +package docker + +import ( + "bytes" + "errors" + "io" + "io/ioutil" + "testing" +) + +func TestBufReader(t *testing.T) { + reader, writer := io.Pipe() + bufreader := newBufReader(reader) + + // Write everything down to a Pipe + // Usually, a pipe should block but because of the buffered reader, + // the writes will go through + done := make(chan bool) + go func() { + writer.Write([]byte("hello world")) + writer.Close() + done <- true + }() + + // Drain the reader *after* everything has been written, just to verify + // it is indeed buffering + <-done + output, err := ioutil.ReadAll(bufreader) + if err != nil { + t.Fatal(err) + } + if !bytes.Equal(output, []byte("hello world")) { + t.Error(string(output)) + } +} + +type dummyWriter struct { + buffer bytes.Buffer + failOnWrite bool +} + +func (dw *dummyWriter) Write(p []byte) (n int, err error) { + if dw.failOnWrite { + return 0, errors.New("Fake fail") + } + return dw.buffer.Write(p) +} + +func (dw *dummyWriter) String() string { + return dw.buffer.String() +} + +func (dw *dummyWriter) Close() error { + return nil +} + +func TestWriteBroadcaster(t *testing.T) { + writer := newWriteBroadcaster() + + // Test 1: Both bufferA and bufferB should contain "foo" + bufferA := &dummyWriter{} + writer.AddWriter(bufferA) + bufferB := &dummyWriter{} + writer.AddWriter(bufferB) + writer.Write([]byte("foo")) + + if bufferA.String() != "foo" { + t.Errorf("Buffer contains %v", bufferA.String()) + } + + if bufferB.String() != "foo" { + t.Errorf("Buffer contains %v", bufferB.String()) + } + + // Test2: bufferA and bufferB should contain "foobar", + // while bufferC should only contain "bar" + bufferC := &dummyWriter{} + writer.AddWriter(bufferC) + writer.Write([]byte("bar")) + + if bufferA.String() != "foobar" { + t.Errorf("Buffer contains %v", bufferA.String()) + } + + if bufferB.String() != "foobar" { + t.Errorf("Buffer contains %v", bufferB.String()) + } + + if bufferC.String() != "bar" { + t.Errorf("Buffer contains %v", bufferC.String()) + } + + // Test3: Test removal + writer.RemoveWriter(bufferB) + writer.Write([]byte("42")) + if bufferA.String() != "foobar42" { + t.Errorf("Buffer contains %v", bufferA.String()) + } + if bufferB.String() != "foobar" { + t.Errorf("Buffer contains %v", bufferB.String()) + } + if bufferC.String() != "bar42" { + t.Errorf("Buffer contains %v", bufferC.String()) + } + + // Test4: Test eviction on failure + bufferA.failOnWrite = true + writer.Write([]byte("fail")) + if bufferA.String() != "foobar42" { + t.Errorf("Buffer contains %v", bufferA.String()) + } + if bufferC.String() != "bar42fail" { + t.Errorf("Buffer contains %v", bufferC.String()) + } + // Even though we reset the flag, no more writes should go in there + bufferA.failOnWrite = false + writer.Write([]byte("test")) + if bufferA.String() != "foobar42" { + t.Errorf("Buffer contains %v", bufferA.String()) + } + if bufferC.String() != "bar42failtest" { + t.Errorf("Buffer contains %v", bufferC.String()) + } + + writer.Close() +}