mirror of https://github.com/docker/docs.git
				
				
				
			Do not rely on "live" event anymore
Signed-off-by: Kenfe-Mickael Laventure <mickael.laventure@gmail.com> (cherry picked from commit 64483c3bdaa1887b8b932e0564362fbbff025dc0) Signed-off-by: Tibor Vass <tibor@docker.com>
This commit is contained in:
		
							parent
							
								
									6c717a5744
								
							
						
					
					
						commit
						b7687cc673
					
				|  | @ -176,7 +176,7 @@ func (daemon *Daemon) restore() error { | |||
| 			rm := c.RestartManager(false) | ||||
| 			if c.IsRunning() || c.IsPaused() { | ||||
| 				if err := daemon.containerd.Restore(c.ID, libcontainerd.WithRestartManager(rm)); err != nil { | ||||
| 					logrus.Errorf("Failed to restore with containerd: %q", err) | ||||
| 					logrus.Errorf("Failed to restore %s with containerd: %s", c.ID, err) | ||||
| 					return | ||||
| 				} | ||||
| 				if !c.HostConfig.NetworkMode.IsContainer() && c.IsRunning() { | ||||
|  |  | |||
|  | @ -7,7 +7,7 @@ import ( | |||
| 	"os/exec" | ||||
| 	"path/filepath" | ||||
| 	"strings" | ||||
| 	"time" | ||||
| 	"syscall" | ||||
| 
 | ||||
| 	"github.com/docker/docker/pkg/integration/checker" | ||||
| 	"github.com/go-check/check" | ||||
|  | @ -129,7 +129,11 @@ func (s *DockerDaemonSuite) TestDaemonShutdownWithPlugins(c *check.C) { | |||
| 		c.Fatalf("Could not kill daemon: %v", err) | ||||
| 	} | ||||
| 
 | ||||
| 	time.Sleep(5 * time.Second) | ||||
| 	for { | ||||
| 		if err := syscall.Kill(s.d.cmd.Process.Pid, 0); err == syscall.ESRCH { | ||||
| 			break | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	cmd := exec.Command("pgrep", "-f", "plugin-no-remove") | ||||
| 	if out, ec, err := runCommandWithOutput(cmd); ec != 1 { | ||||
|  |  | |||
|  | @ -281,16 +281,10 @@ func (clnt *client) cleanupOldRootfs(containerID string) { | |||
| 	} | ||||
| } | ||||
| 
 | ||||
| func (clnt *client) setExited(containerID string) error { | ||||
| func (clnt *client) setExited(containerID string, exitCode uint32) error { | ||||
| 	clnt.lock(containerID) | ||||
| 	defer clnt.unlock(containerID) | ||||
| 
 | ||||
| 	var exitCode uint32 | ||||
| 	if event, ok := clnt.remote.pastEvents[containerID]; ok { | ||||
| 		exitCode = event.Status | ||||
| 		delete(clnt.remote.pastEvents, containerID) | ||||
| 	} | ||||
| 
 | ||||
| 	err := clnt.backend.StateChanged(containerID, StateInfo{ | ||||
| 		CommonStateInfo: CommonStateInfo{ | ||||
| 			State:    StateExit, | ||||
|  | @ -393,7 +387,7 @@ func (clnt *client) getOrCreateExitNotifier(containerID string) *exitNotifier { | |||
| 	return w | ||||
| } | ||||
| 
 | ||||
| func (clnt *client) restore(cont *containerd.Container, options ...CreateOption) (err error) { | ||||
| func (clnt *client) restore(cont *containerd.Container, lastEvent *containerd.Event, options ...CreateOption) (err error) { | ||||
| 	clnt.lock(cont.Id) | ||||
| 	defer clnt.unlock(cont.Id) | ||||
| 
 | ||||
|  | @ -441,66 +435,132 @@ func (clnt *client) restore(cont *containerd.Container, options ...CreateOption) | |||
| 		return err | ||||
| 	} | ||||
| 
 | ||||
| 	if event, ok := clnt.remote.pastEvents[containerID]; ok { | ||||
| 	if lastEvent != nil { | ||||
| 		// This should only be a pause or resume event
 | ||||
| 		if event.Type == StatePause || event.Type == StateResume { | ||||
| 		if lastEvent.Type == StatePause || lastEvent.Type == StateResume { | ||||
| 			return clnt.backend.StateChanged(containerID, StateInfo{ | ||||
| 				CommonStateInfo: CommonStateInfo{ | ||||
| 					State: event.Type, | ||||
| 					State: lastEvent.Type, | ||||
| 					Pid:   container.systemPid, | ||||
| 				}}) | ||||
| 		} | ||||
| 
 | ||||
| 		logrus.Warnf("unexpected backlog event: %#v", event) | ||||
| 		logrus.Warnf("unexpected backlog event: %#v", lastEvent) | ||||
| 	} | ||||
| 
 | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (clnt *client) Restore(containerID string, options ...CreateOption) error { | ||||
| 	if clnt.liveRestore { | ||||
| 		cont, err := clnt.getContainerdContainer(containerID) | ||||
| 		if err == nil && cont.Status != "stopped" { | ||||
| 			if err := clnt.restore(cont, options...); err != nil { | ||||
| 				logrus.Errorf("error restoring %s: %v", containerID, err) | ||||
| 			} | ||||
| 			return nil | ||||
| 		} | ||||
| 		return clnt.setExited(containerID) | ||||
| func (clnt *client) getContainerLastEvent(containerID string) (*containerd.Event, error) { | ||||
| 	er := &containerd.EventsRequest{ | ||||
| 		Timestamp:  clnt.remote.restoreFromTimestamp, | ||||
| 		StoredOnly: true, | ||||
| 		Id:         containerID, | ||||
| 	} | ||||
| 	events, err := clnt.remote.apiClient.Events(context.Background(), er) | ||||
| 	if err != nil { | ||||
| 		logrus.Errorf("libcontainerd: failed to get container events stream for %s: %q", er.Id, err) | ||||
| 		return nil, err | ||||
| 	} | ||||
| 
 | ||||
| 	var ev *containerd.Event | ||||
| 	for { | ||||
| 		e, err := events.Recv() | ||||
| 		if err != nil { | ||||
| 			if err.Error() == "EOF" { | ||||
| 				break | ||||
| 			} | ||||
| 			logrus.Errorf("libcontainerd: failed to get container event for %s: %q", containerID, err) | ||||
| 			return nil, err | ||||
| 		} | ||||
| 
 | ||||
| 		logrus.Debugf("libcontainerd: received past event %#v", e) | ||||
| 
 | ||||
| 		switch e.Type { | ||||
| 		case StateExit, StatePause, StateResume: | ||||
| 			ev = e | ||||
| 		} | ||||
| 	} | ||||
| 
 | ||||
| 	return ev, nil | ||||
| } | ||||
| 
 | ||||
| func (clnt *client) Restore(containerID string, options ...CreateOption) error { | ||||
| 	// Synchronize with live events
 | ||||
| 	clnt.remote.Lock() | ||||
| 	defer clnt.remote.Unlock() | ||||
| 	// Check that containerd still knows this container.
 | ||||
| 	//
 | ||||
| 	// In the unlikely event that Restore for this container process
 | ||||
| 	// the its past event before the main loop, the event will be
 | ||||
| 	// processed twice. However, this is not an issue as all those
 | ||||
| 	// events will do is change the state of the container to be
 | ||||
| 	// exactly the same.
 | ||||
| 	cont, err := clnt.getContainerdContainer(containerID) | ||||
| 	if err == nil && cont.Status != "stopped" { | ||||
| 		w := clnt.getOrCreateExitNotifier(containerID) | ||||
| 		clnt.lock(cont.Id) | ||||
| 		container := clnt.newContainer(cont.BundlePath) | ||||
| 		container.systemPid = systemPid(cont) | ||||
| 		clnt.appendContainer(container) | ||||
| 		clnt.unlock(cont.Id) | ||||
| 	// Get its last event
 | ||||
| 	ev, eerr := clnt.getContainerLastEvent(containerID) | ||||
| 	if err != nil || cont.Status == "Stopped" { | ||||
| 		if err != nil && !strings.Contains(err.Error(), "container not found") { | ||||
| 			// Legitimate error
 | ||||
| 			return err | ||||
| 		} | ||||
| 
 | ||||
| 		container.discardFifos() | ||||
| 		// If ev is nil, then we already consumed all the event of the
 | ||||
| 		// container, included the "exit" one.
 | ||||
| 		// Thus we return to avoid overriding the Exit Code.
 | ||||
| 		if ev == nil { | ||||
| 			logrus.Warnf("libcontainerd: restore was called on a fully synced container (%s)", containerID) | ||||
| 			return nil | ||||
| 		} | ||||
| 
 | ||||
| 		if err := clnt.Signal(containerID, int(syscall.SIGTERM)); err != nil { | ||||
| 			logrus.Errorf("error sending sigterm to %v: %v", containerID, err) | ||||
| 		// get the exit status for this container
 | ||||
| 		ec := uint32(0) | ||||
| 		if eerr == nil && ev.Type == StateExit { | ||||
| 			ec = ev.Status | ||||
| 		} | ||||
| 		clnt.setExited(containerID, ec) | ||||
| 
 | ||||
| 		return nil | ||||
| 	} | ||||
| 
 | ||||
| 	// container is still alive
 | ||||
| 	if clnt.liveRestore { | ||||
| 		if err := clnt.restore(cont, ev, options...); err != nil { | ||||
| 			logrus.Errorf("error restoring %s: %v", containerID, err) | ||||
| 		} | ||||
| 		return nil | ||||
| 	} | ||||
| 
 | ||||
| 	// Kill the container if liveRestore == false
 | ||||
| 	w := clnt.getOrCreateExitNotifier(containerID) | ||||
| 	clnt.lock(cont.Id) | ||||
| 	container := clnt.newContainer(cont.BundlePath) | ||||
| 	container.systemPid = systemPid(cont) | ||||
| 	clnt.appendContainer(container) | ||||
| 	clnt.unlock(cont.Id) | ||||
| 
 | ||||
| 	container.discardFifos() | ||||
| 
 | ||||
| 	if err := clnt.Signal(containerID, int(syscall.SIGTERM)); err != nil { | ||||
| 		logrus.Errorf("error sending sigterm to %v: %v", containerID, err) | ||||
| 	} | ||||
| 	select { | ||||
| 	case <-time.After(10 * time.Second): | ||||
| 		if err := clnt.Signal(containerID, int(syscall.SIGKILL)); err != nil { | ||||
| 			logrus.Errorf("error sending sigkill to %v: %v", containerID, err) | ||||
| 		} | ||||
| 		select { | ||||
| 		case <-time.After(10 * time.Second): | ||||
| 			if err := clnt.Signal(containerID, int(syscall.SIGKILL)); err != nil { | ||||
| 				logrus.Errorf("error sending sigkill to %v: %v", containerID, err) | ||||
| 			} | ||||
| 			select { | ||||
| 			case <-time.After(2 * time.Second): | ||||
| 			case <-w.wait(): | ||||
| 				return nil | ||||
| 			} | ||||
| 		case <-time.After(2 * time.Second): | ||||
| 		case <-w.wait(): | ||||
| 			return nil | ||||
| 		} | ||||
| 	case <-w.wait(): | ||||
| 		return nil | ||||
| 	} | ||||
| 
 | ||||
| 	clnt.deleteContainer(containerID) | ||||
| 
 | ||||
| 	return clnt.setExited(containerID) | ||||
| 	return clnt.setExited(containerID, uint32(255)) | ||||
| } | ||||
| 
 | ||||
| type exitNotifier struct { | ||||
|  |  | |||
|  | @ -21,6 +21,7 @@ import ( | |||
| 	sysinfo "github.com/docker/docker/pkg/system" | ||||
| 	"github.com/docker/docker/utils" | ||||
| 	"github.com/golang/protobuf/ptypes" | ||||
| 	"github.com/golang/protobuf/ptypes/timestamp" | ||||
| 	"golang.org/x/net/context" | ||||
| 	"google.golang.org/grpc" | ||||
| 	"google.golang.org/grpc/grpclog" | ||||
|  | @ -40,22 +41,22 @@ const ( | |||
| 
 | ||||
| type remote struct { | ||||
| 	sync.RWMutex | ||||
| 	apiClient     containerd.APIClient | ||||
| 	daemonPid     int | ||||
| 	stateDir      string | ||||
| 	rpcAddr       string | ||||
| 	startDaemon   bool | ||||
| 	closeManually bool | ||||
| 	debugLog      bool | ||||
| 	rpcConn       *grpc.ClientConn | ||||
| 	clients       []*client | ||||
| 	eventTsPath   string | ||||
| 	pastEvents    map[string]*containerd.Event | ||||
| 	runtime       string | ||||
| 	runtimeArgs   []string | ||||
| 	daemonWaitCh  chan struct{} | ||||
| 	liveRestore   bool | ||||
| 	oomScore      int | ||||
| 	apiClient            containerd.APIClient | ||||
| 	daemonPid            int | ||||
| 	stateDir             string | ||||
| 	rpcAddr              string | ||||
| 	startDaemon          bool | ||||
| 	closeManually        bool | ||||
| 	debugLog             bool | ||||
| 	rpcConn              *grpc.ClientConn | ||||
| 	clients              []*client | ||||
| 	eventTsPath          string | ||||
| 	runtime              string | ||||
| 	runtimeArgs          []string | ||||
| 	daemonWaitCh         chan struct{} | ||||
| 	liveRestore          bool | ||||
| 	oomScore             int | ||||
| 	restoreFromTimestamp *timestamp.Timestamp | ||||
| } | ||||
| 
 | ||||
| // New creates a fresh instance of libcontainerd remote.
 | ||||
|  | @ -69,7 +70,6 @@ func New(stateDir string, options ...RemoteOption) (_ Remote, err error) { | |||
| 		stateDir:    stateDir, | ||||
| 		daemonPid:   -1, | ||||
| 		eventTsPath: filepath.Join(stateDir, eventTimestampFilename), | ||||
| 		pastEvents:  make(map[string]*containerd.Event), | ||||
| 	} | ||||
| 	for _, option := range options { | ||||
| 		if err := option.Apply(r); err != nil { | ||||
|  | @ -106,6 +106,14 @@ func New(stateDir string, options ...RemoteOption) (_ Remote, err error) { | |||
| 	r.rpcConn = conn | ||||
| 	r.apiClient = containerd.NewAPIClient(conn) | ||||
| 
 | ||||
| 	// Get the timestamp to restore from
 | ||||
| 	t := r.getLastEventTimestamp() | ||||
| 	tsp, err := ptypes.TimestampProto(t) | ||||
| 	if err != nil { | ||||
| 		logrus.Errorf("libcontainerd: failed to convert timestamp: %q", err) | ||||
| 	} | ||||
| 	r.restoreFromTimestamp = tsp | ||||
| 
 | ||||
| 	go r.handleConnectionChange() | ||||
| 
 | ||||
| 	if err := r.startEventsMonitor(); err != nil { | ||||
|  | @ -257,7 +265,8 @@ func (r *remote) getLastEventTimestamp() time.Time { | |||
| 
 | ||||
| func (r *remote) startEventsMonitor() error { | ||||
| 	// First, get past events
 | ||||
| 	tsp, err := ptypes.TimestampProto(r.getLastEventTimestamp()) | ||||
| 	t := r.getLastEventTimestamp() | ||||
| 	tsp, err := ptypes.TimestampProto(t) | ||||
| 	if err != nil { | ||||
| 		logrus.Errorf("libcontainerd: failed to convert timestamp: %q", err) | ||||
| 	} | ||||
|  | @ -299,7 +308,7 @@ func (r *remote) handleEventStream(events containerd.API_EventsClient) { | |||
| 		} | ||||
| 		r.RUnlock() | ||||
| 		if container == nil { | ||||
| 			logrus.Errorf("libcontainerd: %q", err) | ||||
| 			logrus.Warnf("libcontainerd: unknown container %s", e.Id) | ||||
| 			continue | ||||
| 		} | ||||
| 
 | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue