Stop containers in parallel fashion

Prior, we were stopping containers serially.  So if a container had a default
timeout of 10 seconds and there were five containers being stopped, the operation
would take roughly 50 seconds.  If we stop these containers in parallel, the operation
should be roughly 10 seconds and change which is a significant speed up at scale.

Signed-off-by: baude <bbaude@redhat.com>
This commit is contained in:
baude 2018-10-09 13:02:45 -05:00
parent 23c9816ba9
commit 9be18c2eaf
3 changed files with 69 additions and 7 deletions

View File

@ -3,6 +3,7 @@ package main
import ( import (
"fmt" "fmt"
"os" "os"
rt "runtime"
"github.com/containers/libpod/cmd/podman/libpodruntime" "github.com/containers/libpod/cmd/podman/libpodruntime"
"github.com/containers/libpod/libpod" "github.com/containers/libpod/libpod"
@ -98,21 +99,33 @@ func stopCmd(c *cli.Context) error {
} }
} }
var stopFuncs []workerInput
for _, ctr := range containers { for _, ctr := range containers {
con := ctr
var stopTimeout uint var stopTimeout uint
if c.IsSet("timeout") { if c.IsSet("timeout") {
stopTimeout = c.Uint("timeout") stopTimeout = c.Uint("timeout")
} else { } else {
stopTimeout = ctr.StopTimeout() stopTimeout = ctr.StopTimeout()
} }
if err := ctr.StopWithTimeout(stopTimeout); err != nil && err != libpod.ErrCtrStopped { f := func() error {
if lastError != nil { return con.StopWithTimeout(stopTimeout)
fmt.Fprintln(os.Stderr, lastError)
}
lastError = errors.Wrapf(err, "failed to stop container %v", ctr.ID())
} else {
fmt.Println(ctr.ID())
} }
stopFuncs = append(stopFuncs, workerInput{
containerID: con.ID(),
parallelFunc: f,
})
}
stopErrors := parallelExecuteWorkerPool(rt.NumCPU()*3, stopFuncs)
for cid, result := range stopErrors {
if result != nil && result != libpod.ErrCtrStopped {
fmt.Println(result.Error())
lastError = result
continue
}
fmt.Println(cid)
} }
return lastError return lastError
} }

View File

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"os" "os"
gosignal "os/signal" gosignal "os/signal"
"sync"
"github.com/containers/libpod/libpod" "github.com/containers/libpod/libpod"
"github.com/docker/docker/pkg/signal" "github.com/docker/docker/pkg/signal"
@ -215,3 +216,50 @@ func getPodsFromContext(c *cli.Context, r *libpod.Runtime) ([]*libpod.Pod, error
} }
return pods, lastError return pods, lastError
} }
type pFunc func() error
type workerInput struct {
containerID string
parallelFunc pFunc
}
// worker is a "threaded" worker that takes jobs from the channel "queue"
func worker(wg *sync.WaitGroup, jobs <-chan workerInput, results map[string]error) {
for j := range jobs {
err := j.parallelFunc()
results[j.containerID] = err
wg.Done()
}
}
// parallelExecuteWorkerPool takes container jobs and performs them in parallel. The worker
// int is determines how many workers/threads should be premade.
func parallelExecuteWorkerPool(workers int, functions []workerInput) map[string]error {
var (
wg sync.WaitGroup
)
results := make(map[string]error)
paraJobs := make(chan workerInput, len(functions))
// If we have more workers than functions, match up the number of workers and functions
if workers > len(functions) {
workers = len(functions)
}
// Create the workers
for w := 1; w <= workers; w++ {
go worker(&wg, paraJobs, results)
}
// Add jobs to the workers
for _, j := range functions {
j := j
wg.Add(1)
paraJobs <- j
}
close(paraJobs)
wg.Wait()
return results
}

View File

@ -75,6 +75,7 @@ golang.org/x/net c427ad74c6d7a814201695e9ffde0c5d400a7674
golang.org/x/sys master golang.org/x/sys master
golang.org/x/text f72d8390a633d5dfb0cc84043294db9f6c935756 golang.org/x/text f72d8390a633d5dfb0cc84043294db9f6c935756
golang.org/x/time f51c12702a4d776e4c1fa9b0fabab841babae631 golang.org/x/time f51c12702a4d776e4c1fa9b0fabab841babae631
golang.org/x/sync master
google.golang.org/grpc v1.0.4 https://github.com/grpc/grpc-go google.golang.org/grpc v1.0.4 https://github.com/grpc/grpc-go
gopkg.in/cheggaaa/pb.v1 v1.0.7 gopkg.in/cheggaaa/pb.v1 v1.0.7
gopkg.in/inf.v0 v0.9.0 gopkg.in/inf.v0 v0.9.0