mirror of https://github.com/containers/podman.git
				
				
				
			Merge pull request #1709 from baude/parallelheur
Add --max-workers and heuristics for parallel operations
This commit is contained in:
		
						commit
						f6e7807fa5
					
				| 
						 | 
				
			
			@ -211,6 +211,11 @@ func main() {
 | 
			
		|||
			Value:  hooks.DefaultDir,
 | 
			
		||||
			Hidden: true,
 | 
			
		||||
		},
 | 
			
		||||
		cli.IntFlag{
 | 
			
		||||
			Name:   "max-workers",
 | 
			
		||||
			Usage:  "the maximum number of workers for parallel operations",
 | 
			
		||||
			Hidden: true,
 | 
			
		||||
		},
 | 
			
		||||
		cli.StringFlag{
 | 
			
		||||
			Name:  "log-level",
 | 
			
		||||
			Usage: "log messages above specified level: debug, info, warn, error (default), fatal or panic",
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -20,6 +20,7 @@ import (
 | 
			
		|||
	"github.com/cri-o/ocicni/pkg/ocicni"
 | 
			
		||||
	"github.com/docker/go-units"
 | 
			
		||||
	"github.com/pkg/errors"
 | 
			
		||||
	"github.com/sirupsen/logrus"
 | 
			
		||||
	"github.com/urfave/cli"
 | 
			
		||||
	"k8s.io/apimachinery/pkg/fields"
 | 
			
		||||
)
 | 
			
		||||
| 
						 | 
				
			
			@ -300,7 +301,13 @@ func psCmd(c *cli.Context) error {
 | 
			
		|||
		outputContainers = []*libpod.Container{latestCtr}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	pss := shared.PBatch(outputContainers, 8, opts)
 | 
			
		||||
	maxWorkers := shared.Parallelize("ps")
 | 
			
		||||
	if c.GlobalIsSet("max-workers") {
 | 
			
		||||
		maxWorkers = c.GlobalInt("max-workers")
 | 
			
		||||
	}
 | 
			
		||||
	logrus.Debugf("Setting maximum workers to %d", maxWorkers)
 | 
			
		||||
 | 
			
		||||
	pss := shared.PBatch(outputContainers, maxWorkers, opts)
 | 
			
		||||
	if opts.Sort != "" {
 | 
			
		||||
		pss, err = sortPsOutput(opts.Sort, pss)
 | 
			
		||||
		if err != nil {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -2,11 +2,11 @@ package main
 | 
			
		|||
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	rt "runtime"
 | 
			
		||||
 | 
			
		||||
	"github.com/containers/libpod/cmd/podman/libpodruntime"
 | 
			
		||||
	"github.com/containers/libpod/cmd/podman/shared"
 | 
			
		||||
	"github.com/containers/libpod/libpod"
 | 
			
		||||
	"github.com/pkg/errors"
 | 
			
		||||
	"github.com/sirupsen/logrus"
 | 
			
		||||
	"github.com/urfave/cli"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -48,14 +48,13 @@ func rmCmd(c *cli.Context) error {
 | 
			
		|||
	var (
 | 
			
		||||
		delContainers []*libpod.Container
 | 
			
		||||
		lastError     error
 | 
			
		||||
		deleteFuncs   []workerInput
 | 
			
		||||
		deleteFuncs   []shared.ParallelWorkerInput
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	ctx := getContext()
 | 
			
		||||
	if err := validateFlags(c, rmFlags); err != nil {
 | 
			
		||||
		return err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	runtime, err := libpodruntime.GetRuntime(c)
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return errors.Wrapf(err, "could not get runtime")
 | 
			
		||||
| 
						 | 
				
			
			@ -69,17 +68,23 @@ func rmCmd(c *cli.Context) error {
 | 
			
		|||
	delContainers, lastError = getAllOrLatestContainers(c, runtime, -1, "all")
 | 
			
		||||
 | 
			
		||||
	for _, container := range delContainers {
 | 
			
		||||
		con := container
 | 
			
		||||
		f := func() error {
 | 
			
		||||
			return runtime.RemoveContainer(ctx, container, c.Bool("force"))
 | 
			
		||||
			return runtime.RemoveContainer(ctx, con, c.Bool("force"))
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		deleteFuncs = append(deleteFuncs, workerInput{
 | 
			
		||||
			containerID:  container.ID(),
 | 
			
		||||
			parallelFunc: f,
 | 
			
		||||
		deleteFuncs = append(deleteFuncs, shared.ParallelWorkerInput{
 | 
			
		||||
			ContainerID:  con.ID(),
 | 
			
		||||
			ParallelFunc: f,
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
	maxWorkers := shared.Parallelize("rm")
 | 
			
		||||
	if c.GlobalIsSet("max-workers") {
 | 
			
		||||
		maxWorkers = c.GlobalInt("max-workers")
 | 
			
		||||
	}
 | 
			
		||||
	logrus.Debugf("Setting maximum workers to %d", maxWorkers)
 | 
			
		||||
 | 
			
		||||
	deleteErrors := parallelExecuteWorkerPool(rt.NumCPU()*3, deleteFuncs)
 | 
			
		||||
	deleteErrors := shared.ParallelExecuteWorkerPool(maxWorkers, deleteFuncs)
 | 
			
		||||
	for cid, result := range deleteErrors {
 | 
			
		||||
		if result != nil {
 | 
			
		||||
			fmt.Println(result.Error())
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -226,10 +226,10 @@ func NewBatchContainer(ctr *libpod.Container, opts PsOptions) (PsContainerOutput
 | 
			
		|||
	return pso, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type pFunc func() (PsContainerOutput, error)
 | 
			
		||||
type batchFunc func() (PsContainerOutput, error)
 | 
			
		||||
 | 
			
		||||
type workerInput struct {
 | 
			
		||||
	parallelFunc pFunc
 | 
			
		||||
	parallelFunc batchFunc
 | 
			
		||||
	opts         PsOptions
 | 
			
		||||
	cid          string
 | 
			
		||||
	job          int
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -0,0 +1,91 @@
 | 
			
		|||
package shared
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"runtime"
 | 
			
		||||
	"sync"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type pFunc func() error
 | 
			
		||||
 | 
			
		||||
// ParallelWorkerInput is a struct used to pass in a slice of parallel funcs to be
 | 
			
		||||
// performed on a container ID
 | 
			
		||||
type ParallelWorkerInput struct {
 | 
			
		||||
	ContainerID  string
 | 
			
		||||
	ParallelFunc pFunc
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type containerError struct {
 | 
			
		||||
	ContainerID string
 | 
			
		||||
	Err         error
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ParallelWorker is a "threaded" worker that takes jobs from the channel "queue"
 | 
			
		||||
func ParallelWorker(wg *sync.WaitGroup, jobs <-chan ParallelWorkerInput, results chan<- containerError) {
 | 
			
		||||
	for j := range jobs {
 | 
			
		||||
		err := j.ParallelFunc()
 | 
			
		||||
		results <- containerError{ContainerID: j.ContainerID, Err: err}
 | 
			
		||||
		wg.Done()
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ParallelExecuteWorkerPool takes container jobs and performs them in parallel.  The worker
 | 
			
		||||
// int determines how many workers/threads should be premade.
 | 
			
		||||
func ParallelExecuteWorkerPool(workers int, functions []ParallelWorkerInput) map[string]error {
 | 
			
		||||
	var (
 | 
			
		||||
		wg sync.WaitGroup
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	resultChan := make(chan containerError, len(functions))
 | 
			
		||||
	results := make(map[string]error)
 | 
			
		||||
	paraJobs := make(chan ParallelWorkerInput, len(functions))
 | 
			
		||||
 | 
			
		||||
	// If we have more workers than functions, match up the number of workers and functions
 | 
			
		||||
	if workers > len(functions) {
 | 
			
		||||
		workers = len(functions)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Create the workers
 | 
			
		||||
	for w := 1; w <= workers; w++ {
 | 
			
		||||
		go ParallelWorker(&wg, paraJobs, resultChan)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Add jobs to the workers
 | 
			
		||||
	for _, j := range functions {
 | 
			
		||||
		j := j
 | 
			
		||||
		wg.Add(1)
 | 
			
		||||
		paraJobs <- j
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	close(paraJobs)
 | 
			
		||||
	wg.Wait()
 | 
			
		||||
 | 
			
		||||
	close(resultChan)
 | 
			
		||||
	for ctrError := range resultChan {
 | 
			
		||||
		results[ctrError.ContainerID] = ctrError.Err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return results
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Parallelize provides the maximum number of parallel workers (int) as calculated by a basic
 | 
			
		||||
// heuristic. This can be overriden by the --max-workers primary switch to podman.
 | 
			
		||||
func Parallelize(job string) int {
 | 
			
		||||
	numCpus := runtime.NumCPU()
 | 
			
		||||
	switch job {
 | 
			
		||||
	case "stop":
 | 
			
		||||
		if numCpus <= 2 {
 | 
			
		||||
			return 4
 | 
			
		||||
		} else {
 | 
			
		||||
			return numCpus * 3
 | 
			
		||||
		}
 | 
			
		||||
	case "rm":
 | 
			
		||||
		if numCpus <= 3 {
 | 
			
		||||
			return numCpus * 3
 | 
			
		||||
		} else {
 | 
			
		||||
			return numCpus * 4
 | 
			
		||||
		}
 | 
			
		||||
	case "ps":
 | 
			
		||||
		return 8
 | 
			
		||||
	}
 | 
			
		||||
	return 3
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			@ -2,12 +2,12 @@ package main
 | 
			
		|||
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	rt "runtime"
 | 
			
		||||
 | 
			
		||||
	"github.com/containers/libpod/cmd/podman/libpodruntime"
 | 
			
		||||
	"github.com/containers/libpod/cmd/podman/shared"
 | 
			
		||||
	"github.com/containers/libpod/libpod"
 | 
			
		||||
	"github.com/containers/libpod/pkg/rootless"
 | 
			
		||||
	"github.com/pkg/errors"
 | 
			
		||||
	"github.com/sirupsen/logrus"
 | 
			
		||||
	"github.com/urfave/cli"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -61,7 +61,7 @@ func stopCmd(c *cli.Context) error {
 | 
			
		|||
 | 
			
		||||
	containers, lastError := getAllOrLatestContainers(c, runtime, libpod.ContainerStateRunning, "running")
 | 
			
		||||
 | 
			
		||||
	var stopFuncs []workerInput
 | 
			
		||||
	var stopFuncs []shared.ParallelWorkerInput
 | 
			
		||||
	for _, ctr := range containers {
 | 
			
		||||
		con := ctr
 | 
			
		||||
		var stopTimeout uint
 | 
			
		||||
| 
						 | 
				
			
			@ -73,13 +73,19 @@ func stopCmd(c *cli.Context) error {
 | 
			
		|||
		f := func() error {
 | 
			
		||||
			return con.StopWithTimeout(stopTimeout)
 | 
			
		||||
		}
 | 
			
		||||
		stopFuncs = append(stopFuncs, workerInput{
 | 
			
		||||
			containerID:  con.ID(),
 | 
			
		||||
			parallelFunc: f,
 | 
			
		||||
		stopFuncs = append(stopFuncs, shared.ParallelWorkerInput{
 | 
			
		||||
			ContainerID:  con.ID(),
 | 
			
		||||
			ParallelFunc: f,
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	stopErrors := parallelExecuteWorkerPool(rt.NumCPU()*3, stopFuncs)
 | 
			
		||||
	maxWorkers := shared.Parallelize("stop")
 | 
			
		||||
	if c.GlobalIsSet("max-workers") {
 | 
			
		||||
		maxWorkers = c.GlobalInt("max-workers")
 | 
			
		||||
	}
 | 
			
		||||
	logrus.Debugf("Setting maximum workers to %d", maxWorkers)
 | 
			
		||||
 | 
			
		||||
	stopErrors := shared.ParallelExecuteWorkerPool(maxWorkers, stopFuncs)
 | 
			
		||||
 | 
			
		||||
	for cid, result := range stopErrors {
 | 
			
		||||
		if result != nil && result != libpod.ErrCtrStopped {
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -3,10 +3,6 @@ package main
 | 
			
		|||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"os"
 | 
			
		||||
	gosignal "os/signal"
 | 
			
		||||
	"sync"
 | 
			
		||||
 | 
			
		||||
	"github.com/containers/libpod/libpod"
 | 
			
		||||
	"github.com/docker/docker/pkg/signal"
 | 
			
		||||
	"github.com/docker/docker/pkg/term"
 | 
			
		||||
| 
						 | 
				
			
			@ -15,6 +11,8 @@ import (
 | 
			
		|||
	"github.com/urfave/cli"
 | 
			
		||||
	"golang.org/x/crypto/ssh/terminal"
 | 
			
		||||
	"k8s.io/client-go/tools/remotecommand"
 | 
			
		||||
	"os"
 | 
			
		||||
	gosignal "os/signal"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
type RawTtyFormatter struct {
 | 
			
		||||
| 
						 | 
				
			
			@ -209,63 +207,3 @@ func getPodsFromContext(c *cli.Context, r *libpod.Runtime) ([]*libpod.Pod, error
 | 
			
		|||
	}
 | 
			
		||||
	return pods, lastError
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type pFunc func() error
 | 
			
		||||
 | 
			
		||||
type workerInput struct {
 | 
			
		||||
	containerID  string
 | 
			
		||||
	parallelFunc pFunc
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type containerError struct {
 | 
			
		||||
	containerID string
 | 
			
		||||
	err         error
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// worker is a "threaded" worker that takes jobs from the channel "queue"
 | 
			
		||||
func worker(wg *sync.WaitGroup, jobs <-chan workerInput, results chan<- containerError) {
 | 
			
		||||
	for j := range jobs {
 | 
			
		||||
		err := j.parallelFunc()
 | 
			
		||||
		results <- containerError{containerID: j.containerID, err: err}
 | 
			
		||||
		wg.Done()
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// parallelExecuteWorkerPool takes container jobs and performs them in parallel.  The worker
 | 
			
		||||
// int is determines how many workers/threads should be premade.
 | 
			
		||||
func parallelExecuteWorkerPool(workers int, functions []workerInput) map[string]error {
 | 
			
		||||
	var (
 | 
			
		||||
		wg sync.WaitGroup
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	resultChan := make(chan containerError, len(functions))
 | 
			
		||||
	results := make(map[string]error)
 | 
			
		||||
	paraJobs := make(chan workerInput, len(functions))
 | 
			
		||||
 | 
			
		||||
	// If we have more workers than functions, match up the number of workers and functions
 | 
			
		||||
	if workers > len(functions) {
 | 
			
		||||
		workers = len(functions)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Create the workers
 | 
			
		||||
	for w := 1; w <= workers; w++ {
 | 
			
		||||
		go worker(&wg, paraJobs, resultChan)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// Add jobs to the workers
 | 
			
		||||
	for _, j := range functions {
 | 
			
		||||
		j := j
 | 
			
		||||
		wg.Add(1)
 | 
			
		||||
		paraJobs <- j
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	close(paraJobs)
 | 
			
		||||
	wg.Wait()
 | 
			
		||||
 | 
			
		||||
	close(resultChan)
 | 
			
		||||
	for ctrError := range resultChan {
 | 
			
		||||
		results[ctrError.containerID] = ctrError.err
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return results
 | 
			
		||||
}
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue