diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 9d582fbcf8..13cbf9eb8b 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -77,31 +77,31 @@ }, { "ImportPath": "github.com/mesos/mesos-go/auth", - "Rev": "d4406379fb263be8b6c2f5d53bac1570aac28242" + "Rev": "83b52d7f648d2a2fa91330123d348d4090be2aed" }, { "ImportPath": "github.com/mesos/mesos-go/detector", - "Rev": "d4406379fb263be8b6c2f5d53bac1570aac28242" + "Rev": "83b52d7f648d2a2fa91330123d348d4090be2aed" }, { "ImportPath": "github.com/mesos/mesos-go/mesosproto", - "Rev": "d4406379fb263be8b6c2f5d53bac1570aac28242" + "Rev": "83b52d7f648d2a2fa91330123d348d4090be2aed" }, { "ImportPath": "github.com/mesos/mesos-go/mesosutil", - "Rev": "d4406379fb263be8b6c2f5d53bac1570aac28242" + "Rev": "83b52d7f648d2a2fa91330123d348d4090be2aed" }, { "ImportPath": "github.com/mesos/mesos-go/messenger", - "Rev": "d4406379fb263be8b6c2f5d53bac1570aac28242" + "Rev": "83b52d7f648d2a2fa91330123d348d4090be2aed" }, { "ImportPath": "github.com/mesos/mesos-go/scheduler", - "Rev": "d4406379fb263be8b6c2f5d53bac1570aac28242" + "Rev": "83b52d7f648d2a2fa91330123d348d4090be2aed" }, { "ImportPath": "github.com/mesos/mesos-go/upid", - "Rev": "d4406379fb263be8b6c2f5d53bac1570aac28242" + "Rev": "83b52d7f648d2a2fa91330123d348d4090be2aed" }, { "ImportPath": "github.com/samalba/dockerclient", diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/auth/sasl/authenticatee_test.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/auth/sasl/authenticatee_test.go index 9fd37b6fb9..2a4d587321 100644 --- a/Godeps/_workspace/src/github.com/mesos/mesos-go/auth/sasl/authenticatee_test.go +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/auth/sasl/authenticatee_test.go @@ -65,26 +65,27 @@ func TestAuthticatee_validLogin(t *testing.T) { transport.On("Stop").Return(nil) transport.On("Send", mock.Anything, &server, &mesos.AuthenticateMessage{ Pid: proto.String(client.String()), - }).Return(nil).Once() + }).Return(nil).Run(func(_ mock.Arguments) { + transport.Recv(&server, &mesos.AuthenticationMechanismsMessage{ + Mechanisms: []string{crammd5.Name}, + }) + }).Once() transport.On("Send", mock.Anything, &server, &mesos.AuthenticationStartMessage{ Mechanism: proto.String(crammd5.Name), Data: proto.String(""), // may be nil, depends on init step - }).Return(nil).Once() - - transport.On("Send", mock.Anything, &server, &mesos.AuthenticationStepMessage{ - Data: []byte(`foo cc7fd96cd80123ea844a7dba29a594ed`), - }).Return(nil).Once() - - go func() { - transport.Recv(&server, &mesos.AuthenticationMechanismsMessage{ - Mechanisms: []string{crammd5.Name}, - }) + }).Return(nil).Run(func(_ mock.Arguments) { transport.Recv(&server, &mesos.AuthenticationStepMessage{ Data: []byte(`lsd;lfkgjs;dlfkgjs;dfklg`), }) + }).Once() + + transport.On("Send", mock.Anything, &server, &mesos.AuthenticationStepMessage{ + Data: []byte(`foo cc7fd96cd80123ea844a7dba29a594ed`), + }).Return(nil).Run(func(_ mock.Arguments) { transport.Recv(&server, &mesos.AuthenticationCompletedMessage{}) - }() + }).Once() + return transport }) login, err := makeAuthenticatee(handler, factory) diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/factory.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/factory.go index 912dda87b4..2cbe78fdd4 100644 --- a/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/factory.go +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/factory.go @@ -126,7 +126,11 @@ func CreateMasterInfo(pid *upid.UPID) *mesos.MasterInfo { } //TODO(jdef) what about (future) ipv6 support? var ipv4 net.IP - if addrs, err := net.LookupIP(pid.Host); err == nil { + if ipv4 = net.ParseIP(pid.Host); ipv4 != nil { + // This is needed for the people cross-compiling from macos to linux. + // The cross-compiled version of net.LookupIP() fails to handle plain IPs. + // See https://github.com/mesos/mesos-go/pull/117 + } else if addrs, err := net.LookupIP(pid.Host); err == nil { for _, ip := range addrs { if ip = ip.To4(); ip != nil { ipv4 = ip diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/interface.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/interface.go index 03e0bb7cfc..d9081bbca5 100644 --- a/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/interface.go +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/interface.go @@ -27,6 +27,21 @@ type MasterChanged interface { OnMasterChanged(*mesos.MasterInfo) } +// AllMasters defines an optional interface that, if implemented by the same +// struct as implements MasterChanged, will receive an additional callbacks +// independently of leadership changes. it's possible that, as a result of a +// leadership change, both the OnMasterChanged and UpdatedMasters callbacks +// would be invoked. +// +// **NOTE:** Detector implementations are not required to support this optional +// interface. Please RTFM of the detector implementation that you want to use. +type AllMasters interface { + // UpdatedMasters is invoked upon a change in the membership of mesos + // masters, and is useful to clients that wish to know the entire set + // of Mesos masters currently running. + UpdatedMasters([]*mesos.MasterInfo) +} + // func/interface adapter type OnMasterChanged func(*mesos.MasterInfo) diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/zoo/client.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/zoo/client.go index 0bef277a82..3e42c2d7c4 100644 --- a/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/zoo/client.go +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/zoo/client.go @@ -2,6 +2,7 @@ package zoo import ( "errors" + "fmt" "sync" "sync/atomic" "time" @@ -26,6 +27,21 @@ const ( connectedState ) +func (s stateType) String() string { + switch s { + case disconnectedState: + return "DISCONNECTED" + case connectionRequestedState: + return "REQUESTED" + case connectionAttemptState: + return "ATTEMPT" + case connectedState: + return "CONNECTED" + default: + panic(fmt.Sprintf("unrecognized state: %d", int32(s))) + } +} + type Client struct { conn Connector defaultFactory Factory @@ -84,8 +100,12 @@ func (zkc *Client) setFactory(f Factory) { } // return true only if the client's state was changed from `from` to `to` -func (zkc *Client) stateChange(from, to stateType) bool { - return atomic.CompareAndSwapInt32((*int32)(&zkc.state), int32(from), int32(to)) +func (zkc *Client) stateChange(from, to stateType) (result bool) { + defer func() { + log.V(3).Infof("stateChange: from=%v to=%v result=%v", from, to, result) + }() + result = atomic.CompareAndSwapInt32((*int32)(&zkc.state), int32(from), int32(to)) + return } // connect to zookeeper, blocks on the initial call to doConnect() @@ -182,13 +202,6 @@ func (zkc *Client) doConnect() error { // - connected ... another goroutine already established a connection // - connectionRequested ... another goroutine is already trying to connect zkc.requestReconnect() - } else { - // let any listeners know about the change - select { - case <-zkc.shouldStop: // noop - case zkc.hasConnected <- struct{}{}: // noop - default: // message buf full, this becomes a non-blocking noop - } } log.Infoln("zookeeper client connected") case <-sessionExpired: @@ -245,13 +258,17 @@ func (zkc *Client) monitorSession(sessionEvents <-chan zk.Event, connected chan log.Infoln("connecting to zookeeper..") case zk.StateConnected: + log.V(2).Infoln("received StateConnected") if firstConnected { - close(connected) // signal listener + close(connected) // signal session listener firstConnected = false } - - case zk.StateSyncConnected: - log.Infoln("syncConnected to zookper server") + // let any listeners know about the change + select { + case <-zkc.shouldStop: // noop + case zkc.hasConnected <- struct{}{}: // noop + default: // message buf full, this becomes a non-blocking noop + } case zk.StateDisconnected: log.Infoln("zookeeper client disconnected") @@ -270,36 +287,56 @@ func (zkc *Client) monitorSession(sessionEvents <-chan zk.Event, connected chan // in the absense of errors a signalling channel is returned that will close // upon the termination of the watch (e.g. due to disconnection). func (zkc *Client) watchChildren(path string, watcher ChildWatcher) (<-chan struct{}, error) { - if !zkc.isConnected() { - return nil, errors.New("Not connected to server.") - } - watchPath := zkc.rootPath if path != "" && path != currentPath { watchPath = watchPath + path } log.V(2).Infoln("Watching children for path", watchPath) - _, _, ch, err := zkc.conn.ChildrenW(watchPath) - if err != nil { - return nil, err - } - watchEnded := make(chan struct{}) go func() { defer close(watchEnded) - zkc._watchChildren(watchPath, ch, watcher) + zkc._watchChildren(watchPath, watcher) }() return watchEnded, nil } -// async continuation of watchChildren. enhances traditional zk watcher functionality by continuously -// renewing child watches as long as the embedded client has not shut down. -func (zkc *Client) _watchChildren(watchPath string, zkevents <-chan zk.Event, watcher ChildWatcher) { +// continuation of watchChildren. blocks until either underlying zk connector terminates, or else this +// client is shut down. continuously renews child watches. +func (zkc *Client) _watchChildren(watchPath string, watcher ChildWatcher) { watcher(zkc, watchPath) // prime the listener + var zkevents <-chan zk.Event var err error -watchLoop: + first := true for { + // we really only expect this to happen when zk session has expired, + // give the connection a little time to re-establish itself + for { + //TODO(jdef) it would be better if we could listen for broadcast Connection/Disconnection events, + //emitted whenever the embedded client cycles (read: when the connection state of this client changes). + //As it currently stands, if the embedded client cycles fast enough, we may actually not notice it here + //and keep on watching like nothing bad happened. + if !zkc.isConnected() { + log.Warningf("no longer connected to server, exiting child watch") + return + } + if first { + first = false + } else { + select { + case <-zkc.shouldStop: + return + case <-time.After(zkc.rewatchDelay): + } + } + _, _, zkevents, err = zkc.conn.ChildrenW(watchPath) + if err == nil { + log.V(2).Infoln("rewatching children for path", watchPath) + break + } + log.V(1).Infof("unable to watch children for path %s: %s", watchPath, err.Error()) + zkc.errorHandler(zkc, err) + } // zkevents is (at most) a one-trick channel // (a) a child event happens (no error) // (b) the embedded client is shutting down (zk.ErrClosing) @@ -333,30 +370,6 @@ watchLoop: } } } - // we really only expect this to happen when zk session has expired, - // give the connection a little time to re-establish itself - for { - //TODO(jdef) it would be better if we could listen for broadcast Connection/Disconnection events, - //emitted whenever the embedded client cycles (read: when the connection state of this client changes). - //As it currently stands, if the embedded client cycles fast enough, we may actually not notice it here - //and keep on watching like nothing bad happened. - if !zkc.isConnected() { - log.Warningf("no longer connected to server, exiting child watch") - return - } - select { - case <-zkc.shouldStop: - return - case <-time.After(zkc.rewatchDelay): - } - _, _, zkevents, err = zkc.conn.ChildrenW(watchPath) - if err == nil { - log.V(2).Infoln("rewatching children for path", watchPath) - continue watchLoop - } - log.V(1).Infof("unable to watch children for path %s: %s", watchPath, err.Error()) - zkc.errorHandler(zkc, err) - } } } diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/zoo/client_test.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/zoo/client_test.go index 74036636ed..1ebca734b3 100644 --- a/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/zoo/client_test.go +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/zoo/client_test.go @@ -5,6 +5,7 @@ import ( "fmt" "os" "strings" + "sync/atomic" "testing" "time" @@ -81,21 +82,23 @@ func TestClient_FlappingConnection(t *testing.T) { t.Fatalf("only one connector instance is expected") } attempts++ - ch0 <- zk.Event{ - Type: zk.EventSession, - State: zk.StateConnecting, - Path: test_zk_path, - } - ch0 <- zk.Event{ - Type: zk.EventSession, - State: zk.StateConnected, - Path: test_zk_path, - } - time.Sleep(200 * time.Millisecond) - ch0 <- zk.Event{ - Type: zk.EventSession, - State: zk.StateDisconnected, - Path: test_zk_path, + for i := 0; i < 4; i++ { + ch0 <- zk.Event{ + Type: zk.EventSession, + State: zk.StateConnecting, + Path: test_zk_path, + } + ch0 <- zk.Event{ + Type: zk.EventSession, + State: zk.StateConnected, + Path: test_zk_path, + } + time.Sleep(200 * time.Millisecond) + ch0 <- zk.Event{ + Type: zk.EventSession, + State: zk.StateDisconnected, + Path: test_zk_path, + } } }() connector := makeMockConnector(test_zk_path, ch1) @@ -162,11 +165,111 @@ func TestClientWatchErrors(t *testing.T) { select { case <-wCh: case <-time.After(time.Millisecond * 700): - panic("Waited too long...") + t.Fatalf("timed out waiting for error message") } } +func TestWatchChildren_flappy(t *testing.T) { + c, err := newClient(test_zk_hosts, test_zk_path) + c.reconnDelay = 10 * time.Millisecond // we don't want this test to take forever + + assert.NoError(t, err) + + attempts := 0 + conn := NewMockConnector() + defer func() { + if !t.Failed() { + conn.AssertExpectations(t) + } + }() + defer func() { + // stop client and give it time to shut down the connector + c.stop() + time.Sleep(100 * time.Millisecond) + }() + c.setFactory(asFactory(func() (Connector, <-chan zk.Event, error) { + log.V(2).Infof("**** Using zk.Conn adapter ****") + ch0 := make(chan zk.Event, 10) // session chan + ch1 := make(chan zk.Event) // watch chan + go func() { + if attempts > 1 { + t.Fatalf("only one connector instance is expected") + } + attempts++ + for i := 0; i < 4; i++ { + ch0 <- zk.Event{ + Type: zk.EventSession, + State: zk.StateConnecting, + Path: test_zk_path, + } + ch0 <- zk.Event{ + Type: zk.EventSession, + State: zk.StateConnected, + Path: test_zk_path, + } + time.Sleep(200 * time.Millisecond) + ch0 <- zk.Event{ + Type: zk.EventSession, + State: zk.StateDisconnected, + Path: test_zk_path, + } + } + ch0 <- zk.Event{ + Type: zk.EventSession, + State: zk.StateConnecting, + Path: test_zk_path, + } + ch0 <- zk.Event{ + Type: zk.EventSession, + State: zk.StateConnected, + Path: test_zk_path, + } + ch1 <- zk.Event{ + Type: zk.EventNodeChildrenChanged, + Path: test_zk_path, + } + }() + simulatedErr := errors.New("simulated watch error") + conn.On("ChildrenW", test_zk_path).Return(nil, nil, nil, simulatedErr).Times(4) + conn.On("ChildrenW", test_zk_path).Return([]string{test_zk_path}, &zk.Stat{}, (<-chan zk.Event)(ch1), nil) + conn.On("Close").Return(nil) + return conn, ch0, nil + })) + + go c.connect() + var watchChildrenCount uint64 + watcherFunc := ChildWatcher(func(zkc *Client, path string) { + log.V(1).Infof("ChildWatcher invoked %d", atomic.LoadUint64(&watchChildrenCount)) + }) + startTime := time.Now() + endTime := startTime.Add(2 * time.Second) +watcherLoop: + for time.Now().Before(endTime) { + log.V(1).Infof("entered watcherLoop") + select { + case <-c.connections(): + log.V(1).Infof("invoking watchChildren") + if _, err := c.watchChildren(currentPath, watcherFunc); err == nil { + // watching children succeeded!! + t.Logf("child watch success") + atomic.AddUint64(&watchChildrenCount, 1) + } else { + // setting the watch failed + t.Logf("setting child watch failed: %v", err) + continue watcherLoop + } + case <-c.stopped(): + t.Logf("detected client termination") + break watcherLoop + case <-time.After(endTime.Sub(time.Now())): + } + } + + wantChildrenCount := atomic.LoadUint64(&watchChildrenCount) + assert.Equal(t, uint64(5), wantChildrenCount, "expected watchChildrenCount = 5 instead of %d, should be reinvoked upon initial ChildrenW failures", wantChildrenCount) +} + func makeClient() (*Client, error) { ch0 := make(chan zk.Event, 2) ch1 := make(chan zk.Event, 1) diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/zoo/detect.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/zoo/detect.go index 6ff70254cc..48eee4b414 100644 --- a/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/zoo/detect.go +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/zoo/detect.go @@ -36,7 +36,8 @@ import ( const ( // prefix for nodes listed at the ZK URL path - nodePrefix = "info_" + nodePrefix = "info_" + defaultMinDetectorCyclePeriod = 1 * time.Second ) // reasonable default for a noop change listener @@ -44,10 +45,17 @@ var ignoreChanged = detector.OnMasterChanged(func(*mesos.MasterInfo) {}) // Detector uses ZooKeeper to detect new leading master. type MasterDetector struct { - client *Client - leaderNode string - bootstrap sync.Once // for one-time zk client initiation - ignoreInstalled int32 // only install, at most, one ignoreChanged listener; see MasterDetector.Detect + client *Client + leaderNode string + + // for one-time zk client initiation + bootstrap sync.Once + + // latch: only install, at most, one ignoreChanged listener; see MasterDetector.Detect + ignoreInstalled int32 + + // detection should not signal master change listeners more frequently than this + minDetectorCyclePeriod time.Duration } // Internal constructor function @@ -64,7 +72,8 @@ func NewMasterDetector(zkurls string) (*MasterDetector, error) { } detector := &MasterDetector{ - client: client, + client: client, + minDetectorCyclePeriod: defaultMinDetectorCyclePeriod, } log.V(2).Infoln("Created new detector, watching ", zkHosts, zkPath) @@ -103,8 +112,12 @@ func (md *MasterDetector) childrenChanged(zkc *Client, path string, obs detector return } - topNode := selectTopNode(list) + md.notifyMasterChanged(path, list, obs) + md.notifyAllMasters(path, list, obs) +} +func (md *MasterDetector) notifyMasterChanged(path string, list []string, obs detector.MasterChanged) { + topNode := selectTopNode(list) if md.leaderNode == topNode { log.V(2).Infof("ignoring children-changed event, leader has not changed: %v", path) return @@ -115,21 +128,57 @@ func (md *MasterDetector) childrenChanged(zkc *Client, path string, obs detector var masterInfo *mesos.MasterInfo if md.leaderNode != "" { - data, err := zkc.data(fmt.Sprintf("%s/%s", path, topNode)) - if err != nil { - log.Errorf("unable to retrieve leader data: %v", err.Error()) - return - } - - masterInfo = new(mesos.MasterInfo) - err = proto.Unmarshal(data, masterInfo) - if err != nil { - log.Errorf("unable to unmarshall MasterInfo data from zookeeper: %v", err) - return + var err error + if masterInfo, err = md.pullMasterInfo(path, topNode); err != nil { + log.Errorln(err.Error()) } } log.V(2).Infof("detected master info: %+v", masterInfo) - obs.OnMasterChanged(masterInfo) + logPanic(func() { obs.OnMasterChanged(masterInfo) }) +} + +// logPanic safely executes the given func, recovering from and logging a panic if one occurs. +func logPanic(f func()) { + defer func() { + if r := recover(); r != nil { + log.Errorf("recovered from client panic: %v", r) + } + }() + f() +} + +func (md *MasterDetector) pullMasterInfo(path, node string) (*mesos.MasterInfo, error) { + data, err := md.client.data(fmt.Sprintf("%s/%s", path, node)) + if err != nil { + return nil, fmt.Errorf("failed to retrieve leader data: %v", err) + } + + masterInfo := &mesos.MasterInfo{} + err = proto.Unmarshal(data, masterInfo) + if err != nil { + return nil, fmt.Errorf("failed to unmarshall MasterInfo data from zookeeper: %v", err) + } + return masterInfo, nil +} + +func (md *MasterDetector) notifyAllMasters(path string, list []string, obs detector.MasterChanged) { + all, ok := obs.(detector.AllMasters) + if !ok { + // not interested in entire master list + return + } + masters := []*mesos.MasterInfo{} + for _, node := range list { + info, err := md.pullMasterInfo(path, node) + if err != nil { + log.Errorln(err.Error()) + } else { + masters = append(masters, info) + } + } + + log.V(2).Infof("notifying of master membership change: %+v", masters) + logPanic(func() { all.UpdatedMasters(masters) }) } // the first call to Detect will kickstart a connection to zookeeper. a nil change listener may @@ -154,8 +203,6 @@ func (md *MasterDetector) Detect(f detector.MasterChanged) (err error) { } func (md *MasterDetector) detect(f detector.MasterChanged) { - - minCyclePeriod := 1 * time.Second detectLoop: for { started := time.Now() @@ -190,7 +237,7 @@ detectLoop: select { case <-md.Done(): return - case <-time.After(minCyclePeriod - elapsed): // noop + case <-time.After(md.minDetectorCyclePeriod - elapsed): // noop } } } diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/zoo/detect_test.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/zoo/detect_test.go index 758a4e1cf4..de1ce9762a 100644 --- a/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/zoo/detect_test.go +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/zoo/detect_test.go @@ -8,11 +8,13 @@ import ( "testing" "time" + "github.com/gogo/protobuf/proto" log "github.com/golang/glog" "github.com/mesos/mesos-go/detector" mesos "github.com/mesos/mesos-go/mesosproto" "github.com/samuel/go-zookeeper/zk" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" ) const ( @@ -131,7 +133,7 @@ func TestMasterDetectFlappingConnectionState(t *testing.T) { watchEvents := make(chan zk.Event, 10) connector.On("Get", fmt.Sprintf("%s/info_005", test_zk_path)).Return(newTestMasterInfo(1), &zk.Stat{}, nil).Once() - connector.On("ChildrenW", test_zk_path).Return([]string{test_zk_path}, &zk.Stat{}, (<-chan zk.Event)(watchEvents), nil).Once() + connector.On("ChildrenW", test_zk_path).Return([]string{test_zk_path}, &zk.Stat{}, (<-chan zk.Event)(watchEvents), nil) go func() { defer wg.Done() time.Sleep(100 * time.Millisecond) @@ -199,6 +201,13 @@ func TestMasterDetectFlappingConnector(t *testing.T) { connector.On("Close").Return(nil) connector.On("Children", test_zk_path).Return(initialChildren, &zk.Stat{}, nil) + // timing + // t=0 t=400ms t=800ms t=1200ms t=1600ms t=2000ms t=2400ms + // |--=--=--=--|--=--=--=--|--=--=--=--|--=--=--=--|--=--=--=--|--=--=--=--|--=--=--=--|--=--=--=-- + // c1 d1 c3 d3 c5 d5 d6 ... + // c2 d2 c4 d4 c6 c7 ... + // M M' M M' M M' + attempt := 0 c.setFactory(asFactory(func() (Connector, <-chan zk.Event, error) { attempt++ @@ -210,7 +219,7 @@ func TestMasterDetectFlappingConnector(t *testing.T) { State: zk.StateConnected, } connector.On("Get", fmt.Sprintf("%s/info_005", test_zk_path)).Return(newTestMasterInfo(attempt), &zk.Stat{}, nil).Once() - connector.On("ChildrenW", test_zk_path).Return([]string{test_zk_path}, &zk.Stat{}, (<-chan zk.Event)(watchEvents), nil).Once() + connector.On("ChildrenW", test_zk_path).Return([]string{test_zk_path}, &zk.Stat{}, (<-chan zk.Event)(watchEvents), nil) go func(attempt int) { defer close(sessionEvents) defer close(watchEvents) @@ -231,8 +240,11 @@ func TestMasterDetectFlappingConnector(t *testing.T) { return connector, sessionEvents, nil })) c.reconnDelay = 100 * time.Millisecond + c.rewatchDelay = c.reconnDelay / 2 md, err := NewMasterDetector(zkurl) + md.minDetectorCyclePeriod = 600 * time.Millisecond + defer md.Cancel() assert.NoError(t, err) @@ -242,11 +254,11 @@ func TestMasterDetectFlappingConnector(t *testing.T) { md.client = c var wg sync.WaitGroup - wg.Add(4) // 2 x (connected, disconnected) + wg.Add(6) // 3 x (connected, disconnected) detected := 0 startTime := time.Now() md.Detect(detector.OnMasterChanged(func(master *mesos.MasterInfo) { - if detected > 3 { + if detected > 5 { // ignore return } @@ -314,10 +326,10 @@ func TestMasterDetectMultiple(t *testing.T) { // **** Test 4 consecutive ChildrenChangedEvents ****** // setup event changes sequences := [][]string{ - []string{"info_014", "info_010", "info_005"}, - []string{"info_005", "info_004", "info_022"}, - []string{}, // indicates no master - []string{"info_017", "info_099", "info_200"}, + {"info_014", "info_010", "info_005"}, + {"info_005", "info_004", "info_022"}, + {}, // indicates no master + {"info_017", "info_099", "info_200"}, } var wg sync.WaitGroup @@ -415,3 +427,138 @@ func TestMasterDetect_selectTopNode_mixedEntries(t *testing.T) { node := selectTopNode(nodeList) assert.Equal("info_0000000032", node) } + +// implements MasterChanged and AllMasters extension +type allMastersListener struct { + mock.Mock +} + +func (a *allMastersListener) OnMasterChanged(mi *mesos.MasterInfo) { + a.Called(mi) +} + +func (a *allMastersListener) UpdatedMasters(mi []*mesos.MasterInfo) { + a.Called(mi) +} + +func afterFunc(f func()) <-chan struct{} { + ch := make(chan struct{}) + go func() { + defer close(ch) + f() + }() + return ch +} + +func fatalAfter(t *testing.T, d time.Duration, f func(), msg string, args ...interface{}) { + ch := afterFunc(f) + select { + case <-ch: + return + case <-time.After(d): + t.Fatalf(msg, args...) + } +} + +func TestNotifyAllMasters(t *testing.T) { + c, err := newClient(test_zk_hosts, test_zk_path) + assert.NoError(t, err) + + childEvents := make(chan zk.Event, 5) + connector := NewMockConnector() + + c.setFactory(asFactory(func() (Connector, <-chan zk.Event, error) { + sessionEvents := make(chan zk.Event, 1) + sessionEvents <- zk.Event{ + Type: zk.EventSession, + State: zk.StateConnected, + } + return connector, sessionEvents, nil + })) + + md, err := NewMasterDetector(zkurl) + defer md.Cancel() + + assert.NoError(t, err) + + c.errorHandler = ErrorHandler(func(c *Client, e error) { + t.Fatalf("unexpected error: %v", e) + }) + md.client = c + + listener := &allMastersListener{} + + //-- expect primer + var primer sync.WaitGroup + ignoreArgs := func(f func()) func(mock.Arguments) { + primer.Add(1) + return func(_ mock.Arguments) { + f() + } + } + connector.On("Children", test_zk_path).Return([]string{}, &zk.Stat{}, nil).Run(ignoreArgs(primer.Done)).Once() + listener.On("UpdatedMasters", []*mesos.MasterInfo{}).Return().Run(ignoreArgs(primer.Done)).Once() + connector.On("ChildrenW", test_zk_path).Return([]string{test_zk_path}, &zk.Stat{}, (<-chan zk.Event)(childEvents), nil).Run(ignoreArgs(primer.Done)).Once() + md.Detect(listener) + fatalAfter(t, 3*time.Second, primer.Wait, "timed out waiting for detection primer") + + listener.AssertExpectations(t) + connector.AssertExpectations(t) + + //-- test membership changes + type expectedGets struct { + info []byte + err error + } + tt := []struct { + zkEntry []string + gets []expectedGets + leaderIdx int + }{ + {[]string{"info_004"}, []expectedGets{{newTestMasterInfo(1), nil}}, 0}, + {[]string{"info_007", "info_005", "info_006"}, []expectedGets{{newTestMasterInfo(2), nil}, {newTestMasterInfo(3), nil}, {newTestMasterInfo(4), nil}}, 1}, + {nil, nil, -1}, + } + for j, tc := range tt { + // expectations + var tcwait sync.WaitGroup + ignoreArgs = func(f func()) func(mock.Arguments) { + tcwait.Add(1) + return func(_ mock.Arguments) { + f() + } + } + + expectedInfos := []*mesos.MasterInfo{} + for i, zke := range tc.zkEntry { + connector.On("Get", fmt.Sprintf("%s/%s", test_zk_path, zke)).Return(tc.gets[i].info, &zk.Stat{}, tc.gets[i].err).Run(ignoreArgs(tcwait.Done)).Once() + masterInfo := &mesos.MasterInfo{} + err = proto.Unmarshal(tc.gets[i].info, masterInfo) + if err != nil { + t.Fatalf("failed to unmarshall MasterInfo data: %v", err) + } + expectedInfos = append(expectedInfos, masterInfo) + } + if len(tc.zkEntry) > 0 { + connector.On("Get", fmt.Sprintf("%s/%s", test_zk_path, tc.zkEntry[tc.leaderIdx])).Return( + tc.gets[tc.leaderIdx].info, &zk.Stat{}, tc.gets[tc.leaderIdx].err).Run(ignoreArgs(tcwait.Done)).Once() + } + connector.On("Children", test_zk_path).Return(tc.zkEntry, &zk.Stat{}, nil).Run(ignoreArgs(tcwait.Done)).Once() + listener.On("OnMasterChanged", mock.AnythingOfType("*mesosproto.MasterInfo")).Return().Run(ignoreArgs(tcwait.Done)).Once() + listener.On("UpdatedMasters", expectedInfos).Return().Run(ignoreArgs(tcwait.Done)).Once() + connector.On("ChildrenW", test_zk_path).Return([]string{test_zk_path}, &zk.Stat{}, (<-chan zk.Event)(childEvents), nil).Run(ignoreArgs(tcwait.Done)).Once() + + // fire the event that triggers the test case + childEvents <- zk.Event{ + Type: zk.EventNodeChildrenChanged, + Path: test_zk_path, + } + + // allow plenty of time for all the async processing to happen + fatalAfter(t, 5*time.Second, tcwait.Wait, "timed out waiting for all-masters test case %d", j+1) + listener.AssertExpectations(t) + connector.AssertExpectations(t) + } + + connector.On("Close").Return(nil) +} diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/zoo/doc.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/zoo/doc.go new file mode 100644 index 0000000000..010ba055d8 --- /dev/null +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/detector/zoo/doc.go @@ -0,0 +1,3 @@ +// Zookeeper-based mesos-master leaderhip detection. +// Implements support for optional detector.AllMasters interface. +package zoo diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/http_transporter.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/http_transporter.go index 30370b0483..cfd7f2583a 100644 --- a/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/http_transporter.go +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/http_transporter.go @@ -21,7 +21,6 @@ package messenger import ( "bytes" "fmt" - "github.com/mesos/mesos-go/upid" "io/ioutil" "net" "net/http" @@ -33,6 +32,7 @@ import ( "time" log "github.com/golang/glog" + "github.com/mesos/mesos-go/upid" "golang.org/x/net/context" ) @@ -235,7 +235,14 @@ func (t *HTTPTransporter) listen() error { } else { host = t.upid.Host } - port := t.upid.Port + + var port string + if t.upid.Port != "" { + port = t.upid.Port + } else { + port = "0" + } + // NOTE: Explicitly specifies IPv4 because Libprocess // only supports IPv4 for now. ln, err := net.Listen("tcp4", net.JoinHostPort(host, port)) @@ -245,7 +252,15 @@ func (t *HTTPTransporter) listen() error { } // Save the host:port in case they are not specified in upid. host, port, _ = net.SplitHostPort(ln.Addr().String()) - t.upid.Host, t.upid.Port = host, port + + if len(t.upid.Host) == 0 { + t.upid.Host = host + } + + if len(t.upid.Port) == 0 || t.upid.Port == "0" { + t.upid.Port = port + } + t.listener = ln return nil } diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/http_transporter_test.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/http_transporter_test.go index e1d1409652..f3dc1e56bf 100644 --- a/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/http_transporter_test.go +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/http_transporter_test.go @@ -2,6 +2,7 @@ package messenger import ( "fmt" + "net" "net/http" "net/http/httptest" "regexp" @@ -266,6 +267,66 @@ func TestTransporterStartAndStop(t *testing.T) { } } +func TestMutatedHostUPid(t *testing.T) { + serverId := "testserver" + serverPort := getNewPort() + serverHost := "127.0.0.1" + serverAddr := serverHost + ":" + strconv.Itoa(serverPort) + + // override the upid.Host with this listener IP + addr := net.ParseIP("127.0.0.2") + + // setup receiver (server) process + uPid, err := upid.Parse(fmt.Sprintf("%s@%s", serverId, serverAddr)) + assert.NoError(t, err) + receiver := NewHTTPTransporter(uPid, addr) + + err = receiver.listen() + assert.NoError(t, err) + + if receiver.upid.Host != "127.0.0.1" { + t.Fatalf("reciever.upid.Host was expected to return %s, got %s\n", serverHost, receiver.upid.Host) + } + + if receiver.upid.Port != strconv.Itoa(serverPort) { + t.Fatalf("receiver.upid.Port was expected to return %d, got %s\n", serverPort, receiver.upid.Port) + } +} + +func TestEmptyHostPortUPid(t *testing.T) { + serverId := "testserver" + serverPort := getNewPort() + serverHost := "127.0.0.1" + serverAddr := serverHost + ":" + strconv.Itoa(serverPort) + + // setup receiver (server) process + uPid, err := upid.Parse(fmt.Sprintf("%s@%s", serverId, serverAddr)) + assert.NoError(t, err) + + // Unset upid host and port + uPid.Host = "" + uPid.Port = "" + + // override the upid.Host with this listener IP + addr := net.ParseIP("127.0.0.2") + + receiver := NewHTTPTransporter(uPid, addr) + + err = receiver.listen() + assert.NoError(t, err) + + // This should be the host that overrides as uPid.Host is empty + if receiver.upid.Host != "127.0.0.2" { + t.Fatalf("reciever.upid.Host was expected to return %s, got %s\n", serverHost, receiver.upid.Host) + } + + // This should end up being a random port, not the server port as uPid + // port is empty + if receiver.upid.Port == strconv.Itoa(serverPort) { + t.Fatalf("receiver.upid.Port was not expected to return %d, got %s\n", serverPort, receiver.upid.Port) + } +} + func makeMockServer(path string, handler func(rsp http.ResponseWriter, req *http.Request)) *httptest.Server { mux := http.NewServeMux() mux.HandleFunc(path, handler) diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/messenger.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/messenger.go index 5b242e5bce..c81ad31853 100644 --- a/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/messenger.go +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/messenger.go @@ -76,41 +76,62 @@ type MesosMessenger struct { tr Transporter } -// create a new default messenger (HTTP). If a non-nil, non-wildcard bindingAddress is -// specified then it will be used for both the UPID and Transport binding address. Otherwise -// hostname is resolved to an IP address and the UPID.Host is set to that address and the -// bindingAddress is passed through to the Transport. +// ForHostname creates a new default messenger (HTTP), using UPIDBindingAddress to +// determine the binding-address used for both the UPID.Host and Transport binding address. func ForHostname(proc *process.Process, hostname string, bindingAddress net.IP, port uint16) (Messenger, error) { upid := &upid.UPID{ ID: proc.Label(), Port: strconv.Itoa(int(port)), } + host, err := UPIDBindingAddress(hostname, bindingAddress) + if err != nil { + return nil, err + } + upid.Host = host + return NewHttpWithBindingAddress(upid, bindingAddress), nil +} + +// UPIDBindingAddress determines the value of UPID.Host that will be used to build +// a Transport. If a non-nil, non-wildcard bindingAddress is specified then it will be used +// for both the UPID and Transport binding address. Otherwise hostname is resolved to an IP +// address and the UPID.Host is set to that address and the bindingAddress is passed through +// to the Transport. +func UPIDBindingAddress(hostname string, bindingAddress net.IP) (string, error) { + upidHost := "" if bindingAddress != nil && "0.0.0.0" != bindingAddress.String() { - upid.Host = bindingAddress.String() + upidHost = bindingAddress.String() } else { - ips, err := net.LookupIP(hostname) - if err != nil { - return nil, err + if hostname == "" || hostname == "0.0.0.0" { + return "", fmt.Errorf("invalid hostname (%q) specified with binding address %v", hostname, bindingAddress) } - // try to find an ipv4 and use that - ip := net.IP(nil) - for _, addr := range ips { - if ip = addr.To4(); ip != nil { - break - } + ip := net.ParseIP(hostname) + if ip != nil { + ip = ip.To4() } if ip == nil { - // no ipv4? best guess, just take the first addr - if len(ips) > 0 { - ip = ips[0] - log.Warningf("failed to find an IPv4 address for '%v', best guess is '%v'", hostname, ip) - } else { - return nil, fmt.Errorf("failed to determine IP address for host '%v'", hostname) + ips, err := net.LookupIP(hostname) + if err != nil { + return "", err + } + // try to find an ipv4 and use that + for _, addr := range ips { + if ip = addr.To4(); ip != nil { + break + } + } + if ip == nil { + // no ipv4? best guess, just take the first addr + if len(ips) > 0 { + ip = ips[0] + log.Warningf("failed to find an IPv4 address for '%v', best guess is '%v'", hostname, ip) + } else { + return "", fmt.Errorf("failed to determine IP address for host '%v'", hostname) + } } } - upid.Host = ip.String() + upidHost = ip.String() } - return NewHttpWithBindingAddress(upid, bindingAddress), nil + return upidHost, nil } // NewMesosMessenger creates a new mesos messenger. diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/messenger_test.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/messenger_test.go index 096f201116..4a18953700 100644 --- a/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/messenger_test.go +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/messenger/messenger_test.go @@ -3,6 +3,7 @@ package messenger import ( "fmt" "math/rand" + "net" "net/http" "net/http/httptest" "strconv" @@ -431,3 +432,35 @@ func BenchmarkMessengerSendRecvMixedMessage(b *testing.B) { } globalWG.Wait() } + +func TestUPIDBindingAddress(t *testing.T) { + tt := []struct { + hostname string + binding net.IP + expected string + }{ + {"", nil, ""}, + {"", net.IPv4(1, 2, 3, 4), "1.2.3.4"}, + {"", net.IPv4(0, 0, 0, 0), ""}, + {"localhost", nil, "127.0.0.1"}, + {"localhost", net.IPv4(5, 6, 7, 8), "5.6.7.8"}, + {"localhost", net.IPv4(0, 0, 0, 0), "127.0.0.1"}, + {"0.0.0.0", nil, ""}, + {"7.8.9.1", nil, "7.8.9.1"}, + {"7.8.9.1", net.IPv4(0, 0, 0, 0), "7.8.9.1"}, + {"7.8.9.1", net.IPv4(8, 9, 1, 2), "8.9.1.2"}, + } + + for i, tc := range tt { + actual, err := UPIDBindingAddress(tc.hostname, tc.binding) + if err != nil && tc.expected != "" { + t.Fatalf("test case %d failed; expected %q instead of error %v", i+1, tc.expected, err) + } + if err == nil && actual != tc.expected { + t.Fatalf("test case %d failed; expected %q instead of %q", i+1, tc.expected, actual) + } + if err != nil { + t.Logf("test case %d; received expected error %v", i+1, err) + } + } +} diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/scheduler/scheduler.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/scheduler/scheduler.go index 2b70a27115..90204994af 100644 --- a/Godeps/_workspace/src/github.com/mesos/mesos-go/scheduler/scheduler.go +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/scheduler/scheduler.go @@ -158,6 +158,11 @@ func NewMesosSchedulerDriver(config DriverConfig) (initializedDriver *MesosSched if framework.GetUser() == "" { user, err := user.Current() if err != nil || user == nil { + if err != nil { + log.Warningf("Failed to obtain username: %v\n", err) + } else { + log.Warningln("Failed to obtain username.") + } framework.User = proto.String("") } else { framework.User = proto.String(user.Username) @@ -473,7 +478,7 @@ func (driver *MesosSchedulerDriver) frameworkReregistered(from *upid.UPID, pbMsg } func (driver *MesosSchedulerDriver) resourcesOffered(from *upid.UPID, pbMsg proto.Message) { - log.V(1).Infoln("Handling resource offers.") + log.V(2).Infoln("Handling resource offers.") msg := pbMsg.(*mesos.ResourceOffersMessage) if driver.Status() == mesos.Status_DRIVER_ABORTED { @@ -495,7 +500,7 @@ func (driver *MesosSchedulerDriver) resourcesOffered(from *upid.UPID, pbMsg prot for i, offer := range msg.Offers { if pid, err := upid.Parse(pidStrings[i]); err == nil { driver.cache.putOffer(offer, pid) - log.V(1).Infof("Cached offer %s from SlavePID %s", offer.Id.GetValue(), pid) + log.V(2).Infof("Cached offer %s from SlavePID %s", offer.Id.GetValue(), pid) } else { log.Warningf("Failed to parse offer PID '%v': %v", pid, err) } @@ -817,11 +822,15 @@ func (driver *MesosSchedulerDriver) Stop(failover bool) (mesos.Status, error) { return stat, fmt.Errorf("Unable to Stop, expected driver status %s, but is %s", mesos.Status_DRIVER_RUNNING, stat) } - if driver.connected && failover { + if driver.connected && !failover { // unregister the framework + log.Infoln("Unregistering the scheduler driver") message := &mesos.UnregisterFrameworkMessage{ FrameworkId: driver.FrameworkInfo.Id, } + //TODO(jdef) this is actually a little racy: we send an 'unregister' message but then + // immediately afterward the messenger is stopped in driver.stop(). so the unregister message + // may not actually end up being sent out. if err := driver.send(driver.MasterPid, message); err != nil { log.Errorf("Failed to send UnregisterFramework message while stopping driver: %v\n", err) return driver.stop(mesos.Status_DRIVER_ABORTED) diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/scheduler/scheduler_intgr_test.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/scheduler/scheduler_intgr_test.go index 6df1a861ae..fc4137c2b9 100644 --- a/Godeps/_workspace/src/github.com/mesos/mesos-go/scheduler/scheduler_intgr_test.go +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/scheduler/scheduler_intgr_test.go @@ -168,9 +168,7 @@ func (suite *SchedulerIntegrationTestSuite) configure(frameworkId *mesos.Framewo suite.sched = newTestScheduler(suite) suite.sched.ch = make(chan bool, 10) // big enough that it doesn't block callback processing - var err error - suite.driver, err = newTestSchedulerDriver(suite.sched, suite.framework, suite.server.Addr, nil) - suite.NoError(err) + suite.driver = newTestSchedulerDriver(suite.T(), suite.sched, suite.framework, suite.server.Addr, nil) suite.config(frameworkId, suite) diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/scheduler/scheduler_unit_test.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/scheduler/scheduler_unit_test.go index bc32a1353a..add643262f 100644 --- a/Godeps/_workspace/src/github.com/mesos/mesos-go/scheduler/scheduler_unit_test.go +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/scheduler/scheduler_unit_test.go @@ -20,6 +20,11 @@ package scheduler import ( "fmt" + "os/user" + "sync" + "testing" + "time" + "github.com/gogo/protobuf/proto" log "github.com/golang/glog" "github.com/mesos/mesos-go/detector" @@ -32,11 +37,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/suite" - "os" - "os/user" - "sync" - "testing" - "time" + "golang.org/x/net/context" ) var ( @@ -117,24 +118,26 @@ func TestSchedulerSuite(t *testing.T) { suite.Run(t, new(SchedulerTestSuite)) } -func newTestSchedulerDriver(sched Scheduler, framework *mesos.FrameworkInfo, master string, cred *mesos.Credential) (*MesosSchedulerDriver, error) { +func newTestSchedulerDriver(t *testing.T, sched Scheduler, framework *mesos.FrameworkInfo, master string, cred *mesos.Credential) *MesosSchedulerDriver { dconfig := DriverConfig{ Scheduler: sched, Framework: framework, Master: master, Credential: cred, } - return NewMesosSchedulerDriver(dconfig) + driver, err := NewMesosSchedulerDriver(dconfig) + if err != nil { + t.Fatal(err) + } + return driver } func TestSchedulerDriverNew(t *testing.T) { masterAddr := "localhost:5050" - driver, err := newTestSchedulerDriver(NewMockScheduler(), &mesos.FrameworkInfo{}, masterAddr, nil) - assert.NotNil(t, driver) - assert.NoError(t, err) + driver := newTestSchedulerDriver(t, NewMockScheduler(), &mesos.FrameworkInfo{}, masterAddr, nil) user, _ := user.Current() assert.Equal(t, user.Username, driver.FrameworkInfo.GetUser()) - host, _ := os.Hostname() + host := util.GetHostname("") assert.Equal(t, host, driver.FrameworkInfo.GetHostname()) } @@ -142,9 +145,7 @@ func TestSchedulerDriverNew_WithPid(t *testing.T) { masterAddr := "master@127.0.0.1:5050" mUpid, err := upid.Parse(masterAddr) assert.NoError(t, err) - driver, err := newTestSchedulerDriver(NewMockScheduler(), &mesos.FrameworkInfo{}, masterAddr, nil) - assert.NotNil(t, driver) - assert.NoError(t, err) + driver := newTestSchedulerDriver(t, NewMockScheduler(), &mesos.FrameworkInfo{}, masterAddr, nil) driver.handleMasterChanged(driver.self, &mesos.InternalMasterChangeDetected{Master: &mesos.MasterInfo{Pid: proto.String(mUpid.String())}}) assert.True(t, driver.MasterPid.Equal(mUpid), fmt.Sprintf("expected upid %+v instead of %+v", mUpid, driver.MasterPid)) assert.NoError(t, err) @@ -152,10 +153,7 @@ func TestSchedulerDriverNew_WithPid(t *testing.T) { func (suite *SchedulerTestSuite) TestSchedulerDriverNew_WithZkUrl() { masterAddr := "zk://127.0.0.1:5050/mesos" - driver, err := newTestSchedulerDriver(NewMockScheduler(), suite.framework, masterAddr, nil) - suite.NotNil(driver) - suite.NoError(err) - + driver := newTestSchedulerDriver(suite.T(), NewMockScheduler(), suite.framework, masterAddr, nil) md, err := zoo.NewMockMasterDetector(masterAddr) suite.NoError(err) suite.NotNil(md) @@ -185,8 +183,7 @@ func (suite *SchedulerTestSuite) TestSchedulerDriverNew_WithZkUrl() { func (suite *SchedulerTestSuite) TestSchedulerDriverNew_WithFrameworkInfo_Override() { suite.framework.Hostname = proto.String("local-host") - driver, err := newTestSchedulerDriver(NewMockScheduler(), suite.framework, "127.0.0.1:5050", nil) - suite.NoError(err) + driver := newTestSchedulerDriver(suite.T(), NewMockScheduler(), suite.framework, "127.0.0.1:5050", nil) suite.Equal(driver.FrameworkInfo.GetUser(), "test-user") suite.Equal("local-host", driver.FrameworkInfo.GetHostname()) } @@ -200,9 +197,8 @@ func (suite *SchedulerTestSuite) TestSchedulerDriverStartOK() { messenger.On("Send").Return(nil) messenger.On("Stop").Return(nil) - driver, err := newTestSchedulerDriver(sched, suite.framework, suite.master, nil) + driver := newTestSchedulerDriver(suite.T(), sched, suite.framework, suite.master, nil) driver.messenger = messenger - suite.NoError(err) suite.True(driver.Stopped()) stat, err := driver.Start() @@ -219,9 +215,8 @@ func (suite *SchedulerTestSuite) TestSchedulerDriverStartWithMessengerFailure() messenger.On("Start").Return(fmt.Errorf("Failed to start messenger")) messenger.On("Stop").Return() - driver, err := newTestSchedulerDriver(sched, suite.framework, suite.master, nil) + driver := newTestSchedulerDriver(suite.T(), sched, suite.framework, suite.master, nil) driver.messenger = messenger - suite.NoError(err) suite.True(driver.Stopped()) stat, err := driver.Start() @@ -243,10 +238,9 @@ func (suite *SchedulerTestSuite) TestSchedulerDriverStartWithRegistrationFailure messenger.On("UPID").Return(&upid.UPID{}) messenger.On("Stop").Return(nil) - driver, err := newTestSchedulerDriver(sched, suite.framework, suite.master, nil) + driver := newTestSchedulerDriver(suite.T(), sched, suite.framework, suite.master, nil) driver.messenger = messenger - suite.NoError(err) suite.True(driver.Stopped()) // reliable registration loops until the driver is stopped, connected, etc.. @@ -268,8 +262,7 @@ func (suite *SchedulerTestSuite) TestSchedulerDriverStartWithRegistrationFailure } func (suite *SchedulerTestSuite) TestSchedulerDriverJoinUnstarted() { - driver, err := newTestSchedulerDriver(NewMockScheduler(), suite.framework, suite.master, nil) - suite.NoError(err) + driver := newTestSchedulerDriver(suite.T(), NewMockScheduler(), suite.framework, suite.master, nil) suite.True(driver.Stopped()) stat, err := driver.Join() @@ -285,9 +278,8 @@ func (suite *SchedulerTestSuite) TestSchedulerDriverJoinOK() { messenger.On("Send").Return(nil) messenger.On("Stop").Return(nil) - driver, err := newTestSchedulerDriver(NewMockScheduler(), suite.framework, suite.master, nil) + driver := newTestSchedulerDriver(suite.T(), NewMockScheduler(), suite.framework, suite.master, nil) driver.messenger = messenger - suite.NoError(err) suite.True(driver.Stopped()) stat, err := driver.Start() @@ -313,9 +305,8 @@ func (suite *SchedulerTestSuite) TestSchedulerDriverRun() { messenger.On("Send").Return(nil) messenger.On("Stop").Return(nil) - driver, err := newTestSchedulerDriver(NewMockScheduler(), suite.framework, suite.master, nil) + driver := newTestSchedulerDriver(suite.T(), NewMockScheduler(), suite.framework, suite.master, nil) driver.messenger = messenger - suite.NoError(err) suite.True(driver.Stopped()) go func() { @@ -335,8 +326,7 @@ func (suite *SchedulerTestSuite) TestSchedulerDriverRun() { } func (suite *SchedulerTestSuite) TestSchedulerDriverStopUnstarted() { - driver, err := newTestSchedulerDriver(NewMockScheduler(), suite.framework, suite.master, nil) - suite.NoError(err) + driver := newTestSchedulerDriver(suite.T(), NewMockScheduler(), suite.framework, suite.master, nil) suite.True(driver.Stopped()) stat, err := driver.Stop(true) @@ -345,21 +335,27 @@ func (suite *SchedulerTestSuite) TestSchedulerDriverStopUnstarted() { suite.Equal(mesos.Status_DRIVER_NOT_STARTED, stat) } -func (suite *SchedulerTestSuite) TestSchdulerDriverStopOK() { +type msgTracker struct { + *messenger.MockedMessenger + lastMessage proto.Message +} + +func (m *msgTracker) Send(ctx context.Context, upid *upid.UPID, msg proto.Message) error { + m.lastMessage = msg + return m.MockedMessenger.Send(ctx, upid, msg) +} + +func (suite *SchedulerTestSuite) TestSchdulerDriverStop_WithoutFailover() { // Set expections and return values. - messenger := messenger.NewMockedMessenger() + messenger := &msgTracker{MockedMessenger: messenger.NewMockedMessenger()} messenger.On("Start").Return(nil) messenger.On("UPID").Return(&upid.UPID{}) messenger.On("Send").Return(nil) messenger.On("Stop").Return(nil) messenger.On("Route").Return(nil) - driver, err := newTestSchedulerDriver(NewMockScheduler(), suite.framework, suite.master, nil) - suite.NotNil(driver, "expected valid driver") - suite.NoError(err) - + driver := newTestSchedulerDriver(suite.T(), NewMockScheduler(), suite.framework, suite.master, nil) driver.messenger = messenger - suite.NoError(err) suite.True(driver.Stopped()) go func() { @@ -371,9 +367,54 @@ func (suite *SchedulerTestSuite) TestSchdulerDriverStopOK() { suite.False(driver.Stopped()) suite.Equal(mesos.Status_DRIVER_RUNNING, driver.Status()) + driver.connected = true // pretend that we're already registered driver.Stop(false) - time.Sleep(time.Millisecond * 1) + + msg := messenger.lastMessage + suite.NotNil(msg) + _, isUnregMsg := msg.(proto.Message) + suite.True(isUnregMsg, "expected UnregisterFrameworkMessage instead of %+v", msg) + + suite.True(driver.Stopped()) + suite.Equal(mesos.Status_DRIVER_STOPPED, driver.Status()) +} + +func (suite *SchedulerTestSuite) TestSchdulerDriverStop_WithFailover() { + // Set expections and return values. + messenger := &msgTracker{MockedMessenger: messenger.NewMockedMessenger()} + messenger.On("Start").Return(nil) + messenger.On("UPID").Return(&upid.UPID{}) + messenger.On("Send").Return(nil) + messenger.On("Stop").Return(nil) + messenger.On("Route").Return(nil) + + driver := newTestSchedulerDriver(suite.T(), NewMockScheduler(), suite.framework, suite.master, nil) + driver.messenger = messenger + suite.True(driver.Stopped()) + + stat, err := driver.Start() + suite.NoError(err) + suite.Equal(mesos.Status_DRIVER_RUNNING, stat) + suite.False(driver.Stopped()) + driver.connected = true // pretend that we're already registered + + go func() { + // Run() blocks until the driver is stopped or aborted + stat, err := driver.Join() + suite.NoError(err) + suite.Equal(mesos.Status_DRIVER_STOPPED, stat) + }() + + // wait for Join() to begin blocking (so that it has already validated the driver state) + time.Sleep(200 * time.Millisecond) + + driver.Stop(true) // true = scheduler failover + msg := messenger.lastMessage + + // we're expecting that lastMessage is nil because when failing over there's no + // 'unregister' message sent by the scheduler. + suite.Nil(msg) suite.True(driver.Stopped()) suite.Equal(mesos.Status_DRIVER_STOPPED, driver.Status()) @@ -388,12 +429,8 @@ func (suite *SchedulerTestSuite) TestSchdulerDriverAbort() { messenger.On("Stop").Return(nil) messenger.On("Route").Return(nil) - driver, err := newTestSchedulerDriver(NewMockScheduler(), suite.framework, suite.master, nil) - suite.NotNil(driver, "expected valid driver") - suite.NoError(err) - + driver := newTestSchedulerDriver(suite.T(), NewMockScheduler(), suite.framework, suite.master, nil) driver.messenger = messenger - suite.NoError(err) suite.True(driver.Stopped()) go func() { @@ -423,13 +460,12 @@ func (suite *SchedulerTestSuite) TestSchdulerDriverLunchTasksUnstarted() { messenger := messenger.NewMockedMessenger() messenger.On("Route").Return(nil) - driver, err := newTestSchedulerDriver(sched, suite.framework, suite.master, nil) + driver := newTestSchedulerDriver(suite.T(), sched, suite.framework, suite.master, nil) driver.messenger = messenger - suite.NoError(err) suite.True(driver.Stopped()) stat, err := driver.LaunchTasks( - []*mesos.OfferID{&mesos.OfferID{}}, + []*mesos.OfferID{{}}, []*mesos.TaskInfo{}, &mesos.Filters{}, ) @@ -449,9 +485,8 @@ func (suite *SchedulerTestSuite) TestSchdulerDriverLaunchTasksWithError() { msgr.On("Stop").Return(nil) msgr.On("Route").Return(nil) - driver, err := newTestSchedulerDriver(sched, suite.framework, suite.master, nil) + driver := newTestSchedulerDriver(suite.T(), sched, suite.framework, suite.master, nil) driver.messenger = msgr - suite.NoError(err) suite.True(driver.Stopped()) go func() { @@ -512,9 +547,8 @@ func (suite *SchedulerTestSuite) TestSchdulerDriverLaunchTasks() { messenger.On("Stop").Return(nil) messenger.On("Route").Return(nil) - driver, err := newTestSchedulerDriver(NewMockScheduler(), suite.framework, suite.master, nil) + driver := newTestSchedulerDriver(suite.T(), NewMockScheduler(), suite.framework, suite.master, nil) driver.messenger = messenger - suite.NoError(err) suite.True(driver.Stopped()) go func() { @@ -535,7 +569,7 @@ func (suite *SchedulerTestSuite) TestSchdulerDriverLaunchTasks() { tasks := []*mesos.TaskInfo{task} stat, err := driver.LaunchTasks( - []*mesos.OfferID{&mesos.OfferID{}}, + []*mesos.OfferID{{}}, tasks, &mesos.Filters{}, ) @@ -551,9 +585,8 @@ func (suite *SchedulerTestSuite) TestSchdulerDriverKillTask() { messenger.On("Stop").Return(nil) messenger.On("Route").Return(nil) - driver, err := newTestSchedulerDriver(NewMockScheduler(), suite.framework, suite.master, nil) + driver := newTestSchedulerDriver(suite.T(), NewMockScheduler(), suite.framework, suite.master, nil) driver.messenger = messenger - suite.NoError(err) suite.True(driver.Stopped()) go func() { @@ -577,9 +610,8 @@ func (suite *SchedulerTestSuite) TestSchdulerDriverRequestResources() { messenger.On("Stop").Return(nil) messenger.On("Route").Return(nil) - driver, err := newTestSchedulerDriver(NewMockScheduler(), suite.framework, suite.master, nil) + driver := newTestSchedulerDriver(suite.T(), NewMockScheduler(), suite.framework, suite.master, nil) driver.messenger = messenger - suite.NoError(err) suite.True(driver.Stopped()) driver.Start() @@ -588,7 +620,7 @@ func (suite *SchedulerTestSuite) TestSchdulerDriverRequestResources() { stat, err := driver.RequestResources( []*mesos.Request{ - &mesos.Request{ + { SlaveId: util.NewSlaveID("test-slave-001"), Resources: []*mesos.Resource{ util.NewScalarResource("test-res-001", 33.00), @@ -612,9 +644,8 @@ func (suite *SchedulerTestSuite) TestSchdulerDriverReviveOffers() { messenger.On("Stop").Return(nil) messenger.On("Route").Return(nil) - driver, err := newTestSchedulerDriver(NewMockScheduler(), suite.framework, suite.master, nil) + driver := newTestSchedulerDriver(suite.T(), NewMockScheduler(), suite.framework, suite.master, nil) driver.messenger = messenger - suite.NoError(err) suite.True(driver.Stopped()) driver.Start() @@ -634,9 +665,8 @@ func (suite *SchedulerTestSuite) TestSchdulerDriverSendFrameworkMessage() { messenger.On("Stop").Return(nil) messenger.On("Route").Return(nil) - driver, err := newTestSchedulerDriver(NewMockScheduler(), suite.framework, suite.master, nil) + driver := newTestSchedulerDriver(suite.T(), NewMockScheduler(), suite.framework, suite.master, nil) driver.messenger = messenger - suite.NoError(err) suite.True(driver.Stopped()) driver.Start() @@ -660,9 +690,8 @@ func (suite *SchedulerTestSuite) TestSchdulerDriverReconcileTasks() { messenger.On("Stop").Return(nil) messenger.On("Route").Return(nil) - driver, err := newTestSchedulerDriver(NewMockScheduler(), suite.framework, suite.master, nil) + driver := newTestSchedulerDriver(suite.T(), NewMockScheduler(), suite.framework, suite.master, nil) driver.messenger = messenger - suite.NoError(err) suite.True(driver.Stopped()) driver.Start() diff --git a/Godeps/_workspace/src/github.com/mesos/mesos-go/upid/upid_test.go b/Godeps/_workspace/src/github.com/mesos/mesos-go/upid/upid_test.go index a7470b47ff..99d446bfd3 100644 --- a/Godeps/_workspace/src/github.com/mesos/mesos-go/upid/upid_test.go +++ b/Godeps/_workspace/src/github.com/mesos/mesos-go/upid/upid_test.go @@ -1,21 +1,12 @@ package upid import ( - "math/rand" - "strings" "testing" + "testing/quick" "github.com/stretchr/testify/assert" ) -func generateRandomString() string { - b := make([]byte, rand.Intn(1024)) - for i := range b { - b[i] = byte(rand.Int()) - } - return strings.Replace(string(b), "@", "", -1) -} - func TestUPIDParse(t *testing.T) { u, err := Parse("mesos@foo:bar") assert.Nil(t, u) @@ -29,17 +20,10 @@ func TestUPIDParse(t *testing.T) { assert.Nil(t, u) assert.Error(t, err) - // Simple fuzzy test. - for i := 0; i < 100000; i++ { - ra := generateRandomString() - u, err = Parse(ra) - if u != nil { - println(ra) - } - assert.Nil(t, u) - assert.Error(t, err) - } - + assert.Nil(t, quick.Check(func(s string) bool { + u, err := Parse(s) + return u == nil && err != nil + }, &quick.Config{MaxCount: 100000})) } func TestUPIDString(t *testing.T) {