Merge pull request #1614 from baude/parastop

Stop containers in parallel fashion
This commit is contained in:
OpenShift Merge Robot 2018-10-11 10:55:18 -07:00 committed by GitHub
commit 83327e6973
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 69 additions and 7 deletions

View File

@ -3,6 +3,7 @@ package main
import (
"fmt"
"os"
rt "runtime"
"github.com/containers/libpod/cmd/podman/libpodruntime"
"github.com/containers/libpod/libpod"
@ -98,21 +99,33 @@ func stopCmd(c *cli.Context) error {
}
}
var stopFuncs []workerInput
for _, ctr := range containers {
con := ctr
var stopTimeout uint
if c.IsSet("timeout") {
stopTimeout = c.Uint("timeout")
} else {
stopTimeout = ctr.StopTimeout()
}
if err := ctr.StopWithTimeout(stopTimeout); err != nil && err != libpod.ErrCtrStopped {
if lastError != nil {
fmt.Fprintln(os.Stderr, lastError)
}
lastError = errors.Wrapf(err, "failed to stop container %v", ctr.ID())
} else {
fmt.Println(ctr.ID())
f := func() error {
return con.StopWithTimeout(stopTimeout)
}
stopFuncs = append(stopFuncs, workerInput{
containerID: con.ID(),
parallelFunc: f,
})
}
stopErrors := parallelExecuteWorkerPool(rt.NumCPU()*3, stopFuncs)
for cid, result := range stopErrors {
if result != nil && result != libpod.ErrCtrStopped {
fmt.Println(result.Error())
lastError = result
continue
}
fmt.Println(cid)
}
return lastError
}

View File

@ -5,6 +5,7 @@ import (
"fmt"
"os"
gosignal "os/signal"
"sync"
"github.com/containers/libpod/libpod"
"github.com/docker/docker/pkg/signal"
@ -215,3 +216,50 @@ func getPodsFromContext(c *cli.Context, r *libpod.Runtime) ([]*libpod.Pod, error
}
return pods, lastError
}
type pFunc func() error
type workerInput struct {
containerID string
parallelFunc pFunc
}
// worker is a "threaded" worker that takes jobs from the channel "queue"
func worker(wg *sync.WaitGroup, jobs <-chan workerInput, results map[string]error) {
for j := range jobs {
err := j.parallelFunc()
results[j.containerID] = err
wg.Done()
}
}
// parallelExecuteWorkerPool takes container jobs and performs them in parallel. The worker
// int is determines how many workers/threads should be premade.
func parallelExecuteWorkerPool(workers int, functions []workerInput) map[string]error {
var (
wg sync.WaitGroup
)
results := make(map[string]error)
paraJobs := make(chan workerInput, len(functions))
// If we have more workers than functions, match up the number of workers and functions
if workers > len(functions) {
workers = len(functions)
}
// Create the workers
for w := 1; w <= workers; w++ {
go worker(&wg, paraJobs, results)
}
// Add jobs to the workers
for _, j := range functions {
j := j
wg.Add(1)
paraJobs <- j
}
close(paraJobs)
wg.Wait()
return results
}

View File

@ -75,6 +75,7 @@ golang.org/x/net c427ad74c6d7a814201695e9ffde0c5d400a7674
golang.org/x/sys master
golang.org/x/text f72d8390a633d5dfb0cc84043294db9f6c935756
golang.org/x/time f51c12702a4d776e4c1fa9b0fabab841babae631
golang.org/x/sync master
google.golang.org/grpc v1.0.4 https://github.com/grpc/grpc-go
gopkg.in/cheggaaa/pb.v1 v1.0.7
gopkg.in/inf.v0 v0.9.0