podman/vendor/github.com/vbauerster/mpb/v8/progress.go

452 lines
10 KiB
Go

package mpb
import (
"bytes"
"context"
"fmt"
"io"
"math"
"os"
"sync"
"time"
"github.com/vbauerster/mpb/v8/cwriter"
"github.com/vbauerster/mpb/v8/decor"
)
const (
defaultRefreshRate = 150 * time.Millisecond
)
// DoneError represents an error when `*mpb.Progress` is done but its functionality is requested.
var DoneError = fmt.Errorf("%T instance can't be reused after it's done", (*Progress)(nil))
// Progress represents a container that renders one or more progress bars.
type Progress struct {
uwg *sync.WaitGroup
pwg, bwg sync.WaitGroup
operateState chan func(*pState)
interceptIO chan func(io.Writer)
done <-chan struct{}
cancel func()
}
// pState holds bars in its priorityQueue, it gets passed to (*Progress).serve monitor goroutine.
type pState struct {
ctx context.Context
hm heapManager
dropS, dropD chan struct{}
renderReq chan time.Time
idCount int
popPriority int
// following are provided/overrided by user
refreshRate time.Duration
reqWidth int
popCompleted bool
autoRefresh bool
delayRC <-chan struct{}
manualRC <-chan interface{}
shutdownNotifier chan<- interface{}
queueBars map[*Bar]*Bar
output io.Writer
debugOut io.Writer
uwg *sync.WaitGroup
}
// New creates new Progress container instance. It's not possible to
// reuse instance after (*Progress).Wait method has been called.
func New(options ...ContainerOption) *Progress {
return NewWithContext(context.Background(), options...)
}
// NewWithContext creates new Progress container instance with provided
// context. It's not possible to reuse instance after (*Progress).Wait
// method has been called.
func NewWithContext(ctx context.Context, options ...ContainerOption) *Progress {
if ctx == nil {
ctx = context.Background()
}
ctx, cancel := context.WithCancel(ctx)
s := &pState{
ctx: ctx,
hm: make(heapManager),
dropS: make(chan struct{}),
dropD: make(chan struct{}),
renderReq: make(chan time.Time),
refreshRate: defaultRefreshRate,
popPriority: math.MinInt32,
queueBars: make(map[*Bar]*Bar),
output: os.Stdout,
debugOut: io.Discard,
}
for _, opt := range options {
if opt != nil {
opt(s)
}
}
p := &Progress{
uwg: s.uwg,
operateState: make(chan func(*pState)),
interceptIO: make(chan func(io.Writer)),
cancel: cancel,
}
cw := cwriter.New(s.output)
if s.manualRC != nil {
done := make(chan struct{})
p.done = done
s.autoRefresh = false
go s.manualRefreshListener(done)
} else if cw.IsTerminal() || s.autoRefresh {
done := make(chan struct{})
p.done = done
s.autoRefresh = true
go s.autoRefreshListener(done)
} else {
p.done = ctx.Done()
s.autoRefresh = false
}
p.pwg.Add(1)
go p.serve(s, cw)
go s.hm.run()
return p
}
// AddBar creates a bar with default bar filler.
func (p *Progress) AddBar(total int64, options ...BarOption) *Bar {
return p.New(total, BarStyle(), options...)
}
// AddSpinner creates a bar with default spinner filler.
func (p *Progress) AddSpinner(total int64, options ...BarOption) *Bar {
return p.New(total, SpinnerStyle(), options...)
}
// New creates a bar by calling `Build` method on provided `BarFillerBuilder`.
func (p *Progress) New(total int64, builder BarFillerBuilder, options ...BarOption) *Bar {
return p.MustAdd(total, builder.Build(), options...)
}
// MustAdd creates a bar which renders itself by provided BarFiller.
// If `total <= 0` triggering complete event by increment methods is
// disabled. Panics if *Progress instance is done, i.e. called after
// (*Progress).Wait().
func (p *Progress) MustAdd(total int64, filler BarFiller, options ...BarOption) *Bar {
bar, err := p.Add(total, filler, options...)
if err != nil {
panic(err)
}
return bar
}
// Add creates a bar which renders itself by provided BarFiller.
// If `total <= 0` triggering complete event by increment methods
// is disabled. If *Progress instance is done, i.e. called after
// (*Progress).Wait(), return error == DoneError.
func (p *Progress) Add(total int64, filler BarFiller, options ...BarOption) (*Bar, error) {
if filler == nil {
filler = NopStyle().Build()
}
type result struct {
bar *Bar
bs *bState
}
ch := make(chan result)
select {
case p.operateState <- func(ps *pState) {
bs := ps.makeBarState(total, filler, options...)
bar := newBar(ps.ctx, p, bs)
if bs.waitBar != nil {
ps.queueBars[bs.waitBar] = bar
} else {
ps.hm.push(bar, true)
}
ps.idCount++
ch <- result{bar, bs}
}:
res := <-ch
bar, bs := res.bar, res.bs
bar.TraverseDecorators(func(d decor.Decorator) {
if d, ok := d.(decor.AverageDecorator); ok {
bs.averageDecorators = append(bs.averageDecorators, d)
}
if d, ok := d.(decor.EwmaDecorator); ok {
bs.ewmaDecorators = append(bs.ewmaDecorators, d)
}
if d, ok := d.(decor.ShutdownListener); ok {
bs.shutdownListeners = append(bs.shutdownListeners, d)
}
})
return bar, nil
case <-p.done:
return nil, DoneError
}
}
func (p *Progress) traverseBars(cb func(b *Bar) bool) {
iter, drop := make(chan *Bar), make(chan struct{})
select {
case p.operateState <- func(s *pState) { s.hm.iter(iter, drop) }:
for b := range iter {
if cb(b) {
close(drop)
break
}
}
case <-p.done:
}
}
// UpdateBarPriority same as *Bar.SetPriority(int).
func (p *Progress) UpdateBarPriority(b *Bar, priority int) {
select {
case p.operateState <- func(s *pState) { s.hm.fix(b, priority) }:
case <-p.done:
}
}
// Write is implementation of io.Writer.
// Writing to `*mpb.Progress` will print lines above a running bar.
// Writes aren't flushed immediately, but at next refresh cycle.
// If Write is called after `*mpb.Progress` is done, `mpb.DoneError`
// is returned.
func (p *Progress) Write(b []byte) (int, error) {
type result struct {
n int
err error
}
ch := make(chan result)
select {
case p.interceptIO <- func(w io.Writer) {
n, err := w.Write(b)
ch <- result{n, err}
}:
res := <-ch
return res.n, res.err
case <-p.done:
return 0, DoneError
}
}
// Wait waits for all bars to complete and finally shutdowns container. After
// this method has been called, there is no way to reuse (*Progress) instance.
func (p *Progress) Wait() {
// wait for user wg, if any
if p.uwg != nil {
p.uwg.Wait()
}
p.bwg.Wait()
p.Shutdown()
}
// Shutdown cancels any running bar immediately and then shutdowns (*Progress)
// instance. Normally this method shouldn't be called unless you know what you
// are doing. Proper way to shutdown is to call (*Progress).Wait() instead.
func (p *Progress) Shutdown() {
p.cancel()
p.pwg.Wait()
}
func (p *Progress) serve(s *pState, cw *cwriter.Writer) {
defer p.pwg.Done()
render := func() error { return s.render(cw) }
var err error
for {
select {
case op := <-p.operateState:
op(s)
case fn := <-p.interceptIO:
fn(cw)
case <-s.renderReq:
e := render()
if e != nil {
p.cancel() // cancel all bars
render = func() error { return nil }
err = e
}
case <-p.done:
update := make(chan bool)
for s.autoRefresh && err == nil {
s.hm.state(update)
if <-update {
err = render()
} else {
break
}
}
if err != nil {
_, _ = fmt.Fprintln(s.debugOut, err.Error())
}
s.hm.end(s.shutdownNotifier)
return
}
}
}
func (s pState) autoRefreshListener(done chan struct{}) {
if s.delayRC != nil {
<-s.delayRC
}
ticker := time.NewTicker(s.refreshRate)
defer ticker.Stop()
for {
select {
case t := <-ticker.C:
s.renderReq <- t
case <-s.ctx.Done():
close(done)
return
}
}
}
func (s pState) manualRefreshListener(done chan struct{}) {
for {
select {
case x := <-s.manualRC:
if t, ok := x.(time.Time); ok {
s.renderReq <- t
} else {
s.renderReq <- time.Now()
}
case <-s.ctx.Done():
close(done)
return
}
}
}
func (s *pState) render(cw *cwriter.Writer) (err error) {
s.hm.sync(s.dropS)
iter := make(chan *Bar)
go s.hm.iter(iter, s.dropS)
var width, height int
if cw.IsTerminal() {
width, height, err = cw.GetTermSize()
if err != nil {
close(s.dropS)
return err
}
} else {
if s.reqWidth > 0 {
width = s.reqWidth
} else {
width = 100
}
height = 100
}
for b := range iter {
go b.render(width)
}
return s.flush(cw, height)
}
func (s *pState) flush(cw *cwriter.Writer, height int) error {
var wg sync.WaitGroup
defer wg.Wait() // waiting for all s.hm.push to complete
var popCount int
var rows []io.Reader
iter := make(chan *Bar)
s.hm.drain(iter, s.dropD)
for b := range iter {
frame := <-b.frameCh
if frame.err != nil {
close(s.dropD)
b.cancel()
return frame.err // b.frameCh is buffered it's ok to return here
}
var usedRows int
for i := len(frame.rows) - 1; i >= 0; i-- {
if row := frame.rows[i]; len(rows) < height {
rows = append(rows, row)
usedRows++
} else {
_, _ = io.Copy(io.Discard, row)
}
}
if frame.shutdown != 0 && !frame.done {
if qb, ok := s.queueBars[b]; ok {
b.cancel()
delete(s.queueBars, b)
qb.priority = b.priority
wg.Add(1)
go func(b *Bar) {
s.hm.push(b, true)
wg.Done()
}(qb)
continue
}
if s.popCompleted && !frame.noPop {
switch frame.shutdown {
case 1:
b.priority = s.popPriority
s.popPriority++
default:
b.cancel()
popCount += usedRows
continue
}
} else if frame.rmOnComplete {
b.cancel()
continue
} else {
b.cancel()
}
}
wg.Add(1)
go func(b *Bar) {
s.hm.push(b, false)
wg.Done()
}(b)
}
for i := len(rows) - 1; i >= 0; i-- {
_, err := cw.ReadFrom(rows[i])
if err != nil {
return err
}
}
return cw.Flush(len(rows) - popCount)
}
func (s pState) makeBarState(total int64, filler BarFiller, options ...BarOption) *bState {
bs := &bState{
id: s.idCount,
priority: s.idCount,
reqWidth: s.reqWidth,
total: total,
filler: filler,
renderReq: s.renderReq,
autoRefresh: s.autoRefresh,
}
if total > 0 {
bs.triggerComplete = true
}
for _, opt := range options {
if opt != nil {
opt(bs)
}
}
for i := 0; i < len(bs.buffers); i++ {
bs.buffers[i] = bytes.NewBuffer(make([]byte, 0, 512))
}
return bs
}