Finish implementing multi-machine task execution

Signed-off-by: Nathan LeClaire <nathan.leclaire@gmail.com>
This commit is contained in:
Nathan LeClaire 2015-02-23 16:51:43 -08:00
parent 9ca79892a6
commit b01ce736d2
4 changed files with 186 additions and 65 deletions

View File

@ -9,9 +9,7 @@ import (
"path/filepath"
"sort"
"strings"
"sync"
"text/tabwriter"
"time"
log "github.com/Sirupsen/logrus"
"github.com/codegangsta/cli"
@ -199,7 +197,7 @@ var Commands = []cli.Command{
{
Name: "kill",
Usage: "Kill a machine",
Description: "Argument is a machine name. Will use the active machine if none is provided.",
Description: "Argument(s) are one or more machine names. Will use the active machine if none is provided.",
Action: cmdKill,
},
{
@ -216,7 +214,7 @@ var Commands = []cli.Command{
{
Name: "restart",
Usage: "Restart a machine",
Description: "Argument is a machine name. Will use the active machine if none is provided.",
Description: "Argument(s) are one or more machine names. Will use the active machine if none is provided.",
Action: cmdRestart,
},
{
@ -228,7 +226,7 @@ var Commands = []cli.Command{
},
Name: "rm",
Usage: "Remove a machine",
Description: "Argument is a machine name. Will use the active machine if none is provided.",
Description: "Argument(s) are one or more machine names. Will use the active machine if none is provided.",
Action: cmdRm,
},
{
@ -256,19 +254,19 @@ var Commands = []cli.Command{
{
Name: "start",
Usage: "Start a machine",
Description: "Argument is a machine name. Will use the active machine if none is provided.",
Description: "Argument(s) are one or more machine names. Will use the active machine if none is provided.",
Action: cmdStart,
},
{
Name: "stop",
Usage: "Stop a machine",
Description: "Argument is a machine name. Will use the active machine if none is provided.",
Description: "Argument(s) are one or more machine names. Will use the active machine if none is provided.",
Action: cmdStop,
},
{
Name: "upgrade",
Usage: "Upgrade a machine to the latest version of Docker",
Description: "Argument is a machine name. Will use the active machine if none is provided.",
Description: "Argument(s) are one or more machine names. Will use the active machine if none is provided.",
Action: cmdUpgrade,
},
{
@ -563,79 +561,110 @@ func cmdSsh(c *cli.Context) {
}
}
// machineCommand maps the command name to the corresponding machine command
// it is intended to be used by runCommand using a waitgroup and error channel
// to enable running commands across multiple machines asynchronously
func machineCommand(name string, machine *Host, wg *sync.WaitGroup, errorChan chan<- error) {
commands := map[string]interface{}{
"start": machine.Start,
"stop": machine.Stop,
// machineCommand maps the command name to the corresponding machine command.
// We run commands concurrently and communicate back an error if there was one.
func machineCommand(actionName string, machine *Host, errorChan chan<- error) {
commands := map[string](func() error){
"start": machine.Driver.Start,
"stop": machine.Driver.Stop,
"restart": machine.Driver.Restart,
"kill": machine.Driver.Kill,
"upgrade": machine.Upgrade,
"upgrade": machine.Driver.Upgrade,
}
log.Debugf("command=%s machine=%s", name, machine.Name)
log.Debugf("command=%s machine=%s", actionName, machine.Name)
if err := commands[name].(func() error)(); err != nil {
if err := commands[actionName](); err != nil {
errorChan <- err
return
}
wg.Done()
errorChan <- nil
}
// runCommand will run the command across multiple machines
func runCommand(name string, c *cli.Context) error {
errorChan := make(chan error)
go func() {
err := <-errorChan
log.Errorf(err.Error())
}()
// runActionForeachMachine will run the command across multiple machines
func runActionForeachMachine(actionName string, machines []*Host) {
var (
numConcurrentActions = 0
serialMachines = []*Host{}
errorChan = make(chan error)
)
wg := &sync.WaitGroup{}
for _, machine := range machines {
// Virtualbox is temperamental about doing things concurrently,
// so we schedule the actions in a "queue" to be executed serially
// after the concurrent actions are scheduled.
switch machine.DriverName {
case "virtualbox":
machine := machine
serialMachines = append(serialMachines, machine)
default:
numConcurrentActions++
go machineCommand(actionName, machine, errorChan)
}
}
// While the concurrent actions are running,
// do the serial actions. As the name implies,
// these run one at a time.
for _, machine := range serialMachines {
serialChan := make(chan error)
go machineCommand(actionName, machine, serialChan)
if err := <-serialChan; err != nil {
log.Errorln(err)
}
close(serialChan)
}
// TODO: We should probably only do 5-10 of these
// at a time, since otherwise cloud providers might
// rate limit us.
for i := 0; i < numConcurrentActions; i++ {
if err := <-errorChan; err != nil {
log.Errorln(err)
}
}
close(errorChan)
}
func runActionWithContext(actionName string, c *cli.Context) error {
machines, err := getHosts(c)
if err != nil {
return err
}
for _, machine := range machines {
wg.Add(1)
go machineCommand(name, machine, wg, errorChan)
time.Sleep(1 * time.Second)
}
wg.Wait()
runActionForeachMachine(actionName, machines)
return nil
}
func cmdStart(c *cli.Context) {
if err := runCommand("start", c); err != nil {
if err := runActionWithContext("start", c); err != nil {
log.Fatal(err)
}
}
func cmdStop(c *cli.Context) {
if err := runCommand("stop", c); err != nil {
if err := runActionWithContext("stop", c); err != nil {
log.Fatal(err)
}
}
func cmdRestart(c *cli.Context) {
if err := runCommand("restart", c); err != nil {
if err := runActionWithContext("restart", c); err != nil {
log.Fatal(err)
}
}
func cmdKill(c *cli.Context) {
if err := runCommand("kill", c); err != nil {
if err := runActionWithContext("kill", c); err != nil {
log.Fatal(err)
}
}
func cmdUpgrade(c *cli.Context) {
if err := runCommand("upgrade", c); err != nil {
if err := runActionWithContext("upgrade", c); err != nil {
log.Fatal(err)
}
}

View File

@ -2,7 +2,6 @@ package main
import (
"flag"
"fmt"
"io/ioutil"
"os/exec"
"testing"
@ -49,10 +48,12 @@ func (d *FakeDriver) Remove() error {
}
func (d *FakeDriver) Start() error {
d.MockState = state.Running
return nil
}
func (d *FakeDriver) Stop() error {
d.MockState = state.Stopped
return nil
}
@ -93,12 +94,12 @@ func TestGetHosts(t *testing.T) {
store := NewStore(TestStoreDir, "", "")
hostA, hostAerr := store.Create("test-a", "none", flags)
_, hostAerr := store.Create("test-a", "none", flags)
if hostAerr != nil {
t.Fatal(hostAerr)
}
hostB, hostBerr := store.Create("test-b", "none", flags)
_, hostBerr := store.Create("test-b", "none", flags)
if hostBerr != nil {
t.Fatal(hostBerr)
}
@ -106,24 +107,21 @@ func TestGetHosts(t *testing.T) {
set := flag.NewFlagSet("start", 0)
set.Parse([]string{"test-a", "test-b"})
c := cli.NewContext(nil, set, nil)
globalSet := flag.NewFlagSet("-d", 0)
globalSet.String("-d", "none", "driver")
globalSet.String("storage-path", TestStoreDir, "storage path")
globalSet.String("tls-ca-cert", "", "")
globalSet.String("tls-ca-key", "", "")
c := cli.NewContext(nil, set, globalSet)
hosts, err := getHosts(c)
if err != nil {
t.Fatal(err)
}
fmt.Println(hosts)
fmt.Println(hostA)
fmt.Println(hostB)
if err := clearHosts(); err != nil {
t.Fatal(err)
if len(hosts) != 2 {
t.Fatal("Expected %d hosts, got %d hosts", 2, len(hosts))
}
}
@ -178,3 +176,104 @@ func TestGetHostState(t *testing.T) {
}
}
}
func TestRunActionForeachMachine(t *testing.T) {
storePath, err := ioutil.TempDir("", ".docker")
if err != nil {
t.Fatal("Error creating tmp dir:", err)
}
// Assume a bunch of machines in randomly started or
// stopped states.
machines := []*Host{
{
Name: "foo",
DriverName: "fakedriver",
Driver: &FakeDriver{
MockState: state.Running,
},
storePath: storePath,
},
{
Name: "bar",
DriverName: "fakedriver",
Driver: &FakeDriver{
MockState: state.Stopped,
},
storePath: storePath,
},
{
Name: "baz",
// Ssh, don't tell anyone but this
// driver only _thinks_ it's named
// virtualbox... (to test serial actions)
// It's actually FakeDriver!
DriverName: "virtualbox",
Driver: &FakeDriver{
MockState: state.Stopped,
},
storePath: storePath,
},
{
Name: "spam",
DriverName: "virtualbox",
Driver: &FakeDriver{
MockState: state.Running,
},
storePath: storePath,
},
{
Name: "eggs",
DriverName: "fakedriver",
Driver: &FakeDriver{
MockState: state.Stopped,
},
storePath: storePath,
},
{
Name: "ham",
DriverName: "fakedriver",
Driver: &FakeDriver{
MockState: state.Running,
},
storePath: storePath,
},
}
runActionForeachMachine("start", machines)
expected := map[string]state.State{
"foo": state.Running,
"bar": state.Running,
"baz": state.Running,
"spam": state.Running,
"eggs": state.Running,
"ham": state.Running,
}
for _, machine := range machines {
state, _ := machine.Driver.GetState()
if expected[machine.Name] != state {
t.Fatalf("Expected machine %s to have state %s, got state %s", machine.Name, state, expected[machine.Name])
}
}
// OK, now let's stop them all!
expected = map[string]state.State{
"foo": state.Stopped,
"bar": state.Stopped,
"baz": state.Stopped,
"spam": state.Stopped,
"eggs": state.Stopped,
"ham": state.Stopped,
}
runActionForeachMachine("stop", machines)
for _, machine := range machines {
state, _ := machine.Driver.GetState()
if expected[machine.Name] != state {
t.Fatalf("Expected machine %s to have state %s, got state %s", machine.Name, state, expected[machine.Name])
}
}
}

View File

@ -28,7 +28,6 @@ func getTestStore() (*Store, error) {
os.Exit(1)
}
os.Setenv("MACHINE_DIR", tmpDir)
return NewStore(tmpDir, hostTestCaCert, hostTestPrivateKey), nil
}

View File

@ -6,18 +6,12 @@ import (
"testing"
_ "github.com/docker/machine/drivers/none"
"github.com/docker/machine/utils"
)
const (
TestStoreDir = ".store-test"
)
func init() {
os.Setenv("MACHINE_STORAGE_PATH", TestStoreDir)
}
type DriverOptionsMock struct {
Data map[string]interface{}
}
@ -58,7 +52,7 @@ func TestStoreCreate(t *testing.T) {
flags := getDefaultTestDriverFlags()
store := NewStore("", "", "")
store := NewStore(TestStoreDir, "", "")
host, err := store.Create("test", "none", flags)
if err != nil {
@ -67,7 +61,7 @@ func TestStoreCreate(t *testing.T) {
if host.Name != "test" {
t.Fatal("Host name is incorrect")
}
path := filepath.Join(utils.GetMachineDir(), "test")
path := filepath.Join(TestStoreDir, "test")
if _, err := os.Stat(path); os.IsNotExist(err) {
t.Fatalf("Host path doesn't exist: %s", path)
}
@ -80,12 +74,12 @@ func TestStoreRemove(t *testing.T) {
flags := getDefaultTestDriverFlags()
store := NewStore("", "", "")
store := NewStore(TestStoreDir, "", "")
_, err := store.Create("test", "none", flags)
if err != nil {
t.Fatal(err)
}
path := filepath.Join(utils.GetMachineDir(), "test")
path := filepath.Join(TestStoreDir, "test")
if _, err := os.Stat(path); os.IsNotExist(err) {
t.Fatalf("Host path doesn't exist: %s", path)
}
@ -105,7 +99,7 @@ func TestStoreList(t *testing.T) {
flags := getDefaultTestDriverFlags()
store := NewStore("", "", "")
store := NewStore(TestStoreDir, "", "")
_, err := store.Create("test", "none", flags)
if err != nil {
t.Fatal(err)
@ -126,7 +120,7 @@ func TestStoreExists(t *testing.T) {
flags := getDefaultTestDriverFlags()
store := NewStore("", "", "")
store := NewStore(TestStoreDir, "", "")
exists, err := store.Exists("test")
if exists {
t.Fatal("Exists returned true when it should have been false")
@ -153,13 +147,13 @@ func TestStoreLoad(t *testing.T) {
flags := getDefaultTestDriverFlags()
flags.Data["url"] = expectedURL
store := NewStore("", "", "")
store := NewStore(TestStoreDir, "", "")
_, err := store.Create("test", "none", flags)
if err != nil {
t.Fatal(err)
}
store = NewStore("", "", "")
store = NewStore(TestStoreDir, "", "")
host, err := store.Load("test")
if host.Name != "test" {
t.Fatal("Host name is incorrect")
@ -180,7 +174,7 @@ func TestStoreGetSetActive(t *testing.T) {
flags := getDefaultTestDriverFlags()
store := NewStore("", "", "")
store := NewStore(TestStoreDir, "", "")
// No hosts set
host, err := store.GetActive()