Merge pull request #1542 from jimenez/slave_to_agent

Name changing slave to agent
This commit is contained in:
Victor Vieux 2015-12-14 13:57:31 -08:00
commit ed987b8d85
5 changed files with 49 additions and 49 deletions

View File

@ -7,7 +7,7 @@ import (
"github.com/mesos/mesos-go/mesosproto" "github.com/mesos/mesos-go/mesosproto"
) )
type slave struct { type agent struct {
sync.RWMutex sync.RWMutex
id string id string
@ -16,8 +16,8 @@ type slave struct {
engine *cluster.Engine engine *cluster.Engine
} }
func newSlave(sid string, e *cluster.Engine) *slave { func newAgent(sid string, e *cluster.Engine) *agent {
return &slave{ return &agent{
id: sid, id: sid,
offers: make(map[string]*mesosproto.Offer), offers: make(map[string]*mesosproto.Offer),
tasks: make(map[string]*task), tasks: make(map[string]*task),
@ -25,19 +25,19 @@ func newSlave(sid string, e *cluster.Engine) *slave {
} }
} }
func (s *slave) addOffer(offer *mesosproto.Offer) { func (s *agent) addOffer(offer *mesosproto.Offer) {
s.Lock() s.Lock()
s.offers[offer.Id.GetValue()] = offer s.offers[offer.Id.GetValue()] = offer
s.Unlock() s.Unlock()
} }
func (s *slave) addTask(task *task) { func (s *agent) addTask(task *task) {
s.Lock() s.Lock()
s.tasks[task.TaskInfo.TaskId.GetValue()] = task s.tasks[task.TaskInfo.TaskId.GetValue()] = task
s.Unlock() s.Unlock()
} }
func (s *slave) removeOffer(offerID string) bool { func (s *agent) removeOffer(offerID string) bool {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
found := false found := false
@ -48,7 +48,7 @@ func (s *slave) removeOffer(offerID string) bool {
return found return found
} }
func (s *slave) removeTask(taskID string) bool { func (s *agent) removeTask(taskID string) bool {
s.Lock() s.Lock()
defer s.Unlock() defer s.Unlock()
found := false found := false
@ -59,19 +59,19 @@ func (s *slave) removeTask(taskID string) bool {
return found return found
} }
func (s *slave) empty() bool { func (s *agent) empty() bool {
s.RLock() s.RLock()
defer s.RUnlock() defer s.RUnlock()
return len(s.offers) == 0 && len(s.tasks) == 0 return len(s.offers) == 0 && len(s.tasks) == 0
} }
func (s *slave) getOffers() map[string]*mesosproto.Offer { func (s *agent) getOffers() map[string]*mesosproto.Offer {
s.RLock() s.RLock()
defer s.RUnlock() defer s.RUnlock()
return s.offers return s.offers
} }
func (s *slave) getTasks() map[string]*task { func (s *agent) getTasks() map[string]*task {
s.RLock() s.RLock()
defer s.RUnlock() defer s.RUnlock()
return s.tasks return s.tasks

View File

@ -9,8 +9,8 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
func TestNewSlave(t *testing.T) { func TestNewAgent(t *testing.T) {
s := newSlave("SID", nil) s := newAgent("SID", nil)
assert.Equal(t, s.id, "SID") assert.Equal(t, s.id, "SID")
assert.Empty(t, s.offers) assert.Empty(t, s.offers)
@ -19,7 +19,7 @@ func TestNewSlave(t *testing.T) {
} }
func TestAddOffer(t *testing.T) { func TestAddOffer(t *testing.T) {
s := newSlave("SID", nil) s := newAgent("SID", nil)
assert.Empty(t, s.offers) assert.Empty(t, s.offers)
assert.True(t, s.empty()) assert.True(t, s.empty())
@ -36,7 +36,7 @@ func TestAddOffer(t *testing.T) {
} }
func TestAddTask(t *testing.T) { func TestAddTask(t *testing.T) {
s := newSlave("SID", nil) s := newAgent("SID", nil)
assert.Empty(t, s.tasks) assert.Empty(t, s.tasks)
assert.True(t, s.empty()) assert.True(t, s.empty())
@ -58,7 +58,7 @@ func TestAddTask(t *testing.T) {
} }
func TestRemoveOffer(t *testing.T) { func TestRemoveOffer(t *testing.T) {
s := newSlave("SID", nil) s := newAgent("SID", nil)
assert.Empty(t, s.offers) assert.Empty(t, s.offers)
@ -75,7 +75,7 @@ func TestRemoveOffer(t *testing.T) {
} }
func TestRemoveTask(t *testing.T) { func TestRemoveTask(t *testing.T) {
s := newSlave("SID", nil) s := newAgent("SID", nil)
assert.Empty(t, s.tasks) assert.Empty(t, s.tasks)

View File

@ -31,7 +31,7 @@ type Cluster struct {
dockerEnginePort string dockerEnginePort string
eventHandler cluster.EventHandler eventHandler cluster.EventHandler
master string master string
slaves map[string]*slave agents map[string]*agent
scheduler *scheduler.Scheduler scheduler *scheduler.Scheduler
TLSConfig *tls.Config TLSConfig *tls.Config
options *cluster.DriverOpts options *cluster.DriverOpts
@ -66,7 +66,7 @@ func NewCluster(scheduler *scheduler.Scheduler, TLSConfig *tls.Config, master st
cluster := &Cluster{ cluster := &Cluster{
dockerEnginePort: defaultDockerEnginePort, dockerEnginePort: defaultDockerEnginePort,
master: master, master: master,
slaves: make(map[string]*slave), agents: make(map[string]*agent),
scheduler: scheduler, scheduler: scheduler,
TLSConfig: TLSConfig, TLSConfig: TLSConfig,
options: &options, options: &options,
@ -201,7 +201,7 @@ func (c *Cluster) Images() cluster.Images {
defer c.RUnlock() defer c.RUnlock()
out := []*cluster.Image{} out := []*cluster.Image{}
for _, s := range c.slaves { for _, s := range c.agents {
out = append(out, s.engine.Images()...) out = append(out, s.engine.Images()...)
} }
return out return out
@ -217,7 +217,7 @@ func (c *Cluster) Image(IDOrName string) *cluster.Image {
c.RLock() c.RLock()
defer c.RUnlock() defer c.RUnlock()
for _, s := range c.slaves { for _, s := range c.agents {
if image := s.engine.Image(IDOrName); image != nil { if image := s.engine.Image(IDOrName); image != nil {
return image return image
} }
@ -266,7 +266,7 @@ func (c *Cluster) Containers() cluster.Containers {
defer c.RUnlock() defer c.RUnlock()
out := cluster.Containers{} out := cluster.Containers{}
for _, s := range c.slaves { for _, s := range c.agents {
for _, container := range s.engine.Containers() { for _, container := range s.engine.Containers() {
if container.Config.Labels != nil { if container.Config.Labels != nil {
if _, ok := container.Config.Labels[cluster.SwarmLabelNamespace+".mesos.task"]; ok { if _, ok := container.Config.Labels[cluster.SwarmLabelNamespace+".mesos.task"]; ok {
@ -341,7 +341,7 @@ func (c *Cluster) listNodes() []*node.Node {
defer c.RUnlock() defer c.RUnlock()
out := []*node.Node{} out := []*node.Node{}
for _, s := range c.slaves { for _, s := range c.agents {
n := node.NewNode(s.engine) n := node.NewNode(s.engine)
n.ID = s.id n.ID = s.id
n.TotalCpus = int64(sumScalarResourceValue(s.offers, "cpus")) n.TotalCpus = int64(sumScalarResourceValue(s.offers, "cpus"))
@ -358,7 +358,7 @@ func (c *Cluster) listOffers() []*mesosproto.Offer {
defer c.RUnlock() defer c.RUnlock()
list := []*mesosproto.Offer{} list := []*mesosproto.Offer{}
for _, s := range c.slaves { for _, s := range c.agents {
for _, offer := range s.offers { for _, offer := range s.offers {
list = append(list, offer) list = append(list, offer)
} }
@ -371,7 +371,7 @@ func (c *Cluster) TotalMemory() int64 {
c.RLock() c.RLock()
defer c.RUnlock() defer c.RUnlock()
var totalMemory int64 var totalMemory int64
for _, s := range c.slaves { for _, s := range c.agents {
totalMemory += int64(sumScalarResourceValue(s.offers, "mem")) * 1024 * 1024 totalMemory += int64(sumScalarResourceValue(s.offers, "mem")) * 1024 * 1024
} }
return totalMemory return totalMemory
@ -382,7 +382,7 @@ func (c *Cluster) TotalCpus() int64 {
c.RLock() c.RLock()
defer c.RUnlock() defer c.RUnlock()
var totalCpus int64 var totalCpus int64
for _, s := range c.slaves { for _, s := range c.agents {
totalCpus += int64(sumScalarResourceValue(s.offers, "cpus")) totalCpus += int64(sumScalarResourceValue(s.offers, "cpus"))
} }
return totalCpus return totalCpus
@ -410,7 +410,7 @@ func (c *Cluster) Info() [][]string {
} }
func (c *Cluster) addOffer(offer *mesosproto.Offer) { func (c *Cluster) addOffer(offer *mesosproto.Offer) {
s, ok := c.slaves[offer.SlaveId.GetValue()] s, ok := c.agents[offer.SlaveId.GetValue()]
if !ok { if !ok {
return return
} }
@ -430,14 +430,14 @@ func (c *Cluster) addOffer(offer *mesosproto.Offer) {
func (c *Cluster) removeOffer(offer *mesosproto.Offer) bool { func (c *Cluster) removeOffer(offer *mesosproto.Offer) bool {
log.WithFields(log.Fields{"name": "mesos", "offerID": offer.Id.String()}).Debug("Removing offer") log.WithFields(log.Fields{"name": "mesos", "offerID": offer.Id.String()}).Debug("Removing offer")
s, ok := c.slaves[offer.SlaveId.GetValue()] s, ok := c.agents[offer.SlaveId.GetValue()]
if !ok { if !ok {
return false return false
} }
found := s.removeOffer(offer.Id.GetValue()) found := s.removeOffer(offer.Id.GetValue())
if s.empty() { if s.empty() {
// Disconnect from engine // Disconnect from engine
delete(c.slaves, offer.SlaveId.GetValue()) delete(c.agents, offer.SlaveId.GetValue())
} }
return found return found
} }
@ -451,22 +451,22 @@ func (c *Cluster) scheduleTask(t *task) bool {
return false return false
} }
n := nodes[0] n := nodes[0]
s, ok := c.slaves[n.ID] s, ok := c.agents[n.ID]
if !ok { if !ok {
t.error <- fmt.Errorf("Unable to create on slave %q", n.ID) t.error <- fmt.Errorf("Unable to create on agent %q", n.ID)
return true return true
} }
// build the offer from it's internal config and set the slaveID // build the offer from it's internal config and set the agentID
c.Lock() c.Lock()
// TODO: Only use the offer we need // TODO: Only use the offer we need
offerIDs := []*mesosproto.OfferID{} offerIDs := []*mesosproto.OfferID{}
for _, offer := range c.slaves[n.ID].offers { for _, offer := range c.agents[n.ID].offers {
offerIDs = append(offerIDs, offer.Id) offerIDs = append(offerIDs, offer.Id)
} }
t.build(n.ID, c.slaves[n.ID].offers) t.build(n.ID, c.agents[n.ID].offers)
if _, err := c.driver.LaunchTasks(offerIDs, []*mesosproto.TaskInfo{&t.TaskInfo}, &mesosproto.Filters{}); err != nil { if _, err := c.driver.LaunchTasks(offerIDs, []*mesosproto.TaskInfo{&t.TaskInfo}, &mesosproto.Filters{}); err != nil {
// TODO: Do not erase all the offers, only the one used // TODO: Do not erase all the offers, only the one used
@ -557,7 +557,7 @@ func (c *Cluster) RANDOMENGINE() (*cluster.Engine, error) {
return nil, err return nil, err
} }
n := nodes[0] n := nodes[0]
return c.slaves[n.ID].engine, nil return c.agents[n.ID].engine, nil
} }
// BuildImage build an image // BuildImage build an image
@ -576,7 +576,7 @@ func (c *Cluster) BuildImage(buildImage *dockerclient.BuildImage, out io.Writer)
} }
n := nodes[0] n := nodes[0]
reader, err := c.slaves[n.ID].engine.BuildImage(buildImage) reader, err := c.agents[n.ID].engine.BuildImage(buildImage)
if err != nil { if err != nil {
return err return err
} }
@ -585,7 +585,7 @@ func (c *Cluster) BuildImage(buildImage *dockerclient.BuildImage, out io.Writer)
return err return err
} }
c.slaves[n.ID].engine.RefreshImages() c.agents[n.ID].engine.RefreshImages()
return nil return nil
} }

View File

@ -9,7 +9,7 @@ import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
) )
func createSlave(t *testing.T, ID string, containers ...*cluster.Container) *slave { func createAgent(t *testing.T, ID string, containers ...*cluster.Container) *agent {
engOpts := &cluster.EngineOpts{ engOpts := &cluster.EngineOpts{
RefreshMinInterval: time.Duration(30) * time.Second, RefreshMinInterval: time.Duration(30) * time.Second,
RefreshMaxInterval: time.Duration(60) * time.Second, RefreshMaxInterval: time.Duration(60) * time.Second,
@ -24,12 +24,12 @@ func createSlave(t *testing.T, ID string, containers ...*cluster.Container) *sla
engine.AddContainer(container) engine.AddContainer(container)
} }
return newSlave("slave-"+ID, engine) return newAgent("agent-"+ID, engine)
} }
func TestContainerLookup(t *testing.T) { func TestContainerLookup(t *testing.T) {
c := &Cluster{ c := &Cluster{
slaves: make(map[string]*slave), agents: make(map[string]*agent),
} }
container1 := &cluster.Container{ container1 := &cluster.Container{
Container: dockerclient.Container{ Container: dockerclient.Container{
@ -65,8 +65,8 @@ func TestContainerLookup(t *testing.T) {
Config: cluster.BuildContainerConfig(dockerclient.ContainerConfig{}), Config: cluster.BuildContainerConfig(dockerclient.ContainerConfig{}),
} }
s := createSlave(t, "test-engine", container1, container2, container3) s := createAgent(t, "test-engine", container1, container2, container3)
c.slaves[s.id] = s c.agents[s.id] = s
// Hide container without `com.docker.swarm.mesos.task` // Hide container without `com.docker.swarm.mesos.task`
assert.Equal(t, len(c.Containers()), 2) assert.Equal(t, len(c.Containers()), 2)

View File

@ -29,7 +29,7 @@ func (c *Cluster) ResourceOffers(_ mesosscheduler.SchedulerDriver, offers []*mes
log.WithFields(log.Fields{"name": "mesos", "offers": len(offers)}).Debug("Offers received") log.WithFields(log.Fields{"name": "mesos", "offers": len(offers)}).Debug("Offers received")
for _, offer := range offers { for _, offer := range offers {
slaveID := offer.SlaveId.GetValue() agentID := offer.SlaveId.GetValue()
dockerPort := c.dockerEnginePort dockerPort := c.dockerEnginePort
for _, attribute := range offer.GetAttributes() { for _, attribute := range offer.GetAttributes() {
if attribute.GetName() == dockerPortAttribute { if attribute.GetName() == dockerPortAttribute {
@ -41,14 +41,14 @@ func (c *Cluster) ResourceOffers(_ mesosscheduler.SchedulerDriver, offers []*mes
} }
} }
} }
s, ok := c.slaves[slaveID] s, ok := c.agents[agentID]
if !ok { if !ok {
engine := cluster.NewEngine(*offer.Hostname+":"+dockerPort, 0, c.engineOpts) engine := cluster.NewEngine(*offer.Hostname+":"+dockerPort, 0, c.engineOpts)
if err := engine.Connect(c.TLSConfig); err != nil { if err := engine.Connect(c.TLSConfig); err != nil {
log.Error(err) log.Error(err)
} else { } else {
s = newSlave(slaveID, engine) s = newAgent(agentID, engine)
c.slaves[slaveID] = s c.agents[agentID] = s
if err := s.engine.RegisterEventHandler(c); err != nil { if err := s.engine.RegisterEventHandler(c); err != nil {
log.Error(err) log.Error(err)
} }
@ -67,8 +67,8 @@ func (c *Cluster) OfferRescinded(mesosscheduler.SchedulerDriver, *mesosproto.Off
func (c *Cluster) StatusUpdate(_ mesosscheduler.SchedulerDriver, taskStatus *mesosproto.TaskStatus) { func (c *Cluster) StatusUpdate(_ mesosscheduler.SchedulerDriver, taskStatus *mesosproto.TaskStatus) {
log.WithFields(log.Fields{"name": "mesos", "state": taskStatus.State.String()}).Debug("Status update") log.WithFields(log.Fields{"name": "mesos", "state": taskStatus.State.String()}).Debug("Status update")
taskID := taskStatus.TaskId.GetValue() taskID := taskStatus.TaskId.GetValue()
slaveID := taskStatus.SlaveId.GetValue() agentID := taskStatus.SlaveId.GetValue()
s, ok := c.slaves[slaveID] s, ok := c.agents[agentID]
if !ok { if !ok {
return return
} }
@ -83,9 +83,9 @@ func (c *Cluster) StatusUpdate(_ mesosscheduler.SchedulerDriver, taskStatus *mes
log.WithFields(log.Fields{ log.WithFields(log.Fields{
"name": "mesos", "name": "mesos",
"state": taskStatus.State.String(), "state": taskStatus.State.String(),
"slaveId": taskStatus.SlaveId.GetValue(), "agentId": taskStatus.SlaveId.GetValue(),
"reason": reason, "reason": reason,
}).Warn("Status update received for unknown slave") }).Warn("Status update received for unknown agent")
} }
} }