Merge pull request #774 from vieux/cluster_opt

add --cluster-driver and --cluster-opt
This commit is contained in:
Andrea Luzzardi 2015-05-13 16:31:54 -07:00
commit cde1550a13
8 changed files with 184 additions and 43 deletions

View File

@ -104,9 +104,10 @@ func Run() {
Flags: []cli.Flag{ Flags: []cli.Flag{
flStore, flStore,
flStrategy, flFilter, flStrategy, flFilter,
flHosts, flHeartBeat, flOverCommit, flHosts,
flTLS, flTLSCaCert, flTLSCert, flTLSKey, flTLSVerify, flTLS, flTLSCaCert, flTLSCert, flTLSKey, flTLSVerify,
flEnableCors}, flEnableCors,
flCluster, flClusterOpt},
Action: manage, Action: manage,
}, },
{ {

View File

@ -77,11 +77,6 @@ var (
Name: "tlsverify", Name: "tlsverify",
Usage: "use TLS and verify the remote", Usage: "use TLS and verify the remote",
} }
flOverCommit = cli.Float64Flag{
Name: "overcommit, oc",
Usage: "overcommit to apply on resources",
Value: 0.05,
}
flStrategy = cli.StringFlag{ flStrategy = cli.StringFlag{
Name: "strategy", Name: "strategy",
Usage: "placement strategy to use [" + strings.Join(strategy.List(), ", ") + "]", Usage: "placement strategy to use [" + strings.Join(strategy.List(), ", ") + "]",
@ -99,9 +94,14 @@ var (
Value: &flFilterValue, Value: &flFilterValue,
} }
// flCluster = cli.StringFlag{ flCluster = cli.StringFlag{
// Name: "cluster, c", Name: "cluster-driver, c",
// Usage: "cluster to use [swarm, mesos]", Usage: "cluster driver to use [swarm]",
// Value: "swarm", Value: "swarm",
// } }
flClusterOpt = cli.StringSliceFlag{
Name: "cluster-opt",
Usage: "cluster driver options",
Value: &cli.StringSlice{},
}
) )

View File

@ -25,7 +25,8 @@ ARGUMENTS:
{{printf "\t"}} * <ip1>,<ip2>{{end}}{{if .Flags}} {{printf "\t"}} * <ip1>,<ip2>{{end}}{{if .Flags}}
OPTIONS: OPTIONS:
{{range .Flags}}{{.}} {{range .Flags}}{{.}}
{{end}}{{ end }} {{end}}{{if (eq .Name "manage")}}{{printf "\t * swarm.overcommit=0.05\tovercommit to apply on resources"}}
{{printf "\t * swarm.discovery.heartbeat=25\ttime in second between each heartbeat"}}{{end}}{{ end }}
` `
} }

View File

