From b01ce736d251bf77a9e25137b36f22dd8efc038d Mon Sep 17 00:00:00 2001 From: Nathan LeClaire Date: Mon, 23 Feb 2015 16:51:43 -0800 Subject: [PATCH] Finish implementing multi-machine task execution Signed-off-by: Nathan LeClaire --- commands.go | 107 ++++++++++++++++++++++++++---------------- commands_test.go | 119 +++++++++++++++++++++++++++++++++++++++++++---- host_test.go | 1 - store_test.go | 24 ++++------ 4 files changed, 186 insertions(+), 65 deletions(-) diff --git a/commands.go b/commands.go index ddf9e852ff..e749923df8 100644 --- a/commands.go +++ b/commands.go @@ -9,9 +9,7 @@ import ( "path/filepath" "sort" "strings" - "sync" "text/tabwriter" - "time" log "github.com/Sirupsen/logrus" "github.com/codegangsta/cli" @@ -199,7 +197,7 @@ var Commands = []cli.Command{ { Name: "kill", Usage: "Kill a machine", - Description: "Argument is a machine name. Will use the active machine if none is provided.", + Description: "Argument(s) are one or more machine names. Will use the active machine if none is provided.", Action: cmdKill, }, { @@ -216,7 +214,7 @@ var Commands = []cli.Command{ { Name: "restart", Usage: "Restart a machine", - Description: "Argument is a machine name. Will use the active machine if none is provided.", + Description: "Argument(s) are one or more machine names. Will use the active machine if none is provided.", Action: cmdRestart, }, { @@ -228,7 +226,7 @@ var Commands = []cli.Command{ }, Name: "rm", Usage: "Remove a machine", - Description: "Argument is a machine name. Will use the active machine if none is provided.", + Description: "Argument(s) are one or more machine names. Will use the active machine if none is provided.", Action: cmdRm, }, { @@ -256,19 +254,19 @@ var Commands = []cli.Command{ { Name: "start", Usage: "Start a machine", - Description: "Argument is a machine name. Will use the active machine if none is provided.", + Description: "Argument(s) are one or more machine names. Will use the active machine if none is provided.", Action: cmdStart, }, { Name: "stop", Usage: "Stop a machine", - Description: "Argument is a machine name. Will use the active machine if none is provided.", + Description: "Argument(s) are one or more machine names. Will use the active machine if none is provided.", Action: cmdStop, }, { Name: "upgrade", Usage: "Upgrade a machine to the latest version of Docker", - Description: "Argument is a machine name. Will use the active machine if none is provided.", + Description: "Argument(s) are one or more machine names. Will use the active machine if none is provided.", Action: cmdUpgrade, }, { @@ -563,79 +561,110 @@ func cmdSsh(c *cli.Context) { } } -// machineCommand maps the command name to the corresponding machine command -// it is intended to be used by runCommand using a waitgroup and error channel -// to enable running commands across multiple machines asynchronously -func machineCommand(name string, machine *Host, wg *sync.WaitGroup, errorChan chan<- error) { - commands := map[string]interface{}{ - "start": machine.Start, - "stop": machine.Stop, +// machineCommand maps the command name to the corresponding machine command. +// We run commands concurrently and communicate back an error if there was one. +func machineCommand(actionName string, machine *Host, errorChan chan<- error) { + commands := map[string](func() error){ + "start": machine.Driver.Start, + "stop": machine.Driver.Stop, "restart": machine.Driver.Restart, "kill": machine.Driver.Kill, - "upgrade": machine.Upgrade, + "upgrade": machine.Driver.Upgrade, } - log.Debugf("command=%s machine=%s", name, machine.Name) + log.Debugf("command=%s machine=%s", actionName, machine.Name) - if err := commands[name].(func() error)(); err != nil { + if err := commands[actionName](); err != nil { errorChan <- err + return } - wg.Done() + errorChan <- nil } -// runCommand will run the command across multiple machines -func runCommand(name string, c *cli.Context) error { - errorChan := make(chan error) - go func() { - err := <-errorChan - log.Errorf(err.Error()) - }() +// runActionForeachMachine will run the command across multiple machines +func runActionForeachMachine(actionName string, machines []*Host) { + var ( + numConcurrentActions = 0 + serialMachines = []*Host{} + errorChan = make(chan error) + ) - wg := &sync.WaitGroup{} + for _, machine := range machines { + // Virtualbox is temperamental about doing things concurrently, + // so we schedule the actions in a "queue" to be executed serially + // after the concurrent actions are scheduled. + switch machine.DriverName { + case "virtualbox": + machine := machine + serialMachines = append(serialMachines, machine) + default: + numConcurrentActions++ + go machineCommand(actionName, machine, errorChan) + } + } + // While the concurrent actions are running, + // do the serial actions. As the name implies, + // these run one at a time. + for _, machine := range serialMachines { + serialChan := make(chan error) + go machineCommand(actionName, machine, serialChan) + if err := <-serialChan; err != nil { + log.Errorln(err) + } + close(serialChan) + } + + // TODO: We should probably only do 5-10 of these + // at a time, since otherwise cloud providers might + // rate limit us. + for i := 0; i < numConcurrentActions; i++ { + if err := <-errorChan; err != nil { + log.Errorln(err) + } + } + + close(errorChan) +} + +func runActionWithContext(actionName string, c *cli.Context) error { machines, err := getHosts(c) if err != nil { return err } - for _, machine := range machines { - wg.Add(1) - go machineCommand(name, machine, wg, errorChan) - time.Sleep(1 * time.Second) - } - - wg.Wait() + runActionForeachMachine(actionName, machines) return nil } func cmdStart(c *cli.Context) { - if err := runCommand("start", c); err != nil { + if err := runActionWithContext("start", c); err != nil { log.Fatal(err) } } func cmdStop(c *cli.Context) { - if err := runCommand("stop", c); err != nil { + if err := runActionWithContext("stop", c); err != nil { log.Fatal(err) } } func cmdRestart(c *cli.Context) { - if err := runCommand("restart", c); err != nil { + if err := runActionWithContext("restart", c); err != nil { log.Fatal(err) } } func cmdKill(c *cli.Context) { - if err := runCommand("kill", c); err != nil { + if err := runActionWithContext("kill", c); err != nil { log.Fatal(err) } } func cmdUpgrade(c *cli.Context) { - if err := runCommand("upgrade", c); err != nil { + if err := runActionWithContext("upgrade", c); err != nil { log.Fatal(err) } } diff --git a/commands_test.go b/commands_test.go index 96d8163d9f..cf934fa463 100644 --- a/commands_test.go +++ b/commands_test.go @@ -2,7 +2,6 @@ package main import ( "flag" - "fmt" "io/ioutil" "os/exec" "testing" @@ -49,10 +48,12 @@ func (d *FakeDriver) Remove() error { } func (d *FakeDriver) Start() error { + d.MockState = state.Running return nil } func (d *FakeDriver) Stop() error { + d.MockState = state.Stopped return nil } @@ -93,12 +94,12 @@ func TestGetHosts(t *testing.T) { store := NewStore(TestStoreDir, "", "") - hostA, hostAerr := store.Create("test-a", "none", flags) + _, hostAerr := store.Create("test-a", "none", flags) if hostAerr != nil { t.Fatal(hostAerr) } - hostB, hostBerr := store.Create("test-b", "none", flags) + _, hostBerr := store.Create("test-b", "none", flags) if hostBerr != nil { t.Fatal(hostBerr) } @@ -106,24 +107,21 @@ func TestGetHosts(t *testing.T) { set := flag.NewFlagSet("start", 0) set.Parse([]string{"test-a", "test-b"}) - c := cli.NewContext(nil, set, nil) globalSet := flag.NewFlagSet("-d", 0) globalSet.String("-d", "none", "driver") globalSet.String("storage-path", TestStoreDir, "storage path") globalSet.String("tls-ca-cert", "", "") globalSet.String("tls-ca-key", "", "") + c := cli.NewContext(nil, set, globalSet) + hosts, err := getHosts(c) if err != nil { t.Fatal(err) } - fmt.Println(hosts) - fmt.Println(hostA) - fmt.Println(hostB) - - if err := clearHosts(); err != nil { - t.Fatal(err) + if len(hosts) != 2 { + t.Fatal("Expected %d hosts, got %d hosts", 2, len(hosts)) } } @@ -178,3 +176,104 @@ func TestGetHostState(t *testing.T) { } } } + +func TestRunActionForeachMachine(t *testing.T) { + storePath, err := ioutil.TempDir("", ".docker") + if err != nil { + t.Fatal("Error creating tmp dir:", err) + } + + // Assume a bunch of machines in randomly started or + // stopped states. + machines := []*Host{ + { + Name: "foo", + DriverName: "fakedriver", + Driver: &FakeDriver{ + MockState: state.Running, + }, + storePath: storePath, + }, + { + Name: "bar", + DriverName: "fakedriver", + Driver: &FakeDriver{ + MockState: state.Stopped, + }, + storePath: storePath, + }, + { + Name: "baz", + // Ssh, don't tell anyone but this + // driver only _thinks_ it's named + // virtualbox... (to test serial actions) + // It's actually FakeDriver! + DriverName: "virtualbox", + Driver: &FakeDriver{ + MockState: state.Stopped, + }, + storePath: storePath, + }, + { + Name: "spam", + DriverName: "virtualbox", + Driver: &FakeDriver{ + MockState: state.Running, + }, + storePath: storePath, + }, + { + Name: "eggs", + DriverName: "fakedriver", + Driver: &FakeDriver{ + MockState: state.Stopped, + }, + storePath: storePath, + }, + { + Name: "ham", + DriverName: "fakedriver", + Driver: &FakeDriver{ + MockState: state.Running, + }, + storePath: storePath, + }, + } + + runActionForeachMachine("start", machines) + + expected := map[string]state.State{ + "foo": state.Running, + "bar": state.Running, + "baz": state.Running, + "spam": state.Running, + "eggs": state.Running, + "ham": state.Running, + } + + for _, machine := range machines { + state, _ := machine.Driver.GetState() + if expected[machine.Name] != state { + t.Fatalf("Expected machine %s to have state %s, got state %s", machine.Name, state, expected[machine.Name]) + } + } + + // OK, now let's stop them all! + expected = map[string]state.State{ + "foo": state.Stopped, + "bar": state.Stopped, + "baz": state.Stopped, + "spam": state.Stopped, + "eggs": state.Stopped, + "ham": state.Stopped, + } + + runActionForeachMachine("stop", machines) + + for _, machine := range machines { + state, _ := machine.Driver.GetState() + if expected[machine.Name] != state { + t.Fatalf("Expected machine %s to have state %s, got state %s", machine.Name, state, expected[machine.Name]) + } + } +} diff --git a/host_test.go b/host_test.go index 427518ad4c..f1a7816646 100644 --- a/host_test.go +++ b/host_test.go @@ -28,7 +28,6 @@ func getTestStore() (*Store, error) { os.Exit(1) } - os.Setenv("MACHINE_DIR", tmpDir) return NewStore(tmpDir, hostTestCaCert, hostTestPrivateKey), nil } diff --git a/store_test.go b/store_test.go index 6e5dfbcf63..524c0afa17 100644 --- a/store_test.go +++ b/store_test.go @@ -6,18 +6,12 @@ import ( "testing" _ "github.com/docker/machine/drivers/none" - "github.com/docker/machine/utils" ) const ( TestStoreDir = ".store-test" ) -func init() { - - os.Setenv("MACHINE_STORAGE_PATH", TestStoreDir) -} - type DriverOptionsMock struct { Data map[string]interface{} } @@ -58,7 +52,7 @@ func TestStoreCreate(t *testing.T) { flags := getDefaultTestDriverFlags() - store := NewStore("", "", "") + store := NewStore(TestStoreDir, "", "") host, err := store.Create("test", "none", flags) if err != nil { @@ -67,7 +61,7 @@ func TestStoreCreate(t *testing.T) { if host.Name != "test" { t.Fatal("Host name is incorrect") } - path := filepath.Join(utils.GetMachineDir(), "test") + path := filepath.Join(TestStoreDir, "test") if _, err := os.Stat(path); os.IsNotExist(err) { t.Fatalf("Host path doesn't exist: %s", path) } @@ -80,12 +74,12 @@ func TestStoreRemove(t *testing.T) { flags := getDefaultTestDriverFlags() - store := NewStore("", "", "") + store := NewStore(TestStoreDir, "", "") _, err := store.Create("test", "none", flags) if err != nil { t.Fatal(err) } - path := filepath.Join(utils.GetMachineDir(), "test") + path := filepath.Join(TestStoreDir, "test") if _, err := os.Stat(path); os.IsNotExist(err) { t.Fatalf("Host path doesn't exist: %s", path) } @@ -105,7 +99,7 @@ func TestStoreList(t *testing.T) { flags := getDefaultTestDriverFlags() - store := NewStore("", "", "") + store := NewStore(TestStoreDir, "", "") _, err := store.Create("test", "none", flags) if err != nil { t.Fatal(err) @@ -126,7 +120,7 @@ func TestStoreExists(t *testing.T) { flags := getDefaultTestDriverFlags() - store := NewStore("", "", "") + store := NewStore(TestStoreDir, "", "") exists, err := store.Exists("test") if exists { t.Fatal("Exists returned true when it should have been false") @@ -153,13 +147,13 @@ func TestStoreLoad(t *testing.T) { flags := getDefaultTestDriverFlags() flags.Data["url"] = expectedURL - store := NewStore("", "", "") + store := NewStore(TestStoreDir, "", "") _, err := store.Create("test", "none", flags) if err != nil { t.Fatal(err) } - store = NewStore("", "", "") + store = NewStore(TestStoreDir, "", "") host, err := store.Load("test") if host.Name != "test" { t.Fatal("Host name is incorrect") @@ -180,7 +174,7 @@ func TestStoreGetSetActive(t *testing.T) { flags := getDefaultTestDriverFlags() - store := NewStore("", "", "") + store := NewStore(TestStoreDir, "", "") // No hosts set host, err := store.GetActive()