update mesos-go

Signed-off-by: Victor Vieux <vieux@docker.com>
This commit is contained in:
Victor Vieux 2015-07-07 08:35:47 -07:00
parent 28c62b1fcf
commit d7c11fab16
17 changed files with 718 additions and 235 deletions

14
Godeps/Godeps.json generated
View File

@ -77,31 +77,31 @@
},
{
"ImportPath": "github.com/mesos/mesos-go/auth",
"Rev": "d4406379fb263be8b6c2f5d53bac1570aac28242"
"Rev": "83b52d7f648d2a2fa91330123d348d4090be2aed"
},
{
"ImportPath": "github.com/mesos/mesos-go/detector",
"Rev": "d4406379fb263be8b6c2f5d53bac1570aac28242"
"Rev": "83b52d7f648d2a2fa91330123d348d4090be2aed"
},
{
"ImportPath": "github.com/mesos/mesos-go/mesosproto",
"Rev": "d4406379fb263be8b6c2f5d53bac1570aac28242"
"Rev": "83b52d7f648d2a2fa91330123d348d4090be2aed"
},
{
"ImportPath": "github.com/mesos/mesos-go/mesosutil",
"Rev": "d4406379fb263be8b6c2f5d53bac1570aac28242"
"Rev": "83b52d7f648d2a2fa91330123d348d4090be2aed"
},
{
"ImportPath": "github.com/mesos/mesos-go/messenger",
"Rev": "d4406379fb263be8b6c2f5d53bac1570aac28242"
"Rev": "83b52d7f648d2a2fa91330123d348d4090be2aed"
},
{
"ImportPath": "github.com/mesos/mesos-go/scheduler",
"Rev": "d4406379fb263be8b6c2f5d53bac1570aac28242"
"Rev": "83b52d7f648d2a2fa91330123d348d4090be2aed"
},
{
"ImportPath": "github.com/mesos/mesos-go/upid",
"Rev": "d4406379fb263be8b6c2f5d53bac1570aac28242"
"Rev": "83b52d7f648d2a2fa91330123d348d4090be2aed"
},
{
"ImportPath": "github.com/samalba/dockerclient",

View File

@ -65,26 +65,27 @@ func TestAuthticatee_validLogin(t *testing.T) {
transport.On("Stop").Return(nil)
transport.On("Send", mock.Anything, &server, &mesos.AuthenticateMessage{
Pid: proto.String(client.String()),
}).Return(nil).Once()
}).Return(nil).Run(func(_ mock.Arguments) {
transport.Recv(&server, &mesos.AuthenticationMechanismsMessage{
Mechanisms: []string{crammd5.Name},
})
}).Once()
transport.On("Send", mock.Anything, &server, &mesos.AuthenticationStartMessage{
Mechanism: proto.String(crammd5.Name),
Data: proto.String(""), // may be nil, depends on init step
}).Return(nil).Once()
transport.On("Send", mock.Anything, &server, &mesos.AuthenticationStepMessage{
Data: []byte(`foo cc7fd96cd80123ea844a7dba29a594ed`),
}).Return(nil).Once()
go func() {
transport.Recv(&server, &mesos.AuthenticationMechanismsMessage{
Mechanisms: []string{crammd5.Name},
})
}).Return(nil).Run(func(_ mock.Arguments) {
transport.Recv(&server, &mesos.AuthenticationStepMessage{
Data: []byte(`lsd;lfkgjs;dlfkgjs;dfklg`),
})
}).Once()
transport.On("Send", mock.Anything, &server, &mesos.AuthenticationStepMessage{
Data: []byte(`foo cc7fd96cd80123ea844a7dba29a594ed`),
}).Return(nil).Run(func(_ mock.Arguments) {
transport.Recv(&server, &mesos.AuthenticationCompletedMessage{})
}()
}).Once()
return transport
})
login, err := makeAuthenticatee(handler, factory)

View File