@ -6,7 +6,6 @@ import (
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"path" "path"
"strconv"
log "github.com/Sirupsen/logrus" log "github.com/Sirupsen/logrus"
"github.com/codegangsta/cli" "github.com/codegangsta/cli"
@ -120,18 +119,10 @@ func manage(c *cli.Context) {
sched := scheduler.New(s, fs) sched := scheduler.New(s, fs)
hb, err := strconv.ParseUint(c.String("heartbeat"), 0, 32) cluster, err := swarm.NewCluster(sched, store, tlsConfig, dflag, c.StringSlice("cluster-opt"))
if hb < 1 || err != nil { if err != nil {
log.Fatal("--heartbeat should be an unsigned integer and greater than 0") log.Fatal(err)
} }
options := &cluster.Options{
TLSConfig: tlsConfig,
OvercommitRatio: c.Float64("overcommit"),
Discovery: dflag,
Heartbeat: hb,
}
cluster := swarm.NewCluster(sched, store, options)
// see https://github.com/codegangsta/cli/issues/160 // see https://github.com/codegangsta/cli/issues/160
hosts := c.StringSlice("host") hosts := c.StringSlice("host")

View File

@ -1,11 +1,47 @@
package cluster package cluster
import "crypto/tls" import (
"strconv"
"strings"
)
// Options is exported // DriverOpts are key=values options
type Options struct { type DriverOpts []string
TLSConfig *tls.Config
OvercommitRatio float64 // String returns a string from the driver options
Discovery string func (do DriverOpts) String(key string) (string, bool) {
Heartbeat uint64 for _, opt := range do {
kv := strings.SplitN(opt, "=", 2)
if kv[0] == key {
return kv[1], true
}
}
return "", false
}
// Int returns an int64 from the driver options
func (do DriverOpts) Int(key string) (int64, bool) {
if value, ok := do.String(key); ok {
v, _ := strconv.ParseInt(value, 0, 64)
return v, true
}
return 0, false
}
// Uint returns an int64 from the driver options
func (do DriverOpts) Uint(key string) (uint64, bool) {
if value, ok := do.String(key); ok {
v, _ := strconv.ParseUint(value, 0, 64)
return v, true
}
return 0, false
}
// Float returns a float64 from the driver options
func (do DriverOpts) Float(key string) (float64, bool) {
if value, ok := do.String(key); ok {
v, _ := strconv.ParseFloat(value, 64)
return v, true
}
return 0.0, false
} }

97
cluster/options_test.go Normal file
View File

@ -0,0 +1,97 @@
package cluster
import (
"testing"
"github.com/stretchr/testify/assert"
)
var opts = DriverOpts{"foo1=bar", "foo2=-5", "foo3=7", "foo4=0.6"}
func TestString(t *testing.T) {
val, ok := opts.String("foo1")
assert.True(t, ok)
assert.Equal(t, val, "bar")
val, ok = opts.String("foo2")
assert.True(t, ok)
assert.Equal(t, val, "-5")
val, ok = opts.String("foo3")
assert.True(t, ok)
assert.Equal(t, val, "7")
val, ok = opts.String("foo4")
assert.True(t, ok)
assert.Equal(t, val, "0.6")
val, ok = opts.String("invalid")
assert.False(t, ok)
assert.Equal(t, val, "")
}
func TestInt(t *testing.T) {
val, ok := opts.Int("foo1")
assert.True(t, ok)
assert.Equal(t, val, 0)
val, ok = opts.Int("foo2")
assert.True(t, ok)
assert.Equal(t, val, -5)
val, ok = opts.Int("foo3")
assert.True(t, ok)
assert.Equal(t, val, 7)
val, ok = opts.Int("foo4")
assert.True(t, ok)
assert.Equal(t, val, 0)
val, ok = opts.Int("invalid")
assert.False(t, ok)
assert.Equal(t, val, 0)
}
func TestUint(t *testing.T) {
val, ok := opts.Uint("foo1")
assert.True(t, ok)
assert.Equal(t, val, uint(0))
val, ok = opts.Uint("foo2")
assert.True(t, ok)
assert.Equal(t, val, uint(0))
val, ok = opts.Uint("foo3")
assert.True(t, ok)
assert.Equal(t, val, uint(7))
val, ok = opts.Uint("foo4")
assert.True(t, ok)
assert.Equal(t, val, uint(0))
val, ok = opts.Uint("invalid")
assert.False(t, ok)
assert.Equal(t, val, uint(0))
}
func TestFloat(t *testing.T) {
val, ok := opts.Float("foo1")
assert.True(t, ok)
assert.Equal(t, val, 0.0)
val, ok = opts.Float("foo2")
assert.True(t, ok)
assert.Equal(t, val, -5.0)
val, ok = opts.Float("foo3")
assert.True(t, ok)
assert.Equal(t, val, 7.0)
val, ok = opts.Float("foo4")
assert.True(t, ok)
assert.Equal(t, val, 0.6)
val, ok = opts.Float("invalid")
assert.False(t, ok)
assert.Equal(t, val, 0.0)
}

View File

@ -1,6 +1,7 @@
package swarm package swarm
import ( import (
"crypto/tls"
"errors" "errors"
"fmt" "fmt"
"io" "io"
@ -26,24 +27,38 @@ type Cluster struct {
eventHandler cluster.EventHandler eventHandler cluster.EventHandler
engines map[string]*cluster.Engine engines map[string]*cluster.Engine
scheduler *scheduler.Scheduler scheduler *scheduler.Scheduler
options *cluster.Options
store *state.Store store *state.Store
overcommitRatio float64
discovery string
heartbeat uint64
TLSConfig *tls.Config
} }
// NewCluster is exported // NewCluster is exported
func NewCluster(scheduler *scheduler.Scheduler, store *state.Store, options *cluster.Options) cluster.Cluster { func NewCluster(scheduler *scheduler.Scheduler, store *state.Store, TLSConfig *tls.Config, dflag string, options cluster.DriverOpts) (cluster.Cluster, error) {
log.WithFields(log.Fields{"name": "swarm"}).Debug("Initializing cluster") log.WithFields(log.Fields{"name": "swarm"}).Debug("Initializing cluster")
cluster := &Cluster{ cluster := &Cluster{
engines: make(map[string]*cluster.Engine), engines: make(map[string]*cluster.Engine),
scheduler: scheduler, scheduler: scheduler,
options: options,
store: store, store: store,
overcommitRatio: 0.05,
discovery: dflag,
TLSConfig: TLSConfig,
}
if val, ok := options.Float("swarm.overcommit"); ok {
cluster.overcommitRatio = val
}
if cluster.heartbeat, _ = options.Uint("swarm.discovery.heartbeat"); cluster.heartbeat < 1 {
return nil, errors.New("heartbeat should be an unsigned integer and greater than 0")
} }
// get the list of entries from the discovery service // get the list of entries from the discovery service
go func() { go func() {
d, err := discovery.New(options.Discovery, options.Heartbeat) d, err := discovery.New(cluster.discovery, cluster.heartbeat)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
@ -58,7 +73,7 @@ func NewCluster(scheduler *scheduler.Scheduler, store *state.Store, options *clu
go d.Watch(cluster.newEntries) go d.Watch(cluster.newEntries)
}() }()
return cluster return cluster, nil
} }
// Handle callbacks for the events // Handle callbacks for the events
@ -151,8 +166,8 @@ func (c *Cluster) newEntries(entries []*discovery.Entry) {
for _, entry := range entries { for _, entry := range entries {
go func(m *discovery.Entry) { go func(m *discovery.Entry) {
if !c.hasEngine(m.String()) { if !c.hasEngine(m.String()) {
engine := cluster.NewEngine(m.String(), c.options.OvercommitRatio) engine := cluster.NewEngine(m.String(), c.overcommitRatio)
if err := engine.Connect(c.options.TLSConfig); err != nil { if err := engine.Connect(c.TLSConfig); err != nil {
log.Error(err) log.Error(err)
return return
} }

View File

@ -99,7 +99,7 @@ function swarm_manage() {
discovery="$@" discovery="$@"
fi fi
"$SWARM_BINARY" manage -H "$SWARM_HOST" --heartbeat=1 "$discovery" & "$SWARM_BINARY" manage -H "$SWARM_HOST" --cluster-opt "swarm.discovery.heartbeat=1" "$discovery" &
SWARM_PID=$! SWARM_PID=$!
wait_until_reachable "$SWARM_HOST" wait_until_reachable "$SWARM_HOST"
retry 10 1 check_swarm_nodes retry 10 1 check_swarm_nodes