client-go/util/async/runloop.go

159 lines
3.9 KiB
Go

// Copyright 2025 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package async
import (
"context"
"errors"
"sync"
)
// State represents the state of a run loop, as reported by RunLoop.State. Its zero value is StateIdle.
type State uint32
// The states a RunLoop moves through while Exec is driving it.
const (
// StateIdle is the zero value: no Exec call is in progress. Exec requires this state on entry.
StateIdle State = iota
// StateWaiting means Exec found no runnable tasks and is blocked until Append wakes it or its context is done.
StateWaiting
// StateRunning means Exec has taken a batch of tasks and is executing them.
StateRunning
)
// RunLoop implements the Executor interface. Tasks are queued with Append and executed by the goroutine that calls
// Exec.
type RunLoop struct {
// Pool is optional. When it is nil, Go starts a plain goroutine instead of submitting to the pool.
Pool
// lock guards runnable and state, and the hand-off of slices between runnable and running.
lock sync.Mutex
// ready carries the wake-up signal: Append sends one value after moving the loop from waiting to idle, and Exec
// receives it while waiting.
ready chan struct{}
// runnable holds the tasks queued by Append that have not been picked up for execution yet.
runnable []func()
// running is the batch currently being executed; its backing array is recycled as the next runnable buffer.
running []func()
// state is the current State of the loop; read it through the State method.
state State
}
// NewRunLoop returns an idle run-loop with no pool attached and an unbuffered wake-up channel.
func NewRunLoop() *RunLoop {
	loop := new(RunLoop)
	loop.ready = make(chan struct{})
	return loop
}
// Go hands f to the embedded pool when one is set; otherwise it starts a new goroutine for f.
func (l *RunLoop) Go(f func()) {
	if l.Pool != nil {
		l.Pool.Go(f)
		return
	}
	go f()
}
// State reports the state of the run-loop at the moment of the call. The value is read under the lock.
func (l *RunLoop) State() State {
	l.lock.Lock()
	defer l.lock.Unlock()
	return l.state
}
// NumRunnable reports how many tasks are currently queued and not yet picked up for execution. The count is read
// under the lock.
func (l *RunLoop) NumRunnable() int {
	l.lock.Lock()
	defer l.lock.Unlock()
	return len(l.runnable)
}
// Append implements the Executor interface by queueing fs as runnable tasks. It's safe to call Append concurrently.
// If the run-loop is waiting inside Exec, Append moves it back to idle and wakes it up through the ready channel.
// Calling Append without arguments is a no-op.
func (l *RunLoop) Append(fs ...func()) {
	if len(fs) == 0 {
		return
	}

	l.lock.Lock()
	l.runnable = append(l.runnable, fs...)
	wake := l.state == StateWaiting
	if wake {
		l.state = StateIdle // waiting -> idle
	}
	l.lock.Unlock()

	if !wake {
		return
	}
	// The send happens outside the lock so the receiving Exec can re-acquire it.
	l.ready <- struct{}{}
}
// Exec drives the run-loop to execute all runnable tasks and returns the number of tasks executed. If the context is
// done before all tasks are executed, it returns the number of tasks executed so far together with the context error;
// tasks that were not executed stay queued for a later Exec call.
//
// Exec turns the run-loop to the running or waiting state while it works and back to idle on return. When Exec is
// called without pending runnables the run-loop turns to waiting, so the caller must make sure that either Append is
// called from another goroutine to wake it up, or the context is eventually canceled to break the wait.
//
// Exec should only be called by one goroutine. If the run-loop is not idle on entry, Exec returns an error without
// executing anything.
func (l *RunLoop) Exec(ctx context.Context) (int, error) {
	for {
		l.lock.Lock()
		if l.state != StateIdle {
			l.lock.Unlock()
			return 0, errors.New("runloop: already executing")
		}
		// From here on l.state == StateIdle.
		if len(l.runnable) > 0 {
			// Take the pending batch and recycle the previous batch's backing array for new appends.
			l.running, l.runnable = l.runnable, l.running[:0]
			l.state = StateRunning // idle -> running
			l.lock.Unlock()
			return l.run(ctx)
		}
		l.state = StateWaiting // idle -> waiting
		l.lock.Unlock()

		select {
		case <-l.ready:
			// Append has already moved the state back to idle; loop again to pick up the new tasks.
			continue
		case <-ctx.Done():
			l.lock.Lock()
			stillWaiting := l.state == StateWaiting
			if stillWaiting {
				l.state = StateIdle // waiting -> idle
			}
			l.lock.Unlock()
			if !stillWaiting {
				// A concurrent Append already performed waiting -> idle and is committed to sending on
				// l.ready. Receive that signal, otherwise the Append caller would block forever: later Exec
				// calls find runnable tasks and never wait on l.ready.
				<-l.ready
			}
			return 0, ctx.Err()
		}
	}
}
// run executes the current batch in l.running, then keeps taking further batches from l.runnable until none are
// left. It returns the number of tasks executed. The context is checked before every task: once it is done, the
// unexecuted remainder of the batch is put back in front of the queued tasks and the context error is returned.
// The run-loop is set to idle on every return path.
func (l *RunLoop) run(ctx context.Context) (int, error) {
	executed := 0
	for {
		batch := l.running
		for idx := range batch {
			select {
			case <-ctx.Done():
				l.lock.Lock()
				// Compact the unexecuted tail to the front of the batch buffer, then queue the tasks
				// appended meanwhile behind it so the original order is preserved.
				pending := append(batch[:0], batch[idx:]...)
				pending = append(pending, l.runnable...)
				l.running, l.runnable = l.runnable, pending
				l.state = StateIdle // running -> idle
				l.lock.Unlock()
				return executed, ctx.Err()
			default:
			}
			batch[idx]()
			executed++
		}

		l.lock.Lock()
		if len(l.runnable) > 0 {
			// More tasks arrived while the batch was running: take them and recycle the finished buffer.
			l.running, l.runnable = l.runnable, l.running[:0]
			l.lock.Unlock()
			continue
		}
		l.state = StateIdle // running -> idle
		l.lock.Unlock()
		return executed, nil
	}
}