Merge pull request #25169 from mheon/graph_stop

Add graph-based pod stop
openshift-merge-bot[bot] authored 2025-02-10 17:00:19 +00:00, committed by GitHub
commit a475083bff
6 changed files with 386 additions and 175 deletions
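
The commit message above is terse, so before the diff itself, here is a minimal, self-contained sketch of the idea it implements: stop the containers of a pod in reverse dependency order, so a container is only stopped once everything that depends on it has already been stopped. This is illustrative Go only, not code from this PR; all names (dependsOn, stopOrder, the container names) are made up, and the real implementation below additionally runs the traversal in parallel, takes per-container locks, and collects per-container errors.

package main

import "fmt"

// Toy dependency map: container -> containers it depends on.
// In a real pod, application containers typically depend on the infra container.
var dependsOn = map[string][]string{
	"infra": {},
	"db":    {"infra"},
	"app":   {"db", "infra"},
}

// stopOrder returns an order in which a container is only stopped once every
// container that depends on it has already been stopped. Assumes no cycles.
func stopOrder() []string {
	// For each container, count how many not-yet-stopped containers depend on it.
	dependents := map[string]int{}
	for _, deps := range dependsOn {
		for _, d := range deps {
			dependents[d]++
		}
	}

	order := []string{}
	stopped := map[string]bool{}
	for len(order) < len(dependsOn) {
		for name, deps := range dependsOn {
			// A container may stop only when nothing that depends on it remains.
			if stopped[name] || dependents[name] != 0 {
				continue
			}
			order = append(order, name)
			stopped[name] = true
			for _, d := range deps {
				dependents[d]--
			}
		}
	}
	return order
}

func main() {
	fmt.Println(stopOrder()) // always [app db infra] for this toy graph
}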


@ -85,34 +85,23 @@ func (c *Container) initUnlocked(ctx context.Context, recursive bool) error {
// Start requires that all dependency containers (e.g. pod infra containers) are
// running before starting the container. The recursive parameter, if set, will start all
// dependencies before starting this container.
func (c *Container) Start(ctx context.Context, recursive bool) (finalErr error) {
if !c.batched {
c.lock.Lock()
defer c.lock.Unlock()
// defers are executed LIFO, so we are still locked here
// as long as this defer is registered after the deferred unlock() above
defer func() {
if finalErr != nil {
if err := saveContainerError(c, finalErr); err != nil {
logrus.Debug(err)
}
}
}()
if err := c.syncContainer(); err != nil {
return err
func (c *Container) Start(ctx context.Context, recursive bool) error {
// Have to lock the pod the container is a part of.
// This prevents running `podman start` at the same time a
// `podman pod stop` is running, which could lead to weird races.
// Pod locks come before container locks, so do this first.
if c.config.Pod != "" {
// If we get an error, the pod was probably removed;
// just ignore it and move on to syncing the container,
// so we return the expected ErrCtrRemoved instead of ErrPodRemoved.
pod, _ := c.runtime.state.Pod(c.config.Pod)
if pod != nil {
pod.lock.Lock()
defer pod.lock.Unlock()
}
}
if err := c.prepareToStart(ctx, recursive); err != nil {
return err
}
// Start the container
if err := c.start(); err != nil {
return err
}
return c.waitForHealthy(ctx)
return c.startNoPodLock(ctx, recursive)
}
// Update updates the given container.
@ -294,6 +283,21 @@ func (c *Container) Stop() error {
// manually. If timeout is 0, SIGKILL will be used immediately to kill the
// container.
func (c *Container) StopWithTimeout(timeout uint) (finalErr error) {
// Have to lock the pod the container is a part of.
// This prevents running `podman stop` at the same time a
// `podman pod start` is running, which could lead to weird races.
// Pod locks come before container locks, so do this first.
if c.config.Pod != "" {
// If we get an error, the pod was probably removed;
// just ignore it and move on to syncing the container,
// so we return the expected ErrCtrRemoved instead of ErrPodRemoved.
pod, _ := c.runtime.state.Pod(c.config.Pod)
if pod != nil {
pod.lock.Lock()
defer pod.lock.Unlock()
}
}
if !c.batched {
c.lock.Lock()
defer c.lock.Unlock()
@ -852,58 +856,7 @@ func (c *Container) Cleanup(ctx context.Context, onlyStopped bool) error {
}
}
// Check if state is good
if !c.ensureState(define.ContainerStateConfigured, define.ContainerStateCreated, define.ContainerStateStopped, define.ContainerStateStopping, define.ContainerStateExited) {
return fmt.Errorf("container %s is running or paused, refusing to clean up: %w", c.ID(), define.ErrCtrStateInvalid)
}
if onlyStopped && !c.ensureState(define.ContainerStateStopped) {
return fmt.Errorf("container %s is not stopped and only cleanup for a stopped container was requested: %w", c.ID(), define.ErrCtrStateInvalid)
}
// if the container was not created in the oci runtime or was already cleaned up, then do nothing
if c.ensureState(define.ContainerStateConfigured, define.ContainerStateExited) {
return nil
}
// Handle restart policy.
// Returns a bool indicating whether we actually restarted.
// If we did, don't proceed to cleanup - just exit.
didRestart, err := c.handleRestartPolicy(ctx)
if err != nil {
return err
}
if didRestart {
return nil
}
// If we didn't restart, we perform a normal cleanup
// make sure all the container processes are terminated if we are running without a pid namespace.
hasPidNs := false
if c.config.Spec.Linux != nil {
for _, i := range c.config.Spec.Linux.Namespaces {
if i.Type == spec.PIDNamespace {
hasPidNs = true
break
}
}
}
if !hasPidNs {
// do not fail on errors
_ = c.ociRuntime.KillContainer(c, uint(unix.SIGKILL), true)
}
// Check for running exec sessions
sessions, err := c.getActiveExecSessions()
if err != nil {
return err
}
if len(sessions) > 0 {
return fmt.Errorf("container %s has active exec sessions, refusing to clean up: %w", c.ID(), define.ErrCtrStateInvalid)
}
defer c.newContainerEvent(events.Cleanup)
return c.cleanup(ctx)
return c.fullCleanup(ctx, onlyStopped)
}
// Batch starts a batch operation on the given container


@ -4,14 +4,18 @@ package libpod
import (
"context"
"errors"
"fmt"
"strings"
"sync"
"github.com/containers/podman/v5/libpod/define"
"github.com/containers/podman/v5/pkg/parallel"
"github.com/sirupsen/logrus"
)
type containerNode struct {
lock sync.Mutex
id string
container *Container
dependsOn []*containerNode
@ -284,99 +288,241 @@ func startNode(ctx context.Context, node *containerNode, setError bool, ctrError
}
}
// Visit a node on the container graph and remove it, or set an error if it
// failed to remove. Only intended for use in pod removal; do *not* use when
// removing individual containers.
// All containers are assumed to be *UNLOCKED* on running this function.
// Container locks will be acquired as necessary.
// Pod and infraID are optional. If a pod is given it must be *LOCKED*.
func removeNode(ctx context.Context, node *containerNode, pod *Pod, force bool, timeout *uint, setError bool, ctrErrors map[string]error, ctrsVisited map[string]bool, ctrNamedVolumes map[string]*ContainerNamedVolume) {
// Contains all details required for traversing the container graph.
type nodeTraversal struct {
// Protects reads and writes to the two maps.
lock sync.Mutex
// Optional, but if set it *MUST* be locked.
// Should NOT be changed once a traversal is started.
pod *Pod
// Function to execute on the individual container being acted on.
// Should NOT be changed once a traversal is started.
actionFunc func(ctr *Container, pod *Pod) error
// Shared list of errors for all containers currently acted on.
ctrErrors map[string]error
// Shared list of what containers have been visited.
ctrsVisited map[string]bool
}
// Perform a traversal of the graph in an inwards direction - meaning from nodes
// that no other node depends on, recursing inwards to the nodes they depend on.
// Safe to run in parallel on multiple nodes.
func traverseNodeInwards(node *containerNode, nodeDetails *nodeTraversal, setError bool) {
node.lock.Lock()
// If we already visited this node, we're done.
if ctrsVisited[node.id] {
nodeDetails.lock.Lock()
visited := nodeDetails.ctrsVisited[node.id]
nodeDetails.lock.Unlock()
if visited {
node.lock.Unlock()
return
}
// Someone who depends on us failed.
// Mark us as failed and recurse.
if setError {
ctrsVisited[node.id] = true
ctrErrors[node.id] = fmt.Errorf("a container that depends on container %s could not be removed: %w", node.id, define.ErrCtrStateInvalid)
nodeDetails.lock.Lock()
nodeDetails.ctrsVisited[node.id] = true
nodeDetails.ctrErrors[node.id] = fmt.Errorf("a container that depends on container %s could not be stopped: %w", node.id, define.ErrCtrStateInvalid)
nodeDetails.lock.Unlock()
node.lock.Unlock()
// Hit anyone who depends on us, set errors there as well.
for _, successor := range node.dependsOn {
removeNode(ctx, successor, pod, force, timeout, true, ctrErrors, ctrsVisited, ctrNamedVolumes)
traverseNodeInwards(successor, nodeDetails, true)
}
return
}
// Does anyone still depend on us?
// Cannot remove if true. Once all our dependencies have been removed,
// we will be removed.
// Cannot stop if true. Once all our dependencies have been stopped,
// we will be stopped.
for _, dep := range node.dependedOn {
// The container that depends on us hasn't been removed yet.
// OK to continue on
if ok := ctrsVisited[dep.id]; !ok {
nodeDetails.lock.Lock()
ok := nodeDetails.ctrsVisited[dep.id]
nodeDetails.lock.Unlock()
if !ok {
node.lock.Unlock()
return
}
}
// Going to try to remove the node, mark us as visited
ctrsVisited[node.id] = true
ctrErrored := false
// Verify that all that depend on us are gone.
// Graph traversal should guarantee this is true, but this isn't that
// expensive, and it's better to be safe.
for _, dep := range node.dependedOn {
if _, err := node.container.runtime.GetContainer(dep.id); err == nil {
ctrErrored = true
ctrErrors[node.id] = fmt.Errorf("a container that depends on container %s still exists: %w", node.id, define.ErrDepExists)
}
if err := nodeDetails.actionFunc(node.container, nodeDetails.pod); err != nil {
ctrErrored = true
nodeDetails.lock.Lock()
nodeDetails.ctrErrors[node.id] = err
nodeDetails.lock.Unlock()
}
// Lock the container
node.container.lock.Lock()
// Gate all subsequent bits behind a ctrErrored check - we don't want to
// proceed if a previous step failed.
// Mark as visited *only after* we are finished with the operation.
// This ensures that the operation has completed, one way or the other.
// If an error was set, only do this after the viral ctrErrored
// propagates in traverseNodeInwards below.
// Same with the node lock - we don't want to release it until we are
// marked as visited.
if !ctrErrored {
if err := node.container.syncContainer(); err != nil {
ctrErrored = true
ctrErrors[node.id] = err
}
nodeDetails.lock.Lock()
nodeDetails.ctrsVisited[node.id] = true
nodeDetails.lock.Unlock()
node.lock.Unlock()
}
if !ctrErrored {
for _, vol := range node.container.config.NamedVolumes {
// Recurse to anyone who we depend on and work on them
for _, successor := range node.dependsOn {
traverseNodeInwards(successor, nodeDetails, ctrErrored)
}
// If we propagated an error, finally mark us as visited here, after
// all nodes we traverse to have already been marked failed.
// If we don't do this, there is a race condition where a node could try
// to perform its operation before it was marked failed by the
// traverseNodeInwards triggered by this process.
if ctrErrored {
nodeDetails.lock.Lock()
nodeDetails.ctrsVisited[node.id] = true
nodeDetails.lock.Unlock()
node.lock.Unlock()
}
}
// Stop all containers in the given graph, assumed to be the graph of a pod.
// Pod is mandatory and should be locked.
func stopContainerGraph(ctx context.Context, graph *ContainerGraph, pod *Pod, timeout *uint, cleanup bool) (map[string]error, error) {
// Are there actually any containers in the graph?
// If not, return immediately.
if len(graph.nodes) == 0 {
return map[string]error{}, nil
}
nodeDetails := new(nodeTraversal)
nodeDetails.pod = pod
nodeDetails.ctrErrors = make(map[string]error)
nodeDetails.ctrsVisited = make(map[string]bool)
traversalFunc := func(ctr *Container, pod *Pod) error {
ctr.lock.Lock()
defer ctr.lock.Unlock()
if err := ctr.syncContainer(); err != nil {
return err
}
realTimeout := ctr.config.StopTimeout
if timeout != nil {
realTimeout = *timeout
}
if err := ctr.stop(realTimeout); err != nil && !errors.Is(err, define.ErrCtrStateInvalid) && !errors.Is(err, define.ErrCtrStopped) {
return err
}
if cleanup {
return ctr.fullCleanup(ctx, false)
}
return nil
}
nodeDetails.actionFunc = traversalFunc
doneChans := make([]<-chan error, 0, len(graph.notDependedOnNodes))
// Parallel enqueue jobs for all our starting nodes.
if len(graph.notDependedOnNodes) == 0 {
return nil, fmt.Errorf("no containers in pod %s are not dependencies of other containers, unable to stop", pod.ID())
}
for _, node := range graph.notDependedOnNodes {
doneChan := parallel.Enqueue(ctx, func() error {
traverseNodeInwards(node, nodeDetails, false)
return nil
})
doneChans = append(doneChans, doneChan)
}
// We don't care about the return values; these functions always return nil.
// But we do need all of the parallel jobs to terminate.
for _, doneChan := range doneChans {
<-doneChan
}
return nodeDetails.ctrErrors, nil
}
// Remove all containers in the given graph
// Pod is optional, and must be locked if given.
func removeContainerGraph(ctx context.Context, graph *ContainerGraph, pod *Pod, timeout *uint, force bool) (map[string]*ContainerNamedVolume, map[string]bool, map[string]error, error) {
// Are there actually any containers in the graph?
// If not, return immediately.
if len(graph.nodes) == 0 {
return nil, nil, nil, nil
}
nodeDetails := new(nodeTraversal)
nodeDetails.pod = pod
nodeDetails.ctrErrors = make(map[string]error)
nodeDetails.ctrsVisited = make(map[string]bool)
ctrNamedVolumes := make(map[string]*ContainerNamedVolume)
traversalFunc := func(ctr *Container, pod *Pod) error {
ctr.lock.Lock()
defer ctr.lock.Unlock()
if err := ctr.syncContainer(); err != nil {
return err
}
for _, vol := range ctr.config.NamedVolumes {
ctrNamedVolumes[vol.Name] = vol
}
if pod != nil && pod.state.InfraContainerID == node.id {
if pod != nil && pod.state.InfraContainerID == ctr.ID() {
pod.state.InfraContainerID = ""
if err := pod.save(); err != nil {
ctrErrored = true
ctrErrors[node.id] = fmt.Errorf("error removing infra container %s from pod %s: %w", node.id, pod.ID(), err)
return fmt.Errorf("error removing infra container %s from pod %s: %w", ctr.ID(), pod.ID(), err)
}
}
}
if !ctrErrored {
opts := ctrRmOpts{
Force: force,
RemovePod: true,
Timeout: timeout,
}
if _, _, err := node.container.runtime.removeContainer(ctx, node.container, opts); err != nil {
ctrErrored = true
ctrErrors[node.id] = err
if _, _, err := ctr.runtime.removeContainer(ctx, ctr, opts); err != nil {
return err
}
return nil
}
nodeDetails.actionFunc = traversalFunc
doneChans := make([]<-chan error, 0, len(graph.notDependedOnNodes))
// Parallel enqueue jobs for all our starting nodes.
if len(graph.notDependedOnNodes) == 0 {
return nil, nil, nil, fmt.Errorf("no containers in the graph are not dependencies of other containers, unable to remove")
}
for _, node := range graph.notDependedOnNodes {
doneChan := parallel.Enqueue(ctx, func() error {
traverseNodeInwards(node, nodeDetails, false)
return nil
})
doneChans = append(doneChans, doneChan)
}
node.container.lock.Unlock()
// Recurse to anyone who we depend on and remove them
for _, successor := range node.dependsOn {
removeNode(ctx, successor, pod, force, timeout, ctrErrored, ctrErrors, ctrsVisited, ctrNamedVolumes)
// We don't care about the return values; these functions always return nil.
// But we do need all of the parallel jobs to terminate.
for _, doneChan := range doneChans {
<-doneChan
}
return ctrNamedVolumes, nodeDetails.ctrsVisited, nodeDetails.ctrErrors, nil
}
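
For readers who find the interleaved diff above hard to follow, the following stripped-down sketch shows the core pattern that traverseNodeInwards, stopContainerGraph, and removeContainerGraph rely on: fan out one worker per node that nothing depends on, and use a mutex-protected visited map plus per-node locks so that a shared dependency (such as the infra container) is acted on exactly once, and only after all of its dependents have finished. This is an illustration under stated assumptions, not Podman code; plain goroutines and a WaitGroup stand in for the parallel package, and every identifier here is hypothetical.

package main

import (
	"fmt"
	"sync"
)

type node struct {
	sync.Mutex         // only one traversal may work on a node at a time
	id         string
	dependsOn  []*node // nodes this node depends on (traversal recurses into these)
	dependedOn []*node // nodes that depend on this node (must finish first)
}

type traversal struct {
	lock    sync.Mutex // protects visited, like nodeTraversal.lock above
	visited map[string]bool
	action  func(id string)
}

// visit mirrors the shape of traverseNodeInwards: skip nodes already handled,
// refuse to act until every dependent has finished, then act, mark visited,
// and recurse inwards into this node's own dependencies.
func visit(n *node, t *traversal) {
	n.Lock()

	t.lock.Lock()
	if t.visited[n.id] {
		t.lock.Unlock()
		n.Unlock()
		return
	}
	for _, dep := range n.dependedOn {
		if !t.visited[dep.id] {
			// A dependent is not done yet; its own traversal will come back
			// through this node once it has finished.
			t.lock.Unlock()
			n.Unlock()
			return
		}
	}
	t.lock.Unlock()

	t.action(n.id)

	// Mark as visited only after the action has finished, so a dependency is
	// never acted on while one of its dependents is still being stopped.
	t.lock.Lock()
	t.visited[n.id] = true
	t.lock.Unlock()
	n.Unlock()

	// Recurse inwards to the nodes we depend on.
	for _, dep := range n.dependsOn {
		visit(dep, t)
	}
}

func main() {
	infra := &node{id: "infra"}
	app1 := &node{id: "app1", dependsOn: []*node{infra}}
	app2 := &node{id: "app2", dependsOn: []*node{infra}}
	infra.dependedOn = []*node{app1, app2}

	t := &traversal{
		visited: map[string]bool{},
		action:  func(id string) { fmt.Println("stopping", id) },
	}

	// Fan out from the nodes nothing depends on, then wait for every worker,
	// the way the doneChans are drained above.
	var wg sync.WaitGroup
	for _, start := range []*node{app1, app2} {
		wg.Add(1)
		go func(n *node) {
			defer wg.Done()
			visit(n, t)
		}(start)
	}
	wg.Wait()
	// "infra" is printed exactly once, and always after both app containers,
	// no matter how the two goroutines interleave.
}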


@ -1260,6 +1260,40 @@ func (c *Container) initAndStart(ctx context.Context) (retErr error) {
return c.waitForHealthy(ctx)
}
// Internal function to start a container without taking the pod lock.
// Please note that this DOES take the container lock.
// Intended to be used in pod-related functions.
func (c *Container) startNoPodLock(ctx context.Context, recursive bool) (finalErr error) {
if !c.batched {
c.lock.Lock()
defer c.lock.Unlock()
// defers are executed LIFO, so we are still locked here
// as long as this defer is registered after the deferred unlock() above
defer func() {
if finalErr != nil {
if err := saveContainerError(c, finalErr); err != nil {
logrus.Debug(err)
}
}
}()
if err := c.syncContainer(); err != nil {
return err
}
}
if err := c.prepareToStart(ctx, recursive); err != nil {
return err
}
// Start the container
if err := c.start(); err != nil {
return err
}
return c.waitForHealthy(ctx)
}
// Internal, non-locking function to start a container
func (c *Container) start() error {
if c.config.Spec.Process != nil {
@ -2080,6 +2114,62 @@ func (c *Container) cleanupStorage() error {
return cleanupErr
}
// fullCleanup performs all cleanup tasks, including handling restart policy.
func (c *Container) fullCleanup(ctx context.Context, onlyStopped bool) error {
// Check if state is good
if !c.ensureState(define.ContainerStateConfigured, define.ContainerStateCreated, define.ContainerStateStopped, define.ContainerStateStopping, define.ContainerStateExited) {
return fmt.Errorf("container %s is running or paused, refusing to clean up: %w", c.ID(), define.ErrCtrStateInvalid)
}
if onlyStopped && !c.ensureState(define.ContainerStateStopped) {
return fmt.Errorf("container %s is not stopped and only cleanup for a stopped container was requested: %w", c.ID(), define.ErrCtrStateInvalid)
}
// if the container was not created in the oci runtime or was already cleaned up, then do nothing
if c.ensureState(define.ContainerStateConfigured, define.ContainerStateExited) {
return nil
}
// Handle restart policy.
// Returns a bool indicating whether we actually restarted.
// If we did, don't proceed to cleanup - just exit.
didRestart, err := c.handleRestartPolicy(ctx)
if err != nil {
return err
}
if didRestart {
return nil
}
// If we didn't restart, we perform a normal cleanup
// make sure all the container processes are terminated if we are running without a pid namespace.
hasPidNs := false
if c.config.Spec.Linux != nil {
for _, i := range c.config.Spec.Linux.Namespaces {
if i.Type == spec.PIDNamespace {
hasPidNs = true
break
}
}
}
if !hasPidNs {
// do not fail on errors
_ = c.ociRuntime.KillContainer(c, uint(unix.SIGKILL), true)
}
// Check for running exec sessions
sessions, err := c.getActiveExecSessions()
if err != nil {
return err
}
if len(sessions) > 0 {
return fmt.Errorf("container %s has active exec sessions, refusing to clean up: %w", c.ID(), define.ErrCtrStateInvalid)
}
defer c.newContainerEvent(events.Cleanup)
return c.cleanup(ctx)
}
// Unmount the container and free its resources
func (c *Container) cleanup(ctx context.Context) error {
var lastError error


@ -24,7 +24,7 @@ func (p *Pod) startInitContainers(ctx context.Context) error {
}
// Now iterate init containers
for _, initCon := range initCtrs {
if err := initCon.Start(ctx, true); err != nil {
if err := initCon.startNoPodLock(ctx, true); err != nil {
return err
}
// Check that the init container waited correctly and the exit
@ -156,50 +156,43 @@ func (p *Pod) stopWithTimeout(ctx context.Context, cleanup bool, timeout int) (m
return nil, err
}
// Stopping pods is not ordered by dependency. We haven't seen any case
// where this would actually matter.
ctrErrChan := make(map[string]<-chan error)
// Enqueue a function for each container with the parallel executor.
for _, ctr := range allCtrs {
c := ctr
logrus.Debugf("Adding parallel job to stop container %s", c.ID())
retChan := parallel.Enqueue(ctx, func() error {
// Can't batch these without forcing Stop() to hold the
// lock for the full duration of the timeout.
// We probably don't want to do that.
var err error
if timeout > -1 {
err = c.StopWithTimeout(uint(timeout))
} else {
err = c.Stop()
}
if err != nil && !errors.Is(err, define.ErrCtrStateInvalid) && !errors.Is(err, define.ErrCtrStopped) {
return err
}
if cleanup {
err := c.Cleanup(ctx, false)
if err != nil && !errors.Is(err, define.ErrCtrStateInvalid) && !errors.Is(err, define.ErrCtrStopped) {
return err
}
}
return nil
})
ctrErrChan[c.ID()] = retChan
}
p.newPodEvent(events.Stop)
ctrErrors := make(map[string]error)
var ctrErrors map[string]error
// Get returned error for every container we worked on
for id, channel := range ctrErrChan {
if err := <-channel; err != nil {
ctrErrors[id] = err
// Try to generate a graph of the pod for an ordered stop.
graph, err := BuildContainerGraph(allCtrs)
if err != nil {
// Can't do an ordered stop, do it the old-fashioned way.
logrus.Warnf("Unable to build graph for pod %s, switching to unordered stop: %v", p.ID(), err)
ctrErrors = make(map[string]error)
for _, ctr := range allCtrs {
var err error
if timeout > -1 {
err = ctr.StopWithTimeout(uint(timeout))
} else {
err = ctr.Stop()
}
if err != nil && !errors.Is(err, define.ErrCtrStateInvalid) && !errors.Is(err, define.ErrCtrStopped) {
ctrErrors[ctr.ID()] = err
} else if cleanup {
err := ctr.Cleanup(ctx, false)
if err != nil && !errors.Is(err, define.ErrCtrStateInvalid) && !errors.Is(err, define.ErrCtrStopped) {
ctrErrors[ctr.ID()] = err
}
}
}
} else {
var realTimeout *uint
if timeout > -1 {
innerTimeout := uint(timeout)
realTimeout = &innerTimeout
}
ctrErrors, err = stopContainerGraph(ctx, graph, p, realTimeout, cleanup)
if err != nil {
return nil, err
}
}


@ -215,8 +215,10 @@ func (r *Runtime) removePod(ctx context.Context, p *Pod, removeCtrs, force bool,
return nil, fmt.Errorf("pod %s contains containers and cannot be removed: %w", p.ID(), define.ErrCtrExists)
}
var removalErr error
ctrNamedVolumes := make(map[string]*ContainerNamedVolume)
var (
removalErr error
ctrNamedVolumes map[string]*ContainerNamedVolume
)
// Build a graph of all containers in the pod.
graph, err := BuildContainerGraph(ctrs)
@ -235,11 +237,14 @@ func (r *Runtime) removePod(ctx context.Context, p *Pod, removeCtrs, force bool,
return removedCtrs, err
}
} else {
ctrErrors := make(map[string]error)
ctrsVisited := make(map[string]bool)
var (
ctrErrors map[string]error
ctrsVisited map[string]bool
)
for _, node := range graph.notDependedOnNodes {
removeNode(ctx, node, p, force, timeout, false, ctrErrors, ctrsVisited, ctrNamedVolumes)
ctrNamedVolumes, ctrsVisited, ctrErrors, err = removeContainerGraph(ctx, graph, p, timeout, force)
if err != nil {
return nil, err
}
// Finalize the removed containers list


@ -4,6 +4,7 @@ package integration
import (
"path/filepath"
"time"
. "github.com/containers/podman/v5/test/utils"
. "github.com/onsi/ginkgo/v2"
@ -231,4 +232,27 @@ var _ = Describe("Podman pod stop", func() {
Expect(session).Should(ExitCleanly())
Expect(podmanTest.NumberOfContainersRunning()).To(Equal(0))
})
It("podman pod stop orders container stop", func() {
podName := "testpod"
infraName := "testpod-infra"
podmanTest.PodmanExitCleanly("pod", "create", "--infra-name", infraName, podName)
ctrName := "testctr"
podmanTest.PodmanExitCleanly("run", "-d", "--name", ctrName, "--pod", podName, ALPINE, "top")
podmanTest.PodmanExitCleanly("pod", "stop", podName)
ctrStop := podmanTest.PodmanExitCleanly("inspect", "--format", "{{ .State.FinishedAt }}", ctrName)
infraStop := podmanTest.PodmanExitCleanly("inspect", "--format", "{{ .State.FinishedAt }}", infraName)
timeFormat := "2006-01-02 15:04:05.999999999 -0700 MST"
ctrStopTime, err := time.Parse(timeFormat, ctrStop.OutputToString())
Expect(err).ShouldNot(HaveOccurred())
infraStopTime, err := time.Parse(timeFormat, infraStop.OutputToString())
Expect(err).ShouldNot(HaveOccurred())
Expect(infraStopTime).To(BeTemporally(">", ctrStopTime))
})
})