@ -126,7 +126,11 @@ func CreateMasterInfo(pid *upid.UPID) *mesos.MasterInfo {
}
//TODO(jdef) what about (future) ipv6 support?
var ipv4 net.IP
if addrs, err := net.LookupIP(pid.Host); err == nil {
if ipv4 = net.ParseIP(pid.Host); ipv4 != nil {
// This is needed for the people cross-compiling from macos to linux.
// The cross-compiled version of net.LookupIP() fails to handle plain IPs.
// See https://github.com/mesos/mesos-go/pull/117
} else if addrs, err := net.LookupIP(pid.Host); err == nil {
for _, ip := range addrs {
if ip = ip.To4(); ip != nil {
ipv4 = ip

View File

@ -27,6 +27,21 @@ type MasterChanged interface {
OnMasterChanged(*mesos.MasterInfo)
}
// AllMasters defines an optional interface that, if implemented by the same
// struct as implements MasterChanged, will receive an additional callbacks
// independently of leadership changes. it's possible that, as a result of a
// leadership change, both the OnMasterChanged and UpdatedMasters callbacks
// would be invoked.
//
// **NOTE:** Detector implementations are not required to support this optional
// interface. Please RTFM of the detector implementation that you want to use.
type AllMasters interface {
// UpdatedMasters is invoked upon a change in the membership of mesos
// masters, and is useful to clients that wish to know the entire set
// of Mesos masters currently running.
UpdatedMasters([]*mesos.MasterInfo)
}
// func/interface adapter
type OnMasterChanged func(*mesos.MasterInfo)

View File

@ -2,6 +2,7 @@ package zoo
import (
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
@ -26,6 +27,21 @@ const (
connectedState
)
func (s stateType) String() string {
switch s {
case disconnectedState:
return "DISCONNECTED"
case connectionRequestedState:
return "REQUESTED"
case connectionAttemptState:
return "ATTEMPT"
case connectedState:
return "CONNECTED"
default:
panic(fmt.Sprintf("unrecognized state: %d", int32(s)))
}
}
type Client struct {
conn Connector
defaultFactory Factory
@ -84,8 +100,12 @@ func (zkc *Client) setFactory(f Factory) {
}
// return true only if the client's state was changed from `from` to `to`
func (zkc *Client) stateChange(from, to stateType) bool {
return atomic.CompareAndSwapInt32((*int32)(&zkc.state), int32(from), int32(to))
func (zkc *Client) stateChange(from, to stateType) (result bool) {
defer func() {
log.V(3).Infof("stateChange: from=%v to=%v result=%v", from, to, result)
}()
result = atomic.CompareAndSwapInt32((*int32)(&zkc.state), int32(from), int32(to))
return
}
// connect to zookeeper, blocks on the initial call to doConnect()
@ -182,13 +202,6 @@ func (zkc *Client) doConnect() error {
// - connected ... another goroutine already established a connection
// - connectionRequested ... another goroutine is already trying to connect
zkc.requestReconnect()
} else {
// let any listeners know about the change
select {
case <-zkc.shouldStop: // noop
case zkc.hasConnected <- struct{}{}: // noop
default: // message buf full, this becomes a non-blocking noop
}
}
log.Infoln("zookeeper client connected")
case <-sessionExpired:
@ -245,13 +258,17 @@ func (zkc *Client) monitorSession(sessionEvents <-chan zk.Event, connected chan
log.Infoln("connecting to zookeeper..")
case zk.StateConnected:
log.V(2).Infoln("received StateConnected")
if firstConnected {
close(connected) // signal listener
close(connected) // signal session listener
firstConnected = false
}
case zk.StateSyncConnected:
log.Infoln("syncConnected to zookper server")
// let any listeners know about the change
select {
case <-zkc.shouldStop: // noop
case zkc.hasConnected <- struct{}{}: // noop
default: // message buf full, this becomes a non-blocking noop
}
case zk.StateDisconnected:
log.Infoln("zookeeper client disconnected")
@ -270,36 +287,56 @@ func (zkc *Client) monitorSession(sessionEvents <-chan zk.Event, connected chan
// in the absense of errors a signalling channel is returned that will close
// upon the termination of the watch (e.g. due to disconnection).
func (zkc *Client) watchChildren(path string, watcher ChildWatcher) (<-chan struct{}, error) {
if !zkc.isConnected() {
return nil, errors.New("Not connected to server.")
}
watchPath := zkc.rootPath
if path != "" && path != currentPath {
watchPath = watchPath + path
}
log.V(2).Infoln("Watching children for path", watchPath)
_, _, ch, err := zkc.conn.ChildrenW(watchPath)
if err != nil {
return nil, err
}
watchEnded := make(chan struct{})
go func() {
defer close(watchEnded)
zkc._watchChildren(watchPath, ch, watcher)
zkc._watchChildren(watchPath, watcher)
}()
return watchEnded, nil
}
// async continuation of watchChildren. enhances traditional zk watcher functionality by continuously
// renewing child watches as long as the embedded client has not shut down.
func (zkc *Client) _watchChildren(watchPath string, zkevents <-chan zk.Event, watcher ChildWatcher) {
// continuation of watchChildren. blocks until either underlying zk connector terminates, or else this
// client is shut down. continuously renews child watches.
func (zkc *Client) _watchChildren(watchPath string, watcher ChildWatcher) {
watcher(zkc, watchPath) // prime the listener
var zkevents <-chan zk.Event
var err error
watchLoop:
first := true
for {
// we really only expect this to happen when zk session has expired,
// give the connection a little time to re-establish itself
for {
//TODO(jdef) it would be better if we could listen for broadcast Connection/Disconnection events,
//emitted whenever the embedded client cycles (read: when the connection state of this client changes).
//As it currently stands, if the embedded client cycles fast enough, we may actually not notice it here
//and keep on watching like nothing bad happened.
if !zkc.isConnected() {
log.Warningf("no longer connected to server, exiting child watch")
return
}
if first {
first = false
} else {
select {
case <-zkc.shouldStop:
return
case <-time.After(zkc.rewatchDelay):
}
}
_, _, zkevents, err = zkc.conn.ChildrenW(watchPath)
if err == nil {
log.V(2).Infoln("rewatching children for path", watchPath)
break
}
log.V(1).Infof("unable to watch children for path %s: %s", watchPath, err.Error())
zkc.errorHandler(zkc, err)
}
// zkevents is (at most) a one-trick channel
// (a) a child event happens (no error)
// (b) the embedded client is shutting down (zk.ErrClosing)
@ -333,30 +370,6 @@ watchLoop:
}
}
}
// we really only expect this to happen when zk session has expired,
// give the connection a little time to re-establish itself
for {
//TODO(jdef) it would be better if we could listen for broadcast Connection/Disconnection events,
//emitted whenever the embedded client cycles (read: when the connection state of this client changes).
//As it currently stands, if the embedded client cycles fast enough, we may actually not notice it here
//and keep on watching like nothing bad happened.
if !zkc.isConnected() {
log.Warningf("no longer connected to server, exiting child watch")
return
}
select {
case <-zkc.shouldStop:
return
case <-time.After(zkc.rewatchDelay):
}
_, _, zkevents, err = zkc.conn.ChildrenW(watchPath)
if err == nil {
log.V(2).Infoln("rewatching children for path", watchPath)
continue watchLoop
}
log.V(1).Infof("unable to watch children for path %s: %s", watchPath, err.Error())
zkc.errorHandler(zkc, err)
}
}
}

View File

@ -5,6 +5,7 @@ import (
"fmt"
"os"
"strings"
"sync/atomic"
"testing"
"time"
@ -81,21 +82,23 @@ func TestClient_FlappingConnection(t *testing.T) {
t.Fatalf("only one connector instance is expected")
}
attempts++
ch0 <- zk.Event{
Type: zk.EventSession,
State: zk.StateConnecting,
Path: test_zk_path,
}
ch0 <- zk.Event{
Type: zk.EventSession,
State: zk.StateConnected,
Path: test_zk_path,
}
time.Sleep(200 * time.Millisecond)
ch0 <- zk.Event{
Type: zk.EventSession,
State: zk.StateDisconnected,
Path: test_zk_path,
for i := 0; i < 4; i++ {
ch0 <- zk.Event{
Type: zk.EventSession,
State: zk.StateConnecting,
Path: test_zk_path,
}
ch0 <- zk.Event{
Type: zk.EventSession,
State: zk.StateConnected,
Path: test_zk_path,
}
time.Sleep(200 * time.Millisecond)
ch0 <- zk.Event{
Type: zk.EventSession,
State: zk.StateDisconnected,
Path: test_zk_path,
}
}
}()
connector := makeMockConnector(test_zk_path, ch1)
@ -162,11 +165,111 @@ func TestClientWatchErrors(t *testing.T) {
select {
case <-wCh:
case <-time.After(time.Millisecond * 700):
panic("Waited too long...")
t.Fatalf("timed out waiting for error message")
}
}
func TestWatchChildren_flappy(t *testing.T) {
c, err := newClient(test_zk_hosts, test_zk_path)
c.reconnDelay = 10 * time.Millisecond // we don't want this test to take forever
assert.NoError(t, err)
attempts := 0
conn := NewMockConnector()
defer func() {
if !t.Failed() {
conn.AssertExpectations(t)
}
}()
defer func() {
// stop client and give it time to shut down the connector
c.stop()
time.Sleep(100 * time.Millisecond)
}()
c.setFactory(asFactory(func() (Connector, <-chan zk.Event, error) {
log.V(2).Infof("**** Using zk.Conn adapter ****")
ch0 := make(chan zk.Event, 10) // session chan
ch1 := make(chan zk.Event) // watch chan
go func() {
if attempts > 1 {
t.Fatalf("only one connector instance is expected")
}
attempts++
for i := 0; i < 4; i++ {
ch0 <- zk.Event{
Type: zk.EventSession,
State: zk.StateConnecting,
Path: test_zk_path,
}
ch0 <- zk.Event{
Type: zk.EventSession,
State: zk.StateConnected,
Path: test_zk_path,
}
time.Sleep(200 * time.Millisecond)
ch0 <- zk.Event{
Type: zk.EventSession,
State: zk.StateDisconnected,
Path: test_zk_path,
}
}
ch0 <- zk.Event{
Type: zk.EventSession,
State: zk.StateConnecting,
Path: test_zk_path,
}
ch0 <- zk.Event{
Type: zk.EventSession,
State: zk.StateConnected,
Path: test_zk_path,
}
ch1 <- zk.Event{
Type: zk.EventNodeChildrenChanged,
Path: test_zk_path,
}
}()
simulatedErr := errors.New("simulated watch error")
conn.On("ChildrenW", test_zk_path).Return(nil, nil, nil, simulatedErr).Times(4)
conn.On("ChildrenW", test_zk_path).Return([]string{test_zk_path}, &zk.Stat{}, (<-chan zk.Event)(ch1), nil)
conn.On("Close").Return(nil)
return conn, ch0, nil
}))
go c.connect()
var watchChildrenCount uint64
watcherFunc := ChildWatcher(func(zkc *Client, path string) {
log.V(1).Infof("ChildWatcher invoked %d", atomic.LoadUint64(&watchChildrenCount))
})
startTime := time.Now()
endTime := startTime.Add(2 * time.Second)
watcherLoop:
for time.Now().Before(endTime) {
log.V(1).Infof("entered watcherLoop")
select {
case <-c.connections():
log.V(1).Infof("invoking watchChildren")
if _, err := c.watchChildren(currentPath, watcherFunc); err == nil {
// watching children succeeded!!
t.Logf("child watch success")
atomic.AddUint64(&watchChildrenCount, 1)
} else {
// setting the watch failed
t.Logf("setting child watch failed: %v", err)
continue watcherLoop
}
case <-c.stopped():
t.Logf("detected client termination")
break watcherLoop
case <-time.After(endTime.Sub(time.Now())):
}
}
wantChildrenCount := atomic.LoadUint64(&watchChildrenCount)
assert.Equal(t, uint64(5), wantChildrenCount, "expected watchChildrenCount = 5 instead of %d, should be reinvoked upon initial ChildrenW failures", wantChildrenCount)
}
func makeClient() (*Client, error) {
ch0 := make(chan zk.Event, 2)
ch1 := make(chan zk.Event, 1)

View File

@ -36,7 +36,8 @@ import (
const (
// prefix for nodes listed at the ZK URL path
nodePrefix = "info_"
nodePrefix = "info_"
defaultMinDetectorCyclePeriod = 1 * time.Second
)
// reasonable default for a noop change listener
@ -44,10 +45,17 @@ var ignoreChanged = detector.OnMasterChanged(func(*mesos.MasterInfo) {})
// Detector uses ZooKeeper to detect new leading master.
type MasterDetector struct {
client *Client
leaderNode string
bootstrap sync.Once // for one-time zk client initiation
ignoreInstalled int32 // only install, at most, one ignoreChanged listener; see MasterDetector.Detect
client *Client
leaderNode string
// for one-time zk client initiation
bootstrap sync.Once
// latch: only install, at most, one ignoreChanged listener; see MasterDetector.Detect
ignoreInstalled int32
// detection should not signal master change listeners more frequently than this
minDetectorCyclePeriod time.Duration
}
// Internal constructor function
@ -64,7 +72,8 @@ func NewMasterDetector(zkurls string) (*MasterDetector, error) {
}
detector := &MasterDetector{
client: client,
client: client,
minDetectorCyclePeriod: defaultMinDetectorCyclePeriod,
}
log.V(2).Infoln("Created new detector, watching ", zkHosts, zkPath)
@ -103,8 +112,12 @@ func (md *MasterDetector) childrenChanged(zkc *Client, path string, obs detector
return
}
topNode := selectTopNode(list)
md.notifyMasterChanged(path, list, obs)
md.notifyAllMasters(path, list, obs)
}
func (md *MasterDetector) notifyMasterChanged(path string, list []string, obs detector.MasterChanged) {
topNode := selectTopNode(list)
if md.leaderNode == topNode {
log.V(2).Infof("ignoring children-changed event, leader has not changed: %v", path)
return
@ -115,21 +128,57 @@ func (md *MasterDetector) childrenChanged(zkc *Client, path string, obs detector
var masterInfo *mesos.MasterInfo
if md.leaderNode != "" {
data, err := zkc.data(fmt.Sprintf("%s/%s", path, topNode))
if err != nil {
log.Errorf("unable to retrieve leader data: %v", err.Error())
return
}
masterInfo = new(mesos.MasterInfo)
err = proto.Unmarshal(data, masterInfo)
if err != nil {
log.Errorf("unable to unmarshall MasterInfo data from zookeeper: %v", err)
return
var err error
if masterInfo, err = md.pullMasterInfo(path, topNode); err != nil {
log.Errorln(err.Error())
}
}
log.V(2).Infof("detected master info: %+v", masterInfo)
obs.OnMasterChanged(masterInfo)
logPanic(func() { obs.OnMasterChanged(masterInfo) })
}
// logPanic safely executes the given func, recovering from and logging a panic if one occurs.
func logPanic(f func()) {
defer func() {
if r := recover(); r != nil {
log.Errorf("recovered from client panic: %v", r)
}
}()
f()
}
func (md *MasterDetector) pullMasterInfo(path, node string) (*mesos.MasterInfo, error) {
data, err := md.client.data(fmt.Sprintf("%s/%s", path, node))
if err != nil {
return nil, fmt.Errorf("failed to retrieve leader data: %v", err)
}
masterInfo := &mesos.MasterInfo{}
err = proto.Unmarshal(data, masterInfo)
if err != nil {
return nil, fmt.Errorf("failed to unmarshall MasterInfo data from zookeeper: %v", err)
}
return masterInfo, nil
}
func (md *MasterDetector) notifyAllMasters(path string, list []string, obs detector.MasterChanged) {
all, ok := obs.(detector.AllMasters)
if !ok {
// not interested in entire master list
return
}
masters := []*mesos.MasterInfo{}
for _, node := range list {
info, err := md.pullMasterInfo(path, node)
if err != nil {
log.Errorln(err.Error())
} else {
masters = append(masters, info)
}
}
log.V(2).Infof("notifying of master membership change: %+v", masters)
logPanic(func() { all.UpdatedMasters(masters) })
}
// the first call to Detect will kickstart a connection to zookeeper. a nil change listener may
@ -154,8 +203,6 @@ func (md *MasterDetector) Detect(f detector.MasterChanged) (err error) {
}
func (md *MasterDetector) detect(f detector.MasterChanged) {
minCyclePeriod := 1 * time.Second
detectLoop:
for {
started := time.Now()
@ -190,7 +237,7 @@ detectLoop:
select {
case <-md.Done():
return
case <-time.After(minCyclePeriod - elapsed): // noop
case <-time.After(md.minDetectorCyclePeriod - elapsed): // noop
}
}
}

View File

@ -8,11 +8,13 @@ import (
"testing"
"time"
"github.com/gogo/protobuf/proto"
log "github.com/golang/glog"
"github.com/mesos/mesos-go/detector"
mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/samuel/go-zookeeper/zk"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)
const (
@ -131,7 +133,7 @@ func TestMasterDetectFlappingConnectionState(t *testing.T) {
watchEvents := make(chan zk.Event, 10)
connector.On("Get", fmt.Sprintf("%s/info_005", test_zk_path)).Return(newTestMasterInfo(1), &zk.Stat{}, nil).Once()
connector.On("ChildrenW", test_zk_path).Return([]string{test_zk_path}, &zk.Stat{}, (<-chan zk.Event)(watchEvents), nil).Once()
connector.On("ChildrenW", test_zk_path).Return([]string{test_zk_path}, &zk.Stat{}, (<-chan zk.Event)(watchEvents), nil)
go func() {
defer wg.Done()
time.Sleep(100 * time.Millisecond)
@ -199,6 +201,13 @@ func TestMasterDetectFlappingConnector(t *testing.T) {
connector.On("Close").Return(nil)
connector.On("Children", test_zk_path).Return(initialChildren, &zk.Stat{}, nil)
// timing
// t=0 t=400ms t=800ms t=1200ms t=1600ms t=2000ms t=2400ms
// |--=--=--=--|--=--=--=--|--=--=--=--|--=--=--=--|--=--=--=--|--=--=--=--|--=--=--=--|--=--=--=--
// c1 d1 c3 d3 c5 d5 d6 ...
// c2 d2 c4 d4 c6 c7 ...
// M M' M M' M M'
attempt := 0
c.setFactory(asFactory(func() (Connector, <-chan zk.Event, error) {
attempt++
@ -210,7 +219,7 @@ func TestMasterDetectFlappingConnector(t *testing.T) {
State: zk.StateConnected,
}
connector.On("Get", fmt.Sprintf("%s/info_005", test_zk_path)).Return(newTestMasterInfo(attempt), &zk.Stat{}, nil).Once()
connector.On("ChildrenW", test_zk_path).Return([]string{test_zk_path}, &zk.Stat{}, (<-chan zk.Event)(watchEvents), nil).Once()
connector.On("ChildrenW", test_zk_path).Return([]string{test_zk_path}, &zk.Stat{}, (<-chan zk.Event)(watchEvents), nil)
go func(attempt int) {
defer close(sessionEvents)
defer close(watchEvents)
@ -231,8 +240,11 @@ func TestMasterDetectFlappingConnector(t *testing.T) {
return connector, sessionEvents, nil
}))
c.reconnDelay = 100 * time.Millisecond
c.rewatchDelay = c.reconnDelay / 2
md, err := NewMasterDetector(zkurl)
md.minDetectorCyclePeriod = 600 * time.Millisecond
defer md.Cancel()
assert.NoError(t, err)
@ -242,11 +254,11 @@ func TestMasterDetectFlappingConnector(t *testing.T) {
md.client = c
var wg sync.WaitGroup
wg.Add(4) // 2 x (connected, disconnected)
wg.Add(6) // 3 x (connected, disconnected)
detected := 0
startTime := time.Now()
md.Detect(detector.OnMasterChanged(func(master *mesos.MasterInfo) {
if detected > 3 {
if detected > 5 {
// ignore
return
}
@ -314,10 +326,10 @@ func TestMasterDetectMultiple(t *testing.T) {
// **** Test 4 consecutive ChildrenChangedEvents ******
// setup event changes
sequences := [][]string{
[]string{"info_014", "info_010", "info_005"},
[]string{"info_005", "info_004", "info_022"},
[]string{}, // indicates no master
[]string{"info_017", "info_099", "info_200"},
{"info_014", "info_010", "info_005"},
{"info_005", "info_004", "info_022"},
{}, // indicates no master
{"info_017", "info_099", "info_200"},
}
var wg sync.WaitGroup
@ -415,3 +427,138 @@ func TestMasterDetect_selectTopNode_mixedEntries(t *testing.T) {
node := selectTopNode(nodeList)
assert.Equal("info_0000000032", node)
}
// implements MasterChanged and AllMasters extension
type allMastersListener struct {
mock.Mock
}
func (a *allMastersListener) OnMasterChanged(mi *mesos.MasterInfo) {
a.Called(mi)
}
func (a *allMastersListener) UpdatedMasters(mi []*mesos.MasterInfo) {
a.Called(mi)
}
func afterFunc(f func()) <-chan struct{} {
ch := make(chan struct{})
go func() {
defer close(ch)
f()
}()
return ch
}
func fatalAfter(t *testing.T, d time.Duration, f func(), msg string, args ...interface{}) {
ch := afterFunc(f)
select {
case <-ch:
return
case <-time.After(d):
t.Fatalf(msg, args...)
}
}
func TestNotifyAllMasters(t *testing.T) {
c, err := newClient(test_zk_hosts, test_zk_path)
assert.NoError(t, err)
childEvents := make(chan zk.Event, 5)
connector := NewMockConnector()
c.setFactory(asFactory(func() (Connector, <-chan zk.Event, error) {
sessionEvents := make(chan zk.Event, 1)
sessionEvents <- zk.Event{
Type: zk.EventSession,
State: zk.StateConnected,
}
return connector, sessionEvents, nil
}))
md, err := NewMasterDetector(zkurl)
defer md.Cancel()
assert.NoError(t, err)
c.errorHandler = ErrorHandler(func(c *Client, e error) {
t.Fatalf("unexpected error: %v", e)
})
md.client = c
listener := &allMastersListener{}
//-- expect primer
var primer sync.WaitGroup
ignoreArgs := func(f func()) func(mock.Arguments) {
primer.Add(1)
return func(_ mock.Arguments) {
f()
}
}
connector.On("Children", test_zk_path).Return([]string{}, &zk.Stat{}, nil).Run(ignoreArgs(primer.Done)).Once()
listener.On("UpdatedMasters", []*mesos.MasterInfo{}).Return().Run(ignoreArgs(primer.Done)).Once()
connector.On("ChildrenW", test_zk_path).Return([]string{test_zk_path}, &zk.Stat{}, (<-chan zk.Event)(childEvents), nil).Run(ignoreArgs(primer.Done)).Once()
md.Detect(listener)
fatalAfter(t, 3*time.Second, primer.Wait, "timed out waiting for detection primer")
listener.AssertExpectations(t)
connector.AssertExpectations(t)
//-- test membership changes
type expectedGets struct {
info []byte
err error
}
tt := []struct {
zkEntry []string
gets []expectedGets
leaderIdx int
}{
{[]string{"info_004"}, []expectedGets{{newTestMasterInfo(1), nil}}, 0},
{[]string{"info_007", "info_005", "info_006"}, []expectedGets{{newTestMasterInfo(2), nil}, {newTestMasterInfo(3), nil}, {newTestMasterInfo(4), nil}}, 1},
{nil, nil, -1},
}
for j, tc := range tt {
// expectations
var tcwait sync.WaitGroup
ignoreArgs = func(f func()) func(mock.Arguments) {
tcwait.Add(1)
return func(_ mock.Arguments) {
f()
}
}
expectedInfos := []*mesos.MasterInfo{}
for i, zke := range tc.zkEntry {
connector.On("Get", fmt.Sprintf("%s/%s", test_zk_path, zke)).Return(tc.gets[i].info, &zk.Stat{}, tc.gets[i].err).Run(ignoreArgs(tcwait.Done)).Once()
masterInfo := &mesos.MasterInfo{}
err = proto.Unmarshal(tc.gets[i].info, masterInfo)
if err != nil {
t.Fatalf("failed to unmarshall MasterInfo data: %v", err)
}
expectedInfos = append(expectedInfos, masterInfo)
}
if len(tc.zkEntry) > 0 {
connector.On("Get", fmt.Sprintf("%s/%s", test_zk_path, tc.zkEntry[tc.leaderIdx])).Return(
tc.gets[tc.leaderIdx].info, &zk.Stat{}, tc.gets[tc.leaderIdx].err).Run(ignoreArgs(tcwait.Done)).Once()
}
connector.On("Children", test_zk_path).Return(tc.zkEntry, &zk.Stat{}, nil).Run(ignoreArgs(tcwait.Done)).Once()
listener.On("OnMasterChanged", mock.AnythingOfType("*mesosproto.MasterInfo")).Return().Run(ignoreArgs(tcwait.Done)).Once()
listener.On("UpdatedMasters", expectedInfos).Return().Run(ignoreArgs(tcwait.Done)).Once()
connector.On("ChildrenW", test_zk_path).Return([]string{test_zk_path}, &zk.Stat{}, (<-chan zk.Event)(childEvents), nil).Run(ignoreArgs(tcwait.Done)).Once()
// fire the event that triggers the test case
childEvents <- zk.Event{
Type: zk.EventNodeChildrenChanged,
Path: test_zk_path,
}
// allow plenty of time for all the async processing to happen
fatalAfter(t, 5*time.Second, tcwait.Wait, "timed out waiting for all-masters test case %d", j+1)
listener.AssertExpectations(t)
connector.AssertExpectations(t)
}
connector.On("Close").Return(nil)
}

View File

@ -0,0 +1,3 @@
// Zookeeper-based mesos-master leaderhip detection.
// Implements support for optional detector.AllMasters interface.
package zoo

View File

@ -21,7 +21,6 @@ package messenger
import (
"bytes"
"fmt"
"github.com/mesos/mesos-go/upid"
"io/ioutil"
"net"
"net/http"
@ -33,6 +32,7 @@ import (
"time"
log "github.com/golang/glog"
"github.com/mesos/mesos-go/upid"
"golang.org/x/net/context"
)
@ -235,7 +235,14 @@ func (t *HTTPTransporter) listen() error {
} else {
host = t.upid.Host
}
port := t.upid.Port
var port string
if t.upid.Port != "" {
port = t.upid.Port
} else {
port = "0"
}
// NOTE: Explicitly specifies IPv4 because Libprocess
// only supports IPv4 for now.
ln, err := net.Listen("tcp4", net.JoinHostPort(host, port))
@ -245,7 +252,15 @@ func (t *HTTPTransporter) listen() error {
}
// Save the host:port in case they are not specified in upid.
host, port, _ = net.SplitHostPort(ln.Addr().String())
t.upid.Host, t.upid.Port = host, port
if len(t.upid.Host) == 0 {
t.upid.Host = host
}
if len(t.upid.Port) == 0 || t.upid.Port == "0" {
t.upid.Port = port
}
t.listener = ln
return nil
}

View File

@ -2,6 +2,7 @@ package messenger
import (
"fmt"
"net"
"net/http"
"net/http/httptest"
"regexp"
@ -266,6 +267,66 @@ func TestTransporterStartAndStop(t *testing.T) {
}
}
func TestMutatedHostUPid(t *testing.T) {
serverId := "testserver"
serverPort := getNewPort()
serverHost := "127.0.0.1"
serverAddr := serverHost + ":" + strconv.Itoa(serverPort)
// override the upid.Host with this listener IP
addr := net.ParseIP("127.0.0.2")
// setup receiver (server) process
uPid, err := upid.Parse(fmt.Sprintf("%s@%s", serverId, serverAddr))
assert.NoError(t, err)
receiver := NewHTTPTransporter(uPid, addr)
err = receiver.listen()
assert.NoError(t, err)
if receiver.upid.Host != "127.0.0.1" {
t.Fatalf("reciever.upid.Host was expected to return %s, got %s\n", serverHost, receiver.upid.Host)
}
if receiver.upid.Port != strconv.Itoa(serverPort) {
t.Fatalf("receiver.upid.Port was expected to return %d, got %s\n", serverPort, receiver.upid.Port)
}
}
func TestEmptyHostPortUPid(t *testing.T) {
serverId := "testserver"
serverPort := getNewPort()
serverHost := "127.0.0.1"
serverAddr := serverHost + ":" + strconv.Itoa(serverPort)
// setup receiver (server) process
uPid, err := upid.Parse(fmt.Sprintf("%s@%s", serverId, serverAddr))
assert.NoError(t, err)
// Unset upid host and port
uPid.Host = ""
uPid.Port = ""
// override the upid.Host with this listener IP
addr := net.ParseIP("127.0.0.2")
receiver := NewHTTPTransporter(uPid, addr)
err = receiver.listen()
assert.NoError(t, err)
// This should be the host that overrides as uPid.Host is empty
if receiver.upid.Host != "127.0.0.2" {
t.Fatalf("reciever.upid.Host was expected to return %s, got %s\n", serverHost, receiver.upid.Host)
}
// This should end up being a random port, not the server port as uPid
// port is empty
if receiver.upid.Port == strconv.Itoa(serverPort) {
t.Fatalf("receiver.upid.Port was not expected to return %d, got %s\n", serverPort, receiver.upid.Port)
}
}
func makeMockServer(path string, handler func(rsp http.ResponseWriter, req *http.Request)) *httptest.Server {
mux := http.NewServeMux()
mux.HandleFunc(path, handler)

View File

@ -76,41 +76,62 @@ type MesosMessenger struct {
tr Transporter
}
// create a new default messenger (HTTP). If a non-nil, non-wildcard bindingAddress is
// specified then it will be used for both the UPID and Transport binding address. Otherwise
// hostname is resolved to an IP address and the UPID.Host is set to that address and the
// bindingAddress is passed through to the Transport.
// ForHostname creates a new default messenger (HTTP), using UPIDBindingAddress to
// determine the binding-address used for both the UPID.Host and Transport binding address.
func ForHostname(proc *process.Process, hostname string, bindingAddress net.IP, port uint16) (Messenger, error) {
upid := &upid.UPID{
ID: proc.Label(),
Port: strconv.Itoa(int(port)),
}
host, err := UPIDBindingAddress(hostname, bindingAddress)
if err != nil {
return nil, err
}
upid.Host = host
return NewHttpWithBindingAddress(upid, bindingAddress), nil
}
// UPIDBindingAddress determines the value of UPID.Host that will be used to build
// a Transport. If a non-nil, non-wildcard bindingAddress is specified then it will be used
// for both the UPID and Transport binding address. Otherwise hostname is resolved to an IP
// address and the UPID.Host is set to that address and the bindingAddress is passed through
// to the Transport.
func UPIDBindingAddress(hostname string, bindingAddress net.IP) (string, error) {
upidHost := ""
if bindingAddress != nil && "0.0.0.0" != bindingAddress.String() {
upid.Host = bindingAddress.String()
upidHost = bindingAddress.String()
} else {
ips, err := net.LookupIP(hostname)
if err != nil {
return nil, err
if hostname == "" || hostname == "0.0.0.0" {
return "", fmt.Errorf("invalid hostname (%q) specified with binding address %v", hostname, bindingAddress)
}
// try to find an ipv4 and use that
ip := net.IP(nil)
for _, addr := range ips {
if ip = addr.To4(); ip != nil {
break
}
ip := net.ParseIP(hostname)
if ip != nil {
ip = ip.To4()
}
if ip == nil {
// no ipv4? best guess, just take the first addr
if len(ips) > 0 {
ip = ips[0]
log.Warningf("failed to find an IPv4 address for '%v', best guess is '%v'", hostname, ip)
} else {
return nil, fmt.Errorf("failed to determine IP address for host '%v'", hostname)
ips, err := net.LookupIP(hostname)
if err != nil {
return "", err
}
// try to find an ipv4 and use that
for _, addr := range ips {
if ip = addr.To4(); ip != nil {
break
}
}
if ip == nil {
// no ipv4? best guess, just take the first addr
if len(ips) > 0 {
ip = ips[0]
log.Warningf("failed to find an IPv4 address for '%v', best guess is '%v'", hostname, ip)
} else {
return "", fmt.Errorf("failed to determine IP address for host '%v'", hostname)
}
}
}
upid.Host = ip.String()
upidHost = ip.String()
}
return NewHttpWithBindingAddress(upid, bindingAddress), nil
return upidHost, nil
}
// NewMesosMessenger creates a new mesos messenger.

View File

@ -3,6 +3,7 @@ package messenger
import (
"fmt"
"math/rand"
"net"
"net/http"
"net/http/httptest"
"strconv"
@ -431,3 +432,35 @@ func BenchmarkMessengerSendRecvMixedMessage(b *testing.B) {
}
globalWG.Wait()
}
func TestUPIDBindingAddress(t *testing.T) {
tt := []struct {
hostname string
binding net.IP
expected string
}{
{"", nil, ""},
{"", net.IPv4(1, 2, 3, 4), "1.2.3.4"},
{"", net.IPv4(0, 0, 0, 0), ""},
{"localhost", nil, "127.0.0.1"},
{"localhost", net.IPv4(5, 6, 7, 8), "5.6.7.8"},
{"localhost", net.IPv4(0, 0, 0, 0), "127.0.0.1"},
{"0.0.0.0", nil, ""},
{"7.8.9.1", nil, "7.8.9.1"},
{"7.8.9.1", net.IPv4(0, 0, 0, 0), "7.8.9.1"},
{"7.8.9.1", net.IPv4(8, 9, 1, 2), "8.9.1.2"},
}
for i, tc := range tt {
actual, err := UPIDBindingAddress(tc.hostname, tc.binding)
if err != nil && tc.expected != "" {
t.Fatalf("test case %d failed; expected %q instead of error %v", i+1, tc.expected, err)
}
if err == nil && actual != tc.expected {
t.Fatalf("test case %d failed; expected %q instead of %q", i+1, tc.expected, actual)
}
if err != nil {
t.Logf("test case %d; received expected error %v", i+1, err)
}
}
}

View File

@ -158,6 +158,11 @@ func NewMesosSchedulerDriver(config DriverConfig) (initializedDriver *MesosSched
if framework.GetUser() == "" {
user, err := user.Current()
if err != nil || user == nil {
if err != nil {
log.Warningf("Failed to obtain username: %v\n", err)
} else {
log.Warningln("Failed to obtain username.")
}
framework.User = proto.String("")
} else {
framework.User = proto.String(user.Username)
@ -473,7 +478,7 @@ func (driver *MesosSchedulerDriver) frameworkReregistered(from *upid.UPID, pbMsg
}
func (driver *MesosSchedulerDriver) resourcesOffered(from *upid.UPID, pbMsg proto.Message) {
log.V(1).Infoln("Handling resource offers.")
log.V(2).Infoln("Handling resource offers.")
msg := pbMsg.(*mesos.ResourceOffersMessage)
if driver.Status() == mesos.Status_DRIVER_ABORTED {
@ -495,7 +500,7 @@ func (driver *MesosSchedulerDriver) resourcesOffered(from *upid.UPID, pbMsg prot
for i, offer := range msg.Offers {
if pid, err := upid.Parse(pidStrings[i]); err == nil {
driver.cache.putOffer(offer, pid)
log.V(1).Infof("Cached offer %s from SlavePID %s", offer.Id.GetValue(), pid)
log.V(2).Infof("Cached offer %s from SlavePID %s", offer.Id.GetValue(), pid)
} else {
log.Warningf("Failed to parse offer PID '%v': %v", pid, err)
}
@ -817,11 +822,15 @@ func (driver *MesosSchedulerDriver) Stop(failover bool) (mesos.Status, error) {
return stat, fmt.Errorf("Unable to Stop, expected driver status %s, but is %s", mesos.Status_DRIVER_RUNNING, stat)
}
if driver.connected && failover {
if driver.connected && !failover {
// unregister the framework
log.Infoln("Unregistering the scheduler driver")
message := &mesos.UnregisterFrameworkMessage{
FrameworkId: driver.FrameworkInfo.Id,
}
//TODO(jdef) this is actually a little racy: we send an 'unregister' message but then
// immediately afterward the messenger is stopped in driver.stop(). so the unregister message
// may not actually end up being sent out.
if err := driver.send(driver.MasterPid, message); err != nil {
log.Errorf("Failed to send UnregisterFramework message while stopping driver: %v\n", err)
return driver.stop(mesos.Status_DRIVER_ABORTED)

View File

@ -168,9 +168,7 @@ func (suite *SchedulerIntegrationTestSuite) configure(frameworkId *mesos.Framewo
suite.sched = newTestScheduler(suite)
suite.sched.ch = make(chan bool, 10) // big enough that it doesn't block callback processing
var err error
suite.driver, err = newTestSchedulerDriver(suite.sched, suite.framework, suite.server.Addr, nil)
suite.NoError(err)
suite.driver = newTestSchedulerDriver(suite.T(), suite.sched, suite.framework, suite.server.Addr, nil)
suite.config(frameworkId, suite)

View File

@ -20,6 +20,11 @@ package scheduler
import (
"fmt"
"os/user"
"sync"
"testing"
"time"
"github.com/gogo/protobuf/proto"
log "github.com/golang/glog"
"github.com/mesos/mesos-go/detector"
@ -32,11 +37,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"os"
"os/user"
"sync"
"testing"
"time"
"golang.org/x/net/context"
)
var (
@ -117,24 +118,26 @@ func TestSchedulerSuite(t *testing.T) {
suite.Run(t, new(SchedulerTestSuite))
}
func newTestSchedulerDriver(sched Scheduler, framework *mesos.FrameworkInfo, master string, cred *mesos.Credential) (*MesosSchedulerDriver, error) {
func newTestSchedulerDriver(t *testing.T, sched Scheduler, framework *mesos.FrameworkInfo, master string, cred *mesos.Credential) *MesosSchedulerDriver {
dconfig := DriverConfig{
Scheduler: sched,
Framework: framework,
Master: master,
Credential: cred,
}
return NewMesosSchedulerDriver(dconfig)
driver, err := NewMesosSchedulerDriver(dconfig)
if err != nil {
t.Fatal(err)
}
return driver
}
func TestSchedulerDriverNew(t *testing.T) {
masterAddr := "localhost:5050"
driver, err := newTestSchedulerDriver(NewMockScheduler(), &mesos.FrameworkInfo{}, masterAddr, nil)
assert.NotNil(t, driver)
assert.NoError(t, err)
driver := newTestSchedulerDriver(t, NewMockScheduler(), &mesos.FrameworkInfo{}, masterAddr, nil)
user, _ := user.Current()
assert.Equal(t, user.Username, driver.FrameworkInfo.GetUser())
host, _ := os.Hostname()
host := util.GetHostname("")
assert.Equal(t, host, driver.FrameworkInfo.GetHostname())
}
@ -142,9 +145,7 @@ func TestSchedulerDriverNew_WithPid(t *testing.T) {
masterAddr := "master@127.0.0.1:5050"
mUpid, err := upid.Parse(masterAddr)
assert.NoError(t, err)
driver, err := newTestSchedulerDriver(NewMockScheduler(), &mesos.FrameworkInfo{}, masterAddr, nil)
assert.NotNil(t, driver)
assert.NoError(t, err)
driver := newTestSchedulerDriver(t, NewMockScheduler(), &mesos.FrameworkInfo{}, masterAddr, nil)
driver.handleMasterChanged(driver.self, &mesos.InternalMasterChangeDetected{Master: &mesos.MasterInfo{Pid: proto.String(mUpid.String())}})
assert.True(t, driver.MasterPid.Equal(mUpid), fmt.Sprintf("expected upid %+v instead of %+v", mUpid, driver.MasterPid))
assert.NoError(t, err)
@ -152,10 +153,7 @@ func TestSchedulerDriverNew_WithPid(t *testing.T) {
func (suite *SchedulerTestSuite) TestSchedulerDriverNew_WithZkUrl() {
masterAddr := "zk://127.0.0.1:5050/mesos"
driver, err := newTestSchedulerDriver(NewMockScheduler(), suite.framework, masterAddr, nil)
suite.NotNil(driver)
suite.NoError(err)
driver := newTestSchedulerDriver(suite.T(), NewMockScheduler(), suite.framework, masterAddr, nil)
md, err := zoo.NewMockMasterDetector(masterAddr)
suite.NoError(err)
suite.NotNil(md)
@ -185,8 +183,7 @@ func (suite *SchedulerTestSuite) TestSchedulerDriverNew_WithZkUrl() {
func (suite *SchedulerTestSuite) TestSchedulerDriverNew_WithFrameworkInfo_Override() {
suite.framework.Hostname = proto.String("local-host")
driver, err := newTestSchedulerDriver(NewMockScheduler(), suite.framework, "127.0.0.1:5050", nil)
suite.NoError(err)
driver := newTestSchedulerDriver(suite.T(), NewMockScheduler(), suite.framework, "127.0.0.1:5050", nil)
suite.Equal(driver.FrameworkInfo.GetUser(), "test-user")
suite.Equal("local-host", driver.FrameworkInfo.GetHostname())
}
@ -200,9 +197,8 @@ func (suite *SchedulerTestSuite) TestSchedulerDriverStartOK() {
messenger.On("Send").Return(nil)
messenger.On("Stop").Return(nil)
driver, err := newTestSchedulerDriver(sched, suite.framework, suite.master, nil)
driver := newTestSchedulerDriver(suite.T(), sched, suite.framework, suite.master, nil)
driver.messenger = messenger
suite.NoError(err)
suite.True(driver.Stopped())
stat, err := driver.Start()
@ -219,9 +215,8 @@ func (suite *SchedulerTestSuite) TestSchedulerDriverStartWithMessengerFailure()
messenger.On("Start").Return(fmt.Errorf("Failed to start messenger"))
messenger.On("Stop").Return()
driver, err := newTestSchedulerDriver(sched, suite.framework, suite.master, nil)
driver := newTestSchedulerDriver(suite.T(), sched, suite.framework, suite.master, nil)
driver.messenger = messenger
suite.NoError(err)
suite.True(driver.Stopped())
stat, err := driver.Start()
@ -243,10 +238,9 @@ func (suite *SchedulerTestSuite) TestSchedulerDriverStartWithRegistrationFailure
messenger.On("UPID").Return(&upid.UPID{})
messenger.On("Stop").Return(nil)
driver, err := newTestSchedulerDriver(sched, suite.framework, suite.master, nil)
driver := newTestSchedulerDriver(suite.T(), sched, suite.framework, suite.master, nil)
driver.messenger = messenger
suite.NoError(err)
suite.True(driver.Stopped())
// reliable registration loops until the driver is stopped, connected, etc..
@ -268,8 +262,7 @@ func (suite *SchedulerTestSuite) TestSchedulerDriverStartWithRegistrationFailure
}
func (suite *SchedulerTestSuite) TestSchedulerDriverJoinUnstarted() {
driver, err := newTestSchedulerDriver(NewMockScheduler(), suite.framework, suite.master, nil)
suite.NoError(err)
driver := newTestSchedulerDriver(suite.T(), NewMockScheduler(), suite.framework, suite.master, nil)
suite.True(driver.Stopped())
stat, err := driver.Join()
@ -285,9 +278,8 @@ func (suite *SchedulerTestSuite) TestSchedulerDriverJoinOK() {
messenger.On("Send").Return(nil)
messenger.On("Stop").Return(nil)
driver, err := newTestSchedulerDriver(NewMockScheduler(), suite.framework, suite.master, nil)
driver := newTestSchedulerDriver(suite.T(), NewMockScheduler(), suite.framework, suite.master, nil)
driver.messenger = messenger
suite.NoError(err)
suite.True(driver.Stopped())
stat, err := driver.Start()
@ -313,9 +305,8 @@ func (suite *SchedulerTestSuite) TestSchedulerDriverRun() {
messenger.On("Send").Return(nil)
messenger.On("Stop").Return(nil)
driver, err := newTestSchedulerDriver(NewMockScheduler(), suite.framework, suite.master, nil)
driver := newTestSchedulerDriver(suite.T(), NewMockScheduler(), suite.framework, suite.master, nil)
driver.messenger = messenger
suite.NoError(err)
suite.True(driver.Stopped())
go func() {
@ -335,8 +326,7 @@ func (suite *SchedulerTestSuite) TestSchedulerDriverRun() {
}
func (suite *SchedulerTestSuite) TestSchedulerDriverStopUnstarted() {
driver, err := newTestSchedulerDriver(NewMockScheduler(), suite.framework, suite.master, nil)
suite.NoError(err)
driver := newTestSchedulerDriver(suite.T(), NewMockScheduler(), suite.framework, suite.master, nil)
suite.True(driver.Stopped())
stat, err := driver.Stop(true)
@ -345,21 +335,27 @@ func (suite *SchedulerTestSuite) TestSchedulerDriverStopUnstarted() {
suite.Equal(mesos.Status_DRIVER_NOT_STARTED, stat)
}
func (suite *SchedulerTestSuite) TestSchdulerDriverStopOK() {
type msgTracker struct {
*messenger.MockedMessenger
lastMessage proto.Message
}
func (m *msgTracker) Send(ctx context.Context, upid *upid.UPID, msg proto.Message) error {
m.lastMessage = msg
return m.MockedMessenger.Send(ctx, upid, msg)
}
func (suite *SchedulerTestSuite) TestSchdulerDriverStop_WithoutFailover() {
// Set expections and return values.
messenger := messenger.NewMockedMessenger()
messenger := &msgTracker{MockedMessenger: messenger.NewMockedMessenger()}
messenger.On("Start").Return(nil)
messenger.On("UPID").Return(&upid.UPID{})
messenger.On("Send").Return(nil)
messenger.On("Stop").Return(nil)
messenger.On("Route").Return(nil)
driver, err := newTestSchedulerDriver(NewMockScheduler(), suite.framework, suite.master, nil)
suite.NotNil(driver, "expected valid driver")
suite.NoError(err)
driver := newTestSchedulerDriver(suite.T(), NewMockScheduler(), suite.framework, suite.master, nil)
driver.messenger = messenger
suite.NoError(err)
suite.True(driver.Stopped())
go func() {
@ -371,9 +367,54 @@ func (suite *SchedulerTestSuite) TestSchdulerDriverStopOK() {
suite.False(driver.Stopped())
suite.Equal(mesos.Status_DRIVER_RUNNING, driver.Status())
driver.connected = true // pretend that we're already registered
driver.Stop(false)
time.Sleep(time.Millisecond * 1)
msg := messenger.lastMessage
suite.NotNil(msg)
_, isUnregMsg := msg.(proto.Message)
suite.True(isUnregMsg, "expected UnregisterFrameworkMessage instead of %+v", msg)
suite.True(driver.Stopped())
suite.Equal(mesos.Status_DRIVER_STOPPED, driver.Status())
}
func (suite *SchedulerTestSuite) TestSchdulerDriverStop_WithFailover() {
// Set expections and return values.
messenger := &msgTracker{MockedMessenger: messenger.NewMockedMessenger()}
messenger.On("Start").Return(nil)
messenger.On("UPID").Return(&upid.UPID{})
messenger.On("Send").Return(nil)
messenger.On("Stop").Return(nil)
messenger.On("Route").Return(nil)
driver := newTestSchedulerDriver(suite.T(), NewMockScheduler(), suite.framework, suite.master, nil)
driver.messenger = messenger
suite.True(driver.Stopped())
stat, err := driver.Start()
suite.NoError(err)
suite.Equal(mesos.Status_DRIVER_RUNNING, stat)
suite.False(driver.Stopped())
driver.connected = true // pretend that we're already registered
go func() {
// Run() blocks until the driver is stopped or aborted
stat, err := driver.Join()
suite.NoError(err)
suite.Equal(mesos.Status_DRIVER_STOPPED, stat)
}()
// wait for Join() to begin blocking (so that it has already validated the driver state)
time.Sleep(200 * time.Millisecond)
driver.Stop(true) // true = scheduler failover
msg := messenger.lastMessage
// we're expecting that lastMessage is nil because when failing over there's no
// 'unregister' message sent by the scheduler.
suite.Nil(msg)
suite.True(driver.Stopped())
suite.Equal(mesos.Status_DRIVER_STOPPED, driver.Status())
@ -388,12 +429,8 @@ func (suite *SchedulerTestSuite) TestSchdulerDriverAbort() {
messenger.On("Stop").Return(nil)
messenger.On("Route").Return(nil)
driver, err := newTestSchedulerDriver(NewMockScheduler(), suite.framework, suite.master, nil)
suite.NotNil(driver, "expected valid driver")
suite.NoError(err)
driver := newTestSchedulerDriver(suite.T(), NewMockScheduler(), suite.framework, suite.master, nil)
driver.messenger = messenger
suite.NoError(err)
suite.True(driver.Stopped())
go func() {
@ -423,13 +460,12 @@ func (suite *SchedulerTestSuite) TestSchdulerDriverLunchTasksUnstarted() {
messenger := messenger.NewMockedMessenger()
messenger.On("Route").Return(nil)
driver, err := newTestSchedulerDriver(sched, suite.framework, suite.master, nil)
driver := newTestSchedulerDriver(suite.T(), sched, suite.framework, suite.master, nil)
driver.messenger = messenger
suite.NoError(err)
suite.True(driver.Stopped())
stat, err := driver.LaunchTasks(
[]*mesos.OfferID{&mesos.OfferID{}},
[]*mesos.OfferID{{}},
[]*mesos.TaskInfo{},
&mesos.Filters{},
)
@ -449,9 +485,8 @@ func (suite *SchedulerTestSuite) TestSchdulerDriverLaunchTasksWithError() {
msgr.On("Stop").Return(nil)
msgr.On("Route").Return(nil)
driver, err := newTestSchedulerDriver(sched, suite.framework, suite.master, nil)
driver := newTestSchedulerDriver(suite.T(), sched, suite.framework, suite.master, nil)
driver.messenger = msgr
suite.NoError(err)
suite.True(driver.Stopped())
go func() {
@ -512,9 +547,8 @@ func (suite *SchedulerTestSuite) TestSchdulerDriverLaunchTasks() {
messenger.On("Stop").Return(nil)
messenger.On("Route").Return(nil)
driver, err := newTestSchedulerDriver(NewMockScheduler(), suite.framework, suite.master, nil)
driver := newTestSchedulerDriver(suite.T(), NewMockScheduler(), suite.framework, suite.master, nil)
driver.messenger = messenger
suite.NoError(err)
suite.True(driver.Stopped())
go func() {
@ -535,7 +569,7 @@ func (suite *SchedulerTestSuite) TestSchdulerDriverLaunchTasks() {
tasks := []*mesos.TaskInfo{task}
stat, err := driver.LaunchTasks(
[]*mesos.OfferID{&mesos.OfferID{}},
[]*mesos.OfferID{{}},
tasks,
&mesos.Filters{},
)
@ -551,9 +585,8 @@ func (suite *SchedulerTestSuite) TestSchdulerDriverKillTask() {
messenger.On("Stop").Return(nil)
messenger.On("Route").Return(nil)
driver, err := newTestSchedulerDriver(NewMockScheduler(), suite.framework, suite.master, nil)
driver := newTestSchedulerDriver(suite.T(), NewMockScheduler(), suite.framework, suite.master, nil)
driver.messenger = messenger
suite.NoError(err)
suite.True(driver.Stopped())
go func() {
@ -577,9 +610,8 @@ func (suite *SchedulerTestSuite) TestSchdulerDriverRequestResources() {
messenger.On("Stop").Return(nil)
messenger.On("Route").Return(nil)
driver, err := newTestSchedulerDriver(NewMockScheduler(), suite.framework, suite.master, nil)
driver := newTestSchedulerDriver(suite.T(), NewMockScheduler(), suite.framework, suite.master, nil)
driver.messenger = messenger
suite.NoError(err)
suite.True(driver.Stopped())
driver.Start()
@ -588,7 +620,7 @@ func (suite *SchedulerTestSuite) TestSchdulerDriverRequestResources() {
stat, err := driver.RequestResources(
[]*mesos.Request{
&mesos.Request{
{
SlaveId: util.NewSlaveID("test-slave-001"),
Resources: []*mesos.Resource{
util.NewScalarResource("test-res-001", 33.00),
@ -612,9 +644,8 @@ func (suite *SchedulerTestSuite) TestSchdulerDriverReviveOffers() {
messenger.On("Stop").Return(nil)
messenger.On("Route").Return(nil)
driver, err := newTestSchedulerDriver(NewMockScheduler(), suite.framework, suite.master, nil)
driver := newTestSchedulerDriver(suite.T(), NewMockScheduler(), suite.framework, suite.master, nil)
driver.messenger = messenger
suite.NoError(err)
suite.True(driver.Stopped())
driver.Start()
@ -634,9 +665,8 @@ func (suite *SchedulerTestSuite) TestSchdulerDriverSendFrameworkMessage() {
messenger.On("Stop").Return(nil)
messenger.On("Route").Return(nil)
driver, err := newTestSchedulerDriver(NewMockScheduler(), suite.framework, suite.master, nil)
driver := newTestSchedulerDriver(suite.T(), NewMockScheduler(), suite.framework, suite.master, nil)
driver.messenger = messenger
suite.NoError(err)
suite.True(driver.Stopped())
driver.Start()
@ -660,9 +690,8 @@ func (suite *SchedulerTestSuite) TestSchdulerDriverReconcileTasks() {
messenger.On("Stop").Return(nil)
messenger.On("Route").Return(nil)
driver, err := newTestSchedulerDriver(NewMockScheduler(), suite.framework, suite.master, nil)
driver := newTestSchedulerDriver(suite.T(), NewMockScheduler(), suite.framework, suite.master, nil)
driver.messenger = messenger
suite.NoError(err)
suite.True(driver.Stopped())
driver.Start()

View File

@ -1,21 +1,12 @@
package upid
import (
"math/rand"
"strings"
"testing"
"testing/quick"
"github.com/stretchr/testify/assert"
)
func generateRandomString() string {
b := make([]byte, rand.Intn(1024))
for i := range b {
b[i] = byte(rand.Int())
}
return strings.Replace(string(b), "@", "", -1)
}
func TestUPIDParse(t *testing.T) {
u, err := Parse("mesos@foo:bar")
assert.Nil(t, u)
@ -29,17 +20,10 @@ func TestUPIDParse(t *testing.T) {
assert.Nil(t, u)
assert.Error(t, err)
// Simple fuzzy test.
for i := 0; i < 100000; i++ {
ra := generateRandomString()
u, err = Parse(ra)
if u != nil {
println(ra)
}
assert.Nil(t, u)
assert.Error(t, err)
}
assert.Nil(t, quick.Check(func(s string) bool {
u, err := Parse(s)
return u == nil && err != nil
}, &quick.Config{MaxCount: 100000}))
}
func TestUPIDString(t *testing.T) {