From cfb7711a74dc4b54d879f79bc2f6435ed71163b9 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Sat, 15 Feb 2014 20:02:54 -0800 Subject: [PATCH 1/5] Add socket activation for go apps Docker-DCO-1.1-Signed-off-by: Michael Crosby (github: crosbymichael) --- pkg/socketactivation/activation.go | 61 ++++++++++++++++++++++++++++++ 1 file changed, 61 insertions(+) create mode 100644 pkg/socketactivation/activation.go diff --git a/pkg/socketactivation/activation.go b/pkg/socketactivation/activation.go new file mode 100644 index 0000000000..0edbcaac23 --- /dev/null +++ b/pkg/socketactivation/activation.go @@ -0,0 +1,61 @@ +/* + Package to allow go applications to immediately start + listening on a socket, unix, tcp, udp but hold connections + until the application has booted and is ready to accept them +*/ +package socketactivation + +import ( + "fmt" + "net" + "time" +) + +// NewActivationListener returns a listener listening on addr with the protocol. It sets the +// timeout to wait on first connection before an error is returned +func NewActivationListener(proto, addr string, activate chan struct{}, timeout time.Duration) (net.Listener, error) { + wrapped, err := net.Listen(proto, addr) + if err != nil { + return nil, err + } + + return &defaultListener{ + wrapped: wrapped, + activate: activate, + timeout: timeout, + }, nil +} + +type defaultListener struct { + wrapped net.Listener // the real listener to wrap + ready bool // is the listner ready to start accpeting connections + activate chan struct{} + timeout time.Duration // how long to wait before we consider this an error +} + +func (l *defaultListener) Close() error { + return l.wrapped.Close() +} + +func (l *defaultListener) Addr() net.Addr { + return l.wrapped.Addr() +} + +func (l *defaultListener) Accept() (net.Conn, error) { + // if the listen has been told it is ready then we can go ahead and + // start returning connections + if l.ready { + return l.wrapped.Accept() + } + + select { + case <-time.After(l.timeout): + // close the connection so any clients are disconnected + l.Close() + return nil, fmt.Errorf("timeout (%s) reached waiting for listener to become ready", l.timeout.String()) + case <-l.activate: + l.ready = true + return l.Accept() + } + panic("unreachable") +} From 64716a21e464ac63dd9b287c832ae3431e1418d0 Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Sat, 15 Feb 2014 20:24:55 -0800 Subject: [PATCH 2/5] Use socket activation pkg for listeners Docker-DCO-1.1-Signed-off-by: Michael Crosby (github: crosbymichael) --- api/api.go | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/api/api.go b/api/api.go index 2cfc75631e..ed8c8bb795 100644 --- a/api/api.go +++ b/api/api.go @@ -10,6 +10,7 @@ import ( "fmt" "github.com/dotcloud/docker/auth" "github.com/dotcloud/docker/engine" + "github.com/dotcloud/docker/pkg/socketactivation" "github.com/dotcloud/docker/pkg/systemd" "github.com/dotcloud/docker/utils" "github.com/gorilla/mux" @@ -25,6 +26,7 @@ import ( "strconv" "strings" "syscall" + "time" ) // FIXME: move code common to client and server to common.go @@ -34,6 +36,10 @@ const ( DEFAULTUNIXSOCKET = "/var/run/docker.sock" ) +var ( + activationLock chan struct{} +) + func ValidateHost(val string) (string, error) { host, err := utils.ParseHost(DEFAULTHTTPHOST, DEFAULTUNIXSOCKET, val) if err != nil { @@ -46,6 +52,7 @@ type HttpApiFunc func(eng *engine.Engine, version float64, w http.ResponseWriter func init() { engine.Register("serveapi", ServeApi) + engine.Register("acceptconnections", AcceptConnections) } func hijackServer(w http.ResponseWriter) (io.ReadCloser, io.Writer, error) { @@ -1156,7 +1163,7 @@ func ListenAndServe(proto, addr string, eng *engine.Engine, logging, enableCors } } - l, err := net.Listen(proto, addr) + l, err := socketactivation.NewActivationListener(proto, addr, activationLock, 15*time.Minute) if err != nil { return err } @@ -1198,8 +1205,11 @@ func ListenAndServe(proto, addr string, eng *engine.Engine, logging, enableCors // ServeApi loops through all of the protocols sent in to docker and spawns // off a go routine to setup a serving http.Server for each. func ServeApi(job *engine.Job) engine.Status { - protoAddrs := job.Args - chErrors := make(chan error, len(protoAddrs)) + var ( + protoAddrs = job.Args + chErrors = make(chan error, len(protoAddrs)) + ) + activationLock = make(chan struct{}) for _, protoAddr := range protoAddrs { protoAddrParts := strings.SplitN(protoAddr, "://", 2) @@ -1209,6 +1219,8 @@ func ServeApi(job *engine.Job) engine.Status { }() } + AcceptConnections(nil) + for i := 0; i < len(protoAddrs); i += 1 { err := <-chErrors if err != nil { @@ -1216,8 +1228,15 @@ func ServeApi(job *engine.Job) engine.Status { } } + return engine.StatusOK +} + +func AcceptConnections(job *engine.Job) engine.Status { // Tell the init daemon we are accepting requests go systemd.SdNotify("READY=1") + // close the lock so the listeners start accepting connections + close(activationLock) + return engine.StatusOK } From 778f1bf6639e52004608ca2289e04b8f7bddb14b Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Sat, 15 Feb 2014 20:49:50 -0800 Subject: [PATCH 3/5] Integration generic socket wait for docker api Docker-DCO-1.1-Signed-off-by: Michael Crosby (github: crosbymichael) --- api/api.go | 2 -- docker/docker.go | 47 +++++++++++++++++++++++-------------- integration/runtime_test.go | 5 ++++ 3 files changed, 34 insertions(+), 20 deletions(-) diff --git a/api/api.go b/api/api.go index ed8c8bb795..8f6729ab61 100644 --- a/api/api.go +++ b/api/api.go @@ -1219,8 +1219,6 @@ func ServeApi(job *engine.Job) engine.Status { }() } - AcceptConnections(nil) - for i := 0; i < len(protoAddrs); i += 1 { err := <-chErrors if err != nil { diff --git a/docker/docker.go b/docker/docker.go index b4d7879397..fe214811cf 100644 --- a/docker/docker.go +++ b/docker/docker.go @@ -78,25 +78,36 @@ func main() { if err != nil { log.Fatal(err) } - // Load plugin: httpapi - job := eng.Job("initserver") - job.Setenv("Pidfile", *pidfile) - job.Setenv("Root", *flRoot) - job.SetenvBool("AutoRestart", *flAutoRestart) - job.SetenvList("Dns", flDns.GetAll()) - job.SetenvBool("EnableIptables", *flEnableIptables) - job.SetenvBool("EnableIpForward", *flEnableIpForward) - job.Setenv("BridgeIface", *bridgeName) - job.Setenv("BridgeIP", *bridgeIp) - job.Setenv("DefaultIp", *flDefaultIp) - job.SetenvBool("InterContainerCommunication", *flInterContainerComm) - job.Setenv("GraphDriver", *flGraphDriver) - job.SetenvInt("Mtu", *flMtu) - if err := job.Run(); err != nil { - log.Fatal(err) - } + // load the daemon in the background so we can immediately start + // the http api so that connections don't fail while the daemon + // is booting + go func() { + // Load plugin: httpapi + job := eng.Job("initserver") + job.Setenv("Pidfile", *pidfile) + job.Setenv("Root", *flRoot) + job.SetenvBool("AutoRestart", *flAutoRestart) + job.SetenvList("Dns", flDns.GetAll()) + job.SetenvBool("EnableIptables", *flEnableIptables) + job.SetenvBool("EnableIpForward", *flEnableIpForward) + job.Setenv("BridgeIface", *bridgeName) + job.Setenv("BridgeIP", *bridgeIp) + job.Setenv("DefaultIp", *flDefaultIp) + job.SetenvBool("InterContainerCommunication", *flInterContainerComm) + job.Setenv("GraphDriver", *flGraphDriver) + job.SetenvInt("Mtu", *flMtu) + if err := job.Run(); err != nil { + log.Fatal(err) + } + // after the daemon is done setting up we can tell the api to start + // accepting connections + if err := eng.Job("acceptconnections").Run(); err != nil { + log.Fatal(err) + } + }() + // Serve api - job = eng.Job("serveapi", flHosts.GetAll()...) + job := eng.Job("serveapi", flHosts.GetAll()...) job.SetenvBool("Logging", true) job.SetenvBool("EnableCors", *flEnableCors) job.Setenv("Version", dockerversion.VERSION) diff --git a/integration/runtime_test.go b/integration/runtime_test.go index 170f4c9638..ca2119ce1f 100644 --- a/integration/runtime_test.go +++ b/integration/runtime_test.go @@ -171,9 +171,14 @@ func spawnGlobalDaemon() { log.Fatalf("Unable to spawn the test daemon: %s", err) } }() + // Give some time to ListenAndServer to actually start // FIXME: use inmem transports instead of tcp time.Sleep(time.Second) + + if err := eng.Job("acceptconnections").Run(); err != nil { + log.Fatalf("Unable to accept connections for test api: %s", err) + } } // FIXME: test that ImagePull(json=true) send correct json output From d5e41c1cb67b7ccb86fbbd80079642afd5055c2a Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Sat, 15 Feb 2014 21:10:37 -0800 Subject: [PATCH 4/5] Change name to listenbuffer Docker-DCO-1.1-Signed-off-by: Michael Crosby (github: crosbymichael) --- api/api.go | 4 ++-- .../activation.go => listenbuffer/buffer.go} | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) rename pkg/{socketactivation/activation.go => listenbuffer/buffer.go} (85%) diff --git a/api/api.go b/api/api.go index 8f6729ab61..8d7b1de685 100644 --- a/api/api.go +++ b/api/api.go @@ -10,7 +10,7 @@ import ( "fmt" "github.com/dotcloud/docker/auth" "github.com/dotcloud/docker/engine" - "github.com/dotcloud/docker/pkg/socketactivation" + "github.com/dotcloud/docker/pkg/listenbuffer" "github.com/dotcloud/docker/pkg/systemd" "github.com/dotcloud/docker/utils" "github.com/gorilla/mux" @@ -1163,7 +1163,7 @@ func ListenAndServe(proto, addr string, eng *engine.Engine, logging, enableCors } } - l, err := socketactivation.NewActivationListener(proto, addr, activationLock, 15*time.Minute) + l, err := listenbuffer.NewListenBuffer(proto, addr, activationLock, 15*time.Minute) if err != nil { return err } diff --git a/pkg/socketactivation/activation.go b/pkg/listenbuffer/buffer.go similarity index 85% rename from pkg/socketactivation/activation.go rename to pkg/listenbuffer/buffer.go index 0edbcaac23..c350805a7d 100644 --- a/pkg/socketactivation/activation.go +++ b/pkg/listenbuffer/buffer.go @@ -3,7 +3,7 @@ listening on a socket, unix, tcp, udp but hold connections until the application has booted and is ready to accept them */ -package socketactivation +package listenbuffer import ( "fmt" @@ -11,9 +11,9 @@ import ( "time" ) -// NewActivationListener returns a listener listening on addr with the protocol. It sets the +// NewListenBuffer returns a listener listening on addr with the protocol. It sets the // timeout to wait on first connection before an error is returned -func NewActivationListener(proto, addr string, activate chan struct{}, timeout time.Duration) (net.Listener, error) { +func NewListenBuffer(proto, addr string, activate chan struct{}, timeout time.Duration) (net.Listener, error) { wrapped, err := net.Listen(proto, addr) if err != nil { return nil, err From e743021193e589b1358d0eef0521e9d8878dd66d Mon Sep 17 00:00:00 2001 From: Michael Crosby Date: Mon, 17 Feb 2014 15:10:51 -0800 Subject: [PATCH 5/5] Move job register into servapi Docker-DCO-1.1-Signed-off-by: Michael Crosby (github: crosbymichael) --- api/api.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/api/api.go b/api/api.go index 8d7b1de685..bd4283e301 100644 --- a/api/api.go +++ b/api/api.go @@ -52,7 +52,6 @@ type HttpApiFunc func(eng *engine.Engine, version float64, w http.ResponseWriter func init() { engine.Register("serveapi", ServeApi) - engine.Register("acceptconnections", AcceptConnections) } func hijackServer(w http.ResponseWriter) (io.ReadCloser, io.Writer, error) { @@ -1211,6 +1210,10 @@ func ServeApi(job *engine.Job) engine.Status { ) activationLock = make(chan struct{}) + if err := job.Eng.Register("acceptconnections", AcceptConnections); err != nil { + return job.Error(err) + } + for _, protoAddr := range protoAddrs { protoAddrParts := strings.SplitN(protoAddr, "://", 2) go func() {