diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 7fac7b37dc..b06bf49444 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -33,38 +33,38 @@ }, { "ImportPath": "github.com/docker/docker/pkg/discovery", - "Comment": "v1.4.1-9107-gf11b6a2", - "Rev": "f11b6a2ab313a03d051dd6f69d264d0482df72d6" + "Comment": "v1.4.1-10581-gb65fd8e", + "Rev": "b65fd8e879545e8c9b859ea9b6b825ac50c79e46" }, { "ImportPath": "github.com/docker/docker/pkg/ioutils", - "Comment": "v1.4.1-9107-gf11b6a2", - "Rev": "f11b6a2ab313a03d051dd6f69d264d0482df72d6" + "Comment": "v1.4.1-10581-gb65fd8e", + "Rev": "b65fd8e879545e8c9b859ea9b6b825ac50c79e46" }, { "ImportPath": "github.com/docker/docker/pkg/longpath", - "Comment": "v1.4.1-9107-gf11b6a2", - "Rev": "f11b6a2ab313a03d051dd6f69d264d0482df72d6" + "Comment": "v1.4.1-10581-gb65fd8e", + "Rev": "b65fd8e879545e8c9b859ea9b6b825ac50c79e46" }, { "ImportPath": "github.com/docker/docker/pkg/parsers/kernel", - "Comment": "v1.4.1-9107-gf11b6a2", - "Rev": "f11b6a2ab313a03d051dd6f69d264d0482df72d6" + "Comment": "v1.4.1-10581-gb65fd8e", + "Rev": "b65fd8e879545e8c9b859ea9b6b825ac50c79e46" }, { "ImportPath": "github.com/docker/docker/pkg/random", - "Comment": "v1.4.1-9107-gf11b6a2", - "Rev": "f11b6a2ab313a03d051dd6f69d264d0482df72d6" + "Comment": "v1.4.1-10581-gb65fd8e", + "Rev": "b65fd8e879545e8c9b859ea9b6b825ac50c79e46" }, { "ImportPath": "github.com/docker/docker/pkg/stringid", - "Comment": "v1.4.1-9107-gf11b6a2", - "Rev": "f11b6a2ab313a03d051dd6f69d264d0482df72d6" + "Comment": "v1.4.1-10581-gb65fd8e", + "Rev": "b65fd8e879545e8c9b859ea9b6b825ac50c79e46" }, { "ImportPath": "github.com/docker/docker/pkg/version", - "Comment": "v1.4.1-9107-gf11b6a2", - "Rev": "f11b6a2ab313a03d051dd6f69d264d0482df72d6" + "Comment": "v1.4.1-10581-gb65fd8e", + "Rev": "b65fd8e879545e8c9b859ea9b6b825ac50c79e46" }, { "ImportPath": "github.com/docker/engine-api/types", diff --git a/Godeps/_workspace/src/github.com/docker/docker/pkg/discovery/backends.go b/Godeps/_workspace/src/github.com/docker/docker/pkg/discovery/backends.go index 875a26c442..65364c9ae8 100644 --- a/Godeps/_workspace/src/github.com/docker/docker/pkg/discovery/backends.go +++ b/Godeps/_workspace/src/github.com/docker/docker/pkg/discovery/backends.go @@ -12,12 +12,8 @@ import ( var ( // Backends is a global map of discovery backends indexed by their // associated scheme. - backends map[string]Backend -) - -func init() { backends = make(map[string]Backend) -} +) // Register makes a discovery backend available by the provided scheme. // If Register is called twice with the same scheme an error is returned. @@ -42,7 +38,7 @@ func parse(rawurl string) (string, string) { // ParseAdvertise parses the --cluster-advertise daemon config which accepts // : or : -func ParseAdvertise(store, advertise string) (string, error) { +func ParseAdvertise(advertise string) (string, error) { var ( iface *net.Interface addrs []net.Addr @@ -93,7 +89,7 @@ func ParseAdvertise(store, advertise string) (string, error) { return "", fmt.Errorf("couldnt find a valid ip-address in interface %s", advertise) } - addr = fmt.Sprintf("%s:%s", addr, port) + addr = net.JoinHostPort(addr, port) return addr, nil } diff --git a/Godeps/_workspace/src/github.com/docker/docker/pkg/discovery/entry.go b/Godeps/_workspace/src/github.com/docker/docker/pkg/discovery/entry.go index e9cee26ee1..ce23bbf89b 100644 --- a/Godeps/_workspace/src/github.com/docker/docker/pkg/discovery/entry.go +++ b/Godeps/_workspace/src/github.com/docker/docker/pkg/discovery/entry.go @@ -1,9 +1,6 @@ package discovery -import ( - "fmt" - "net" -) +import "net" // NewEntry creates a new entry. func NewEntry(url string) (*Entry, error) { @@ -27,7 +24,7 @@ func (e *Entry) Equals(cmp *Entry) bool { // String returns the string form of an entry. func (e *Entry) String() string { - return fmt.Sprintf("%s:%s", e.Host, e.Port) + return net.JoinHostPort(e.Host, e.Port) } // Entries is a list of *Entry with some helpers. diff --git a/Godeps/_workspace/src/github.com/docker/docker/pkg/discovery/memory/memory.go b/Godeps/_workspace/src/github.com/docker/docker/pkg/discovery/memory/memory.go new file mode 100644 index 0000000000..777a9a16a4 --- /dev/null +++ b/Godeps/_workspace/src/github.com/docker/docker/pkg/discovery/memory/memory.go @@ -0,0 +1,83 @@ +package memory + +import ( + "time" + + "github.com/docker/docker/pkg/discovery" +) + +// Discovery implements a descovery backend that keeps +// data in memory. +type Discovery struct { + heartbeat time.Duration + values []string +} + +func init() { + Init() +} + +// Init registers the memory backend on demand. +func Init() { + discovery.Register("memory", &Discovery{}) +} + +// Initialize sets the heartbeat for the memory backend. +func (s *Discovery) Initialize(_ string, heartbeat time.Duration, _ time.Duration, _ map[string]string) error { + s.heartbeat = heartbeat + s.values = make([]string, 0) + return nil +} + +// Watch sends periodic discovery updates to a channel. +func (s *Discovery) Watch(stopCh <-chan struct{}) (<-chan discovery.Entries, <-chan error) { + ch := make(chan discovery.Entries) + errCh := make(chan error) + ticker := time.NewTicker(s.heartbeat) + + go func() { + defer close(errCh) + defer close(ch) + + // Send the initial entries if available. + var currentEntries discovery.Entries + if len(s.values) > 0 { + var err error + currentEntries, err = discovery.CreateEntries(s.values) + if err != nil { + errCh <- err + } else { + ch <- currentEntries + } + } + + // Periodically send updates. + for { + select { + case <-ticker.C: + newEntries, err := discovery.CreateEntries(s.values) + if err != nil { + errCh <- err + continue + } + + // Check if the file has really changed. + if !newEntries.Equals(currentEntries) { + ch <- newEntries + } + currentEntries = newEntries + case <-stopCh: + ticker.Stop() + return + } + } + }() + + return ch, errCh +} + +// Register adds a new address to the discovery. +func (s *Discovery) Register(addr string) error { + s.values = append(s.values, addr) + return nil +} diff --git a/Godeps/_workspace/src/github.com/docker/docker/pkg/ioutils/writeflusher.go b/Godeps/_workspace/src/github.com/docker/docker/pkg/ioutils/writeflusher.go index 2b35a26662..52a4901ade 100644 --- a/Godeps/_workspace/src/github.com/docker/docker/pkg/ioutils/writeflusher.go +++ b/Godeps/_workspace/src/github.com/docker/docker/pkg/ioutils/writeflusher.go @@ -1,9 +1,7 @@ package ioutils import ( - "errors" "io" - "net/http" "sync" ) @@ -11,45 +9,43 @@ import ( // is a flush. In addition, the Close method can be called to intercept // Read/Write calls if the targets lifecycle has already ended. type WriteFlusher struct { - mu sync.Mutex - w io.Writer - flusher http.Flusher - flushed bool - closed error - - // TODO(stevvooe): Use channel for closed instead, remove mutex. Using a - // channel will allow one to properly order the operations. + w io.Writer + flusher flusher + flushed chan struct{} + flushedOnce sync.Once + closed chan struct{} + closeLock sync.Mutex } -var errWriteFlusherClosed = errors.New("writeflusher: closed") +type flusher interface { + Flush() +} + +var errWriteFlusherClosed = io.EOF func (wf *WriteFlusher) Write(b []byte) (n int, err error) { - wf.mu.Lock() - defer wf.mu.Unlock() - if wf.closed != nil { - return 0, wf.closed + select { + case <-wf.closed: + return 0, errWriteFlusherClosed + default: } n, err = wf.w.Write(b) - wf.flush() // every write is a flush. + wf.Flush() // every write is a flush. return n, err } // Flush the stream immediately. func (wf *WriteFlusher) Flush() { - wf.mu.Lock() - defer wf.mu.Unlock() - - wf.flush() -} - -// flush the stream immediately without taking a lock. Used internally. -func (wf *WriteFlusher) flush() { - if wf.closed != nil { + select { + case <-wf.closed: return + default: } - wf.flushed = true + wf.flushedOnce.Do(func() { + close(wf.flushed) + }) wf.flusher.Flush() } @@ -59,34 +55,38 @@ func (wf *WriteFlusher) Flushed() bool { // BUG(stevvooe): Remove this method. Its use is inherently racy. Seems to // be used to detect whether or a response code has been issued or not. // Another hook should be used instead. - wf.mu.Lock() - defer wf.mu.Unlock() - - return wf.flushed + var flushed bool + select { + case <-wf.flushed: + flushed = true + default: + } + return flushed } // Close closes the write flusher, disallowing any further writes to the // target. After the flusher is closed, all calls to write or flush will // result in an error. func (wf *WriteFlusher) Close() error { - wf.mu.Lock() - defer wf.mu.Unlock() + wf.closeLock.Lock() + defer wf.closeLock.Unlock() - if wf.closed != nil { - return wf.closed + select { + case <-wf.closed: + return errWriteFlusherClosed + default: + close(wf.closed) } - - wf.closed = errWriteFlusherClosed return nil } // NewWriteFlusher returns a new WriteFlusher. func NewWriteFlusher(w io.Writer) *WriteFlusher { - var flusher http.Flusher - if f, ok := w.(http.Flusher); ok { - flusher = f + var fl flusher + if f, ok := w.(flusher); ok { + fl = f } else { - flusher = &NopFlusher{} + fl = &NopFlusher{} } - return &WriteFlusher{w: w, flusher: flusher} + return &WriteFlusher{w: w, flusher: fl, closed: make(chan struct{}), flushed: make(chan struct{})} } diff --git a/cli/join.go b/cli/join.go index df8c64d245..edf4f90802 100644 --- a/cli/join.go +++ b/cli/join.go @@ -2,7 +2,8 @@ package cli import ( "math/rand" - "regexp" + "net" + "strconv" "time" log "github.com/Sirupsen/logrus" @@ -11,8 +12,13 @@ import ( ) func checkAddrFormat(addr string) bool { - m, _ := regexp.MatchString("^[0-9a-zA-Z._-]+:[0-9]{1,5}$", addr) - return m + // validate addr is in host:port form. Use net function to handle both IPv4/IPv6 cases. + _, port, err := net.SplitHostPort(addr) + if err != nil { + return false + } + portNum, err := strconv.Atoi(port) + return err == nil && portNum >= 0 && portNum <= 65535 } func join(c *cli.Context) { diff --git a/cli/join_test.go b/cli/join_test.go index fb093caf62..b7db39a91b 100644 --- a/cli/join_test.go +++ b/cli/join_test.go @@ -24,4 +24,12 @@ func TestCheckAddrFormat(t *testing.T) { assert.True(t, checkAddrFormat("1.1.1.1:1111")) assert.True(t, checkAddrFormat("hostname:1111")) assert.True(t, checkAddrFormat("host-name_42:1111")) + assert.False(t, checkAddrFormat("1.1.1.1:-1")) + assert.True(t, checkAddrFormat("1.1.1.1:65535")) + assert.False(t, checkAddrFormat("1.1.1.1:65536")) + assert.False(t, checkAddrFormat("1.1.1.1: 4000")) + assert.False(t, checkAddrFormat("1.1.1.1:m2")) + assert.True(t, checkAddrFormat("[2001:db8:0:f101::3]:2375")) + assert.False(t, checkAddrFormat("2001:db8:0:f101::3:2375")) + assert.False(t, checkAddrFormat("[2001:db8:0:f101::3]:3:2375")) }