Merge pull request #24962 from containers/renovate/github.com-vbauerster-mpb-v8-8.x

fix(deps): update module github.com/vbauerster/mpb/v8 to v8.9.1
commit 2043fa385f
openshift-merge-bot[bot] 2025-01-08 11:06:16 +00:00, committed by GitHub
GPG Key ID: B5690EEEBB952194 (no known key found for this signature in database)
6 changed files with 71 additions and 68 deletions

go.mod

@@ -68,7 +68,7 @@ require (
     github.com/spf13/cobra v1.8.1
     github.com/spf13/pflag v1.0.5
     github.com/stretchr/testify v1.10.0
-    github.com/vbauerster/mpb/v8 v8.8.3
+    github.com/vbauerster/mpb/v8 v8.9.1
     github.com/vishvananda/netlink v1.3.0
     go.etcd.io/bbolt v1.3.11
     golang.org/x/crypto v0.31.0

go.sum

@@ -515,8 +515,8 @@ github.com/ulikunitz/xz v0.5.12 h1:37Nm15o69RwBkXM0J6A5OlE67RZTfzUxTj8fB3dfcsc=
 github.com/ulikunitz/xz v0.5.12/go.mod h1:nbz6k7qbPmH4IRqmfOplQw/tblSgqTqBwxkY0oWt/14=
 github.com/vbatts/tar-split v0.11.6 h1:4SjTW5+PU11n6fZenf2IPoV8/tz3AaYHMWjf23envGs=
 github.com/vbatts/tar-split v0.11.6/go.mod h1:dqKNtesIOr2j2Qv3W/cHjnvk9I8+G7oAkFDFN6TCBEI=
-github.com/vbauerster/mpb/v8 v8.8.3 h1:dTOByGoqwaTJYPubhVz3lO5O6MK553XVgUo33LdnNsQ=
-github.com/vbauerster/mpb/v8 v8.8.3/go.mod h1:JfCCrtcMsJwP6ZwMn9e5LMnNyp3TVNpUWWkN+nd4EWk=
+github.com/vbauerster/mpb/v8 v8.9.1 h1:LH5R3lXPfE2e3lIGxN7WNWv3Hl5nWO6LRi2B0L0ERHw=
+github.com/vbauerster/mpb/v8 v8.9.1/go.mod h1:4XMvznPh8nfe2NpnDo1QTPvW9MVkUhbG90mPWvmOzcQ=
 github.com/vishvananda/netlink v1.3.0 h1:X7l42GfcV4S6E4vHTsw48qbrV+9PVojNfIhZcwQdrZk=
 github.com/vishvananda/netlink v1.3.0/go.mod h1:i6NetklAujEcC6fK0JPjT8qSwWyO0HLn4UKG+hGqeJs=
 github.com/vishvananda/netns v0.0.4 h1:Oeaw1EM2JMxD51g9uhtC0D7erkIjgmj8+JZc26m1YX8=

vendor/github.com/vbauerster/mpb/v8/container_option.go

@@ -30,6 +30,17 @@ func WithWidth(width int) ContainerOption {
     }
 }
 
+// WithQueueLen sets buffer size of heap manager channel. Ideally it must be
+// kept at MAX value, where MAX is number of bars to be rendered at the same
+// time. If len < MAX then backpressure to the scheduler will be increased as
+// MAX-len extra goroutines will be launched at each render cycle.
+// Default queue len is 128.
+func WithQueueLen(len int) ContainerOption {
+    return func(s *pState) {
+        s.hmQueueLen = len
+    }
+}
+
 // WithRefreshRate overrides default 150ms refresh rate.
 func WithRefreshRate(d time.Duration) ContainerOption {
     return func(s *pState) {
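
For context, a usage sketch of the new option: sizing the queue to the number of bars rendered concurrently, as the doc comment above advises. Everything below except mpb.New, mpb.WithQueueLen, AddBar, Increment, and Wait is an illustrative assumption, not code from this PR.

```go
package main

import "github.com/vbauerster/mpb/v8"

func main() {
	const numBars = 64 // assumed workload: 64 bars rendered at the same time

	// Queue length matched to bar count, per the WithQueueLen doc comment.
	p := mpb.New(mpb.WithQueueLen(numBars))

	for i := 0; i < numBars; i++ {
		bar := p.AddBar(100)
		go func() {
			for j := 0; j < 100; j++ {
				bar.Increment()
			}
		}()
	}
	p.Wait() // returns once every bar reaches its total
}
```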

vendor/github.com/vbauerster/mpb/v8/heap_manager.go

@@ -10,7 +10,6 @@ const (
     h_sync heapCmd = iota
     h_push
     h_iter
-    h_drain
     h_fix
     h_state
     h_end
@@ -22,8 +21,9 @@ type heapRequest struct {
 }
 
 type iterData struct {
-    iter chan<- *Bar
-    drop <-chan struct{}
+    drop    <-chan struct{}
+    iter    chan<- *Bar
+    iterPop chan<- *Bar
 }
 
 type pushData struct {
@@ -41,7 +41,7 @@ func (m heapManager) run() {
     var bHeap priorityQueue
     var pMatrix, aMatrix map[int][]chan int
 
-    var l int
+    var len int
     var sync bool
 
     for req := range m {
@@ -49,11 +49,9 @@
         case h_push:
             data := req.data.(pushData)
             heap.Push(&bHeap, data.bar)
-            if !sync {
-                sync = data.sync
-            }
+            sync = sync || data.sync
         case h_sync:
-            if sync || l != bHeap.Len() {
+            if sync || len != bHeap.Len() {
                 pMatrix = make(map[int][]chan int)
                 aMatrix = make(map[int][]chan int)
                 for _, b := range bHeap {
@@ -66,33 +64,37 @@
                 }
             }
             sync = false
-            l = bHeap.Len()
+            len = bHeap.Len()
         }
         drop := req.data.(<-chan struct{})
         syncWidth(pMatrix, drop)
         syncWidth(aMatrix, drop)
     case h_iter:
         data := req.data.(iterData)
-    drop_iter:
+    loop: // unordered iteration
         for _, b := range bHeap {
             select {
             case data.iter <- b:
             case <-data.drop:
-                break drop_iter
+                data.iterPop = nil
+                break loop
             }
         }
         close(data.iter)
-    case h_drain:
-        data := req.data.(iterData)
-    drop_drain:
+        if data.iterPop == nil {
+            break
+        }
+    loop_pop: // ordered iteration
         for bHeap.Len() != 0 {
+            bar := heap.Pop(&bHeap).(*Bar)
             select {
-            case data.iter <- heap.Pop(&bHeap).(*Bar):
+            case data.iterPop <- bar:
             case <-data.drop:
-                break drop_drain
+                heap.Push(&bHeap, bar)
+                break loop_pop
             }
         }
-        close(data.iter)
+        close(data.iterPop)
     case h_fix:
         data := req.data.(fixData)
         if data.bar.index < 0 {
@@ -104,7 +106,7 @@
         }
     case h_state:
         ch := req.data.(chan<- bool)
-        ch <- sync || l != bHeap.Len()
+        ch <- sync || len != bHeap.Len()
     case h_end:
         ch := req.data.(chan<- interface{})
         if ch != nil {
@@ -123,19 +125,21 @@ func (m heapManager) sync(drop <-chan struct{}) {
 func (m heapManager) push(b *Bar, sync bool) {
     data := pushData{b, sync}
-    m <- heapRequest{cmd: h_push, data: data}
+    req := heapRequest{cmd: h_push, data: data}
+    select {
+    case m <- req:
+    default:
+        go func() {
+            m <- req
+        }()
+    }
 }
 
-func (m heapManager) iter(iter chan<- *Bar, drop <-chan struct{}) {
-    data := iterData{iter, drop}
+func (m heapManager) iter(drop <-chan struct{}, iter, iterPop chan<- *Bar) {
+    data := iterData{drop, iter, iterPop}
     m <- heapRequest{cmd: h_iter, data: data}
 }
 
-func (m heapManager) drain(iter chan<- *Bar, drop <-chan struct{}) {
-    data := iterData{iter, drop}
-    m <- heapRequest{cmd: h_drain, data: data}
-}
-
 func (m heapManager) fix(b *Bar, priority int, lazy bool) {
     data := fixData{b, priority, lazy}
     m <- heapRequest{cmd: h_fix, data: data}
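
The rewritten push above is one of the two notable changes in this file: instead of a plain blocking send, it tries the (now buffered) channel and falls back to a goroutine only when the buffer is full. A self-contained sketch of that select/default pattern, with assumed names and a deliberately tiny buffer (a model of the technique, not mpb's code):

```go
package main

import (
	"fmt"
	"sync"
)

// send tries a non-blocking send first; only a full buffer costs a goroutine.
func send(ch chan<- int, v int, wg *sync.WaitGroup) {
	select {
	case ch <- v: // fast path: the buffered channel has room
	default: // buffer full: hand off to a goroutine so the caller never blocks
		wg.Add(1)
		go func() {
			defer wg.Done()
			ch <- v
		}()
	}
}

func main() {
	ch := make(chan int, 2) // deliberately small, like an undersized hmQueueLen
	var wg sync.WaitGroup
	for i := 0; i < 5; i++ {
		send(ch, i, &wg)
	}
	for i := 0; i < 5; i++ {
		fmt.Println(<-ch) // drains both buffered and goroutine-delivered values
	}
	wg.Wait()
}
```

This is the backpressure trade-off the WithQueueLen doc comment describes: every send that misses the buffer costs one extra goroutine per render cycle.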

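The other notable change: with h_drain gone, a single h_iter request serves two phases over one drop channel, an unordered streaming pass followed by an ordered drain that pops the heap and pushes an item back if the consumer cancels mid-pop. A minimal standalone model of that flow; intHeap, serve, and every other name below are assumptions standing in for mpb's priorityQueue and bar type:

```go
package main

import (
	"container/heap"
	"fmt"
)

// intHeap is the stock container/heap int example, standing in for mpb's
// priorityQueue of bars.
type intHeap []int

func (h intHeap) Len() int            { return len(h) }
func (h intHeap) Less(i, j int) bool  { return h[i] < h[j] }
func (h intHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *intHeap) Push(x interface{}) { *h = append(*h, x.(int)) }
func (h *intHeap) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

// serve answers one request: an unordered pass over the heap, then an ordered
// drain; closing drop abandons whichever phase is active.
func serve(h *intHeap, drop <-chan struct{}, iter, iterPop chan<- int) {
	pop := iterPop
loop: // unordered iteration
	for _, v := range *h {
		select {
		case iter <- v:
		case <-drop:
			pop = nil // consumer bailed out; skip the ordered phase
			break loop
		}
	}
	close(iter)
	if pop == nil {
		return
	}
loop_pop: // ordered iteration
	for h.Len() != 0 {
		v := heap.Pop(h).(int)
		select {
		case pop <- v:
		case <-drop:
			heap.Push(h, v) // put the popped item back so the heap stays intact
			break loop_pop
		}
	}
	close(pop)
}

func main() {
	h := &intHeap{5, 2, 8, 1}
	heap.Init(h)
	drop := make(chan struct{}) // never closed in this happy-path run
	iter, iterPop := make(chan int), make(chan int)
	go serve(h, drop, iter, iterPop)
	for v := range iter {
		fmt.Println("unordered:", v)
	}
	for v := range iterPop {
		fmt.Println("ordered:", v) // 1, 2, 5, 8
	}
}
```

In the happy path, serve closes iter after the unordered pass and iterPop after the ordered one, so both range loops terminate.
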
vendor/github.com/vbauerster/mpb/v8/progress.go

@@ -15,6 +15,7 @@ import (
 )
 
 const defaultRefreshRate = 150 * time.Millisecond
+const defaultHmQueueLength = 128
 
 // DoneError represents use after `(*Progress).Wait()` error.
 var DoneError = fmt.Errorf("%T instance can't be reused after %[1]T.Wait()", (*Progress)(nil))
@@ -31,16 +32,17 @@ type Progress struct {
 // pState holds bars in its priorityQueue, it gets passed to (*Progress).serve monitor goroutine.
 type pState struct {
-    ctx          context.Context
-    hm           heapManager
-    dropS, dropD chan struct{}
-    renderReq    chan time.Time
-    idCount      int
-    popPriority  int
+    ctx         context.Context
+    hm          heapManager
+    iterDrop    chan struct{}
+    renderReq   chan time.Time
+    idCount     int
+    popPriority int
 
     // following are provided/overrided by user
-    refreshRate        time.Duration
+    hmQueueLen         int
     reqWidth           int
+    refreshRate        time.Duration
     popCompleted       bool
     autoRefresh        bool
     delayRC            <-chan struct{}
@@ -68,9 +70,8 @@ func NewWithContext(ctx context.Context, options ...ContainerOption) *Progress {
     ctx, cancel := context.WithCancel(ctx)
     s := &pState{
         ctx:         ctx,
-        hm:          make(heapManager),
-        dropS:       make(chan struct{}),
-        dropD:       make(chan struct{}),
+        hmQueueLen:  defaultHmQueueLength,
+        iterDrop:    make(chan struct{}),
         renderReq:   make(chan time.Time),
         popPriority: math.MinInt32,
         refreshRate: defaultRefreshRate,
@@ -85,6 +86,8 @@
         }
     }
 
+    s.hm = make(heapManager, s.hmQueueLen)
+
     p := &Progress{
         uwg:          s.uwg,
         operateState: make(chan func(*pState)),
@@ -173,9 +176,9 @@ func (p *Progress) Add(total int64, filler BarFiller, options ...BarOption) (*Ba
 }
 
 func (p *Progress) traverseBars(cb func(b *Bar) bool) {
-    iter, drop := make(chan *Bar), make(chan struct{})
+    drop, iter := make(chan struct{}), make(chan *Bar)
     select {
-    case p.operateState <- func(s *pState) { s.hm.iter(iter, drop) }:
+    case p.operateState <- func(s *pState) { s.hm.iter(drop, iter, nil) }:
         for b := range iter {
             if !cb(b) {
                 close(drop)
@@ -333,15 +336,15 @@
 }
 
 func (s *pState) render(cw *cwriter.Writer) (err error) {
-    s.hm.sync(s.dropS)
-    iter := make(chan *Bar)
-    go s.hm.iter(iter, s.dropS)
+    iter, iterPop := make(chan *Bar), make(chan *Bar)
+    s.hm.sync(s.iterDrop)
+    s.hm.iter(s.iterDrop, iter, iterPop)
 
     var width, height int
     if cw.IsTerminal() {
         width, height, err = cw.GetTermSize()
         if err != nil {
-            close(s.dropS)
+            close(s.iterDrop)
             return err
         }
     } else {
@@ -357,23 +360,17 @@
         go b.render(width)
     }
 
-    return s.flush(cw, height)
+    return s.flush(cw, height, iterPop)
 }
 
-func (s *pState) flush(cw *cwriter.Writer, height int) error {
-    var wg sync.WaitGroup
-    defer wg.Wait() // waiting for all s.push to complete
-
+func (s *pState) flush(cw *cwriter.Writer, height int, iter <-chan *Bar) error {
     var popCount int
     var rows []io.Reader
 
-    iter := make(chan *Bar)
-    s.hm.drain(iter, s.dropD)
-
     for b := range iter {
         frame := <-b.frameCh
         if frame.err != nil {
-            close(s.dropD)
+            close(s.iterDrop)
             b.cancel()
             return frame.err // b.frameCh is buffered it's ok to return here
         }
@@ -393,16 +390,13 @@
             if qb, ok := s.queueBars[b]; ok {
                 delete(s.queueBars, b)
                 qb.priority = b.priority
-                wg.Add(1)
-                go s.push(&wg, qb, true)
+                s.hm.push(qb, true)
             } else if s.popCompleted && !frame.noPop {
                 b.priority = s.popPriority
                 s.popPriority++
-                wg.Add(1)
-                go s.push(&wg, b, false)
+                s.hm.push(b, false)
             } else if !frame.rmOnComplete {
-                wg.Add(1)
-                go s.push(&wg, b, false)
+                s.hm.push(b, false)
             }
         case 2:
             if s.popCompleted && !frame.noPop {
@@ -411,8 +405,7 @@
             }
             fallthrough
         default:
-            wg.Add(1)
-            go s.push(&wg, b, false)
+            s.hm.push(b, false)
         }
     }
@@ -426,11 +419,6 @@
     return cw.Flush(len(rows) - popCount)
 }
 
-func (s *pState) push(wg *sync.WaitGroup, b *Bar, sync bool) {
-    s.hm.push(b, sync)
-    wg.Done()
-}
-
 func (s pState) makeBarState(total int64, filler BarFiller, options ...BarOption) *bState {
     bs := &bState{
         id:     s.idCount,
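
These internals do not change the public entry points. For orientation, a hedged sketch of driving the NewWithContext constructor shown above; the timeout, refresh rate, and single-bar loop are illustrative choices, not from this PR:

```go
package main

import (
	"context"
	"time"

	"github.com/vbauerster/mpb/v8"
)

func main() {
	// Assumed deadline for the whole render session.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	// WithRefreshRate is the existing option seen in container_option.go;
	// 90ms is an arbitrary override of the 150ms default.
	p := mpb.NewWithContext(ctx, mpb.WithRefreshRate(90*time.Millisecond))
	bar := p.AddBar(100)
	for i := 0; i < 100; i++ {
		bar.Increment()
		time.Sleep(10 * time.Millisecond)
	}
	p.Wait()
}
```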

vendor/modules.txt

@@ -1107,7 +1107,7 @@ github.com/ulikunitz/xz/lzma
 github.com/vbatts/tar-split/archive/tar
 github.com/vbatts/tar-split/tar/asm
 github.com/vbatts/tar-split/tar/storage
-# github.com/vbauerster/mpb/v8 v8.8.3
+# github.com/vbauerster/mpb/v8 v8.9.1
 ## explicit; go 1.17
 github.com/vbauerster/mpb/v8
 github.com/vbauerster/mpb/v8/cwriter