refactor code: move filter/ and strategy/ out of scheduler and create a simple scheduler interface.

Signed-off-by: Victor Vieux <vieux@docker.com>
This commit is contained in:
Victor Vieux 2015-01-30 21:23:02 +00:00
parent 6afdcb90bf
commit eb88068f93
24 changed files with 103 additions and 58 deletions

View File

@ -80,7 +80,7 @@ other discovery services.
## Advanced Scheduling ## Advanced Scheduling
See [filters](scheduler/filter) and [strategies](scheduler/strategy) to learn See [filters](filter) and [strategies](strategy) to learn
more about advanced scheduling. more about advanced scheduling.
## TLS ## TLS

View File

@ -16,8 +16,8 @@ import (
dockerfilters "github.com/docker/docker/pkg/parsers/filters" dockerfilters "github.com/docker/docker/pkg/parsers/filters"
"github.com/docker/docker/pkg/units" "github.com/docker/docker/pkg/units"
"github.com/docker/swarm/cluster" "github.com/docker/swarm/cluster"
"github.com/docker/swarm/filter"
"github.com/docker/swarm/scheduler" "github.com/docker/swarm/scheduler"
"github.com/docker/swarm/scheduler/filter"
"github.com/docker/swarm/version" "github.com/docker/swarm/version"
"github.com/gorilla/mux" "github.com/gorilla/mux"
"github.com/samalba/dockerclient" "github.com/samalba/dockerclient"
@ -27,7 +27,7 @@ const APIVERSION = "1.16"
type context struct { type context struct {
cluster *cluster.Cluster cluster *cluster.Cluster
scheduler *scheduler.Scheduler scheduler scheduler.Scheduler
eventsHandler *eventsHandler eventsHandler *eventsHandler
debug bool debug bool
tlsConfig *tls.Config tlsConfig *tls.Config

View File

@ -12,7 +12,7 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
func serveRequest(c *cluster.Cluster, s *scheduler.Scheduler, w http.ResponseWriter, req *http.Request) error { func serveRequest(c *cluster.Cluster, s scheduler.Scheduler, w http.ResponseWriter, req *http.Request) error {
context := &context{ context := &context{
cluster: c, cluster: c,
scheduler: s, scheduler: s,

View File

@ -29,7 +29,7 @@ func newListener(proto, addr string, tlsConfig *tls.Config) (net.Listener, error
return l, nil return l, nil
} }
func ListenAndServe(c *cluster.Cluster, s *scheduler.Scheduler, hosts []string, enableCors bool, tlsConfig *tls.Config) error { func ListenAndServe(c *cluster.Cluster, s scheduler.Scheduler, hosts []string, enableCors bool, tlsConfig *tls.Config) error {
context := &context{ context := &context{
cluster: c, cluster: c,
scheduler: s, scheduler: s,

View File

@ -94,4 +94,9 @@ var (
Usage: "filter to use [constraint, affinity, health, port, dependency]", Usage: "filter to use [constraint, affinity, health, port, dependency]",
Value: &flFilterValue, Value: &flFilterValue,
} }
flScheduler = cli.StringFlag{
Name: "scheduler, s",
Usage: "scheduler to use [builtin, mesos]",
Value: "builtin",
}
) )

View File

@ -12,10 +12,10 @@ import (
"github.com/docker/swarm/api" "github.com/docker/swarm/api"
"github.com/docker/swarm/cluster" "github.com/docker/swarm/cluster"
"github.com/docker/swarm/discovery" "github.com/docker/swarm/discovery"
"github.com/docker/swarm/filter"
"github.com/docker/swarm/scheduler" "github.com/docker/swarm/scheduler"
"github.com/docker/swarm/scheduler/filter"
"github.com/docker/swarm/scheduler/strategy"
"github.com/docker/swarm/state" "github.com/docker/swarm/state"
"github.com/docker/swarm/strategy"
) )
type logHandler struct { type logHandler struct {
@ -137,12 +137,16 @@ func manage(c *cli.Context) {
go d.Watch(cluster.UpdateNodes) go d.Watch(cluster.UpdateNodes)
}() }()
sched := scheduler.NewScheduler( sched, err := scheduler.New(c.String("scheduler"),
cluster, cluster,
s, s,
fs, fs,
) )
if err != nil {
log.Fatal(err)
}
// see https://github.com/codegangsta/cli/issues/160 // see https://github.com/codegangsta/cli/issues/160
hosts := c.StringSlice("host") hosts := c.StringSlice("host")
if c.IsSet("host") || c.IsSet("H") { if c.IsSet("host") || c.IsSet("H") {

View File

@ -0,0 +1,63 @@
package builtin
import (
"sync"
"github.com/docker/swarm/cluster"
"github.com/docker/swarm/filter"
"github.com/docker/swarm/strategy"
"github.com/samalba/dockerclient"
)
// BuiltinScheduler is the default scheduler implementation. It places
// containers by filtering the cluster's nodes and then delegating the
// final choice to a pluggable placement strategy.
//
// The embedded Mutex serializes scheduling operations; because of the
// no-copy mutex, BuiltinScheduler must always be used through a pointer.
type BuiltinScheduler struct {
	sync.Mutex

	// cluster is the set of nodes containers are scheduled onto.
	cluster *cluster.Cluster
	// strategy picks one node among those that pass all filters.
	strategy strategy.PlacementStrategy
	// filters narrow down candidate nodes before the strategy runs.
	filters []filter.Filter
}
// Initialize wires the scheduler to a cluster, a placement strategy and a
// set of filters. It is expected to be called once, before any scheduling
// request is made.
func (s *BuiltinScheduler) Initialize(cluster *cluster.Cluster, strategy strategy.PlacementStrategy, filters []filter.Filter) {
	s.filters = filters
	s.strategy = strategy
	s.cluster = cluster
}
// selectNodeForContainer finds a nice home for the container: every
// configured filter is applied to the cluster's nodes, and the placement
// strategy then chooses among the survivors. Returns an error if filtering
// fails or if the strategy cannot place the container.
func (s *BuiltinScheduler) selectNodeForContainer(config *dockerclient.ContainerConfig) (*cluster.Node, error) {
	nodes, err := filter.ApplyFilters(s.filters, config, s.cluster.Nodes())
	if err != nil {
		return nil, err
	}
	return s.strategy.PlaceContainer(config, nodes)
}
// CreateContainer schedules a brand new container into the cluster. Node
// selection and deployment run under the scheduler lock so the two steps
// happen atomically with respect to concurrent scheduling requests.
func (s *BuiltinScheduler) CreateContainer(config *dockerclient.ContainerConfig, name string) (*cluster.Container, error) {
	/*Disable for now
	if config.Memory == 0 || config.CpuShares == 0 {
		return nil, fmt.Errorf("Creating containers in clustering mode requires resource constraints (-c and -m) to be set")
	}
	*/
	s.Lock()
	defer s.Unlock()

	target, err := s.selectNodeForContainer(config)
	if err != nil {
		return nil, err
	}
	return s.cluster.DeployContainer(target, config, name)
}
// RemoveContainer removes a container from the cluster. Containers should
// always be destroyed through the scheduler so that removal is serialized
// with concurrent scheduling decisions (atomicity guarantee).
func (s *BuiltinScheduler) RemoveContainer(container *cluster.Container, force bool) error {
	s.Lock()
	defer s.Unlock()

	err := s.cluster.DestroyContainer(container, force)
	return err
}

View File

@ -1,65 +1,38 @@
package scheduler package scheduler
import ( import (
"sync" "errors"
log "github.com/Sirupsen/logrus"
"github.com/docker/swarm/cluster" "github.com/docker/swarm/cluster"
"github.com/docker/swarm/scheduler/filter" "github.com/docker/swarm/filter"
"github.com/docker/swarm/scheduler/strategy" "github.com/docker/swarm/scheduler/builtin"
"github.com/docker/swarm/strategy"
"github.com/samalba/dockerclient" "github.com/samalba/dockerclient"
) )
type Scheduler struct { type Scheduler interface {
sync.Mutex Initialize(cluster *cluster.Cluster, strategy strategy.PlacementStrategy, filters []filter.Filter)
CreateContainer(config *dockerclient.ContainerConfig, name string) (*cluster.Container, error)
cluster *cluster.Cluster RemoveContainer(container *cluster.Container, force bool) error
strategy strategy.PlacementStrategy
filters []filter.Filter
} }
func NewScheduler(cluster *cluster.Cluster, strategy strategy.PlacementStrategy, filters []filter.Filter) *Scheduler { var (
return &Scheduler{ schedulers map[string]Scheduler
cluster: cluster, ErrNotSupported = errors.New("scheduler not supported")
strategy: strategy, )
filters: filters,
func init() {
schedulers = map[string]Scheduler{
"builtin": &builtin.BuiltinScheduler{},
} }
} }
// Find a nice home for our container. func New(name string, cluster *cluster.Cluster, strategy strategy.PlacementStrategy, filters []filter.Filter) (Scheduler, error) {
func (s *Scheduler) selectNodeForContainer(config *dockerclient.ContainerConfig) (*cluster.Node, error) { if scheduler, exists := schedulers[name]; exists {
candidates := s.cluster.Nodes() log.WithField("name", name).Debug("Initializing scheduler")
scheduler.Initialize(cluster, strategy, filters)
accepted, err := filter.ApplyFilters(s.filters, config, candidates) return scheduler, nil
if err != nil {
return nil, err
} }
return nil, ErrNotSupported
return s.strategy.PlaceContainer(config, accepted)
}
// Schedule a brand new container into the cluster.
func (s *Scheduler) CreateContainer(config *dockerclient.ContainerConfig, name string) (*cluster.Container, error) {
/*Disable for now
if config.Memory == 0 || config.CpuShares == 0 {
return nil, fmt.Errorf("Creating containers in clustering mode requires resource constraints (-c and -m) to be set")
}
*/
s.Lock()
defer s.Unlock()
node, err := s.selectNodeForContainer(config)
if err != nil {
return nil, err
}
return s.cluster.DeployContainer(node, config, name)
}
// Remove a container from the cluster. Containers should always be destroyed
// through the scheduler to guarantee atomicity.
func (s *Scheduler) RemoveContainer(container *cluster.Container, force bool) error {
s.Lock()
defer s.Unlock()
return s.cluster.DestroyContainer(container, force)
} }