client-go/util/async/core.go

84 lines
2.4 KiB
Go

// Copyright 2025 TiKV Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package async
import (
"sync"
)
// Pool is an interface for goroutine pool.
type Pool interface {
// Go submits a function to the goroutine pool.
Go(f func())
}
// Executor is an interface that can append functions to be executed asynchronously.
type Executor interface {
Pool
// Append adds functions to the executor. It should be safe to call Append concurrently.
Append(fs ...func())
}
// Callback defines a callback function that can be invoked immediately or later.
type Callback[T any] interface {
// Executor returns the executor that the callback uses.
Executor() Executor
// Inject adds a deferred action that will be invoked before the callback.
Inject(g func(T, error) (T, error))
// Invoke invokes the callback immediately in current goroutine.
Invoke(val T, err error)
// Schedule schedules the callback to be invoked later, it's typically called in other goroutines.
Schedule(val T, err error)
}
// NewCallback creates a new callback function.
func NewCallback[T any](e Executor, f func(T, error)) Callback[T] {
return &callback[T]{e: e, f: f}
}
type callback[T any] struct {
once sync.Once
e Executor
f func(T, error)
gs []func(T, error) (T, error)
}
// Executor implements Callback.
func (cb *callback[T]) Executor() Executor {
return cb.e
}
// Inject implements Callback.
func (cb *callback[T]) Inject(g func(T, error) (T, error)) {
cb.gs = append(cb.gs, g)
}
// Invoke implements Callback.
func (cb *callback[T]) Invoke(val T, err error) {
cb.once.Do(func() { cb.call(val, err) })
}
// Schedule implements Callback.
func (cb *callback[T]) Schedule(val T, err error) {
cb.once.Do(func() { cb.e.Append(func() { cb.call(val, err) }) })
}
func (cb *callback[T]) call(val T, err error) {
for i := len(cb.gs) - 1; i >= 0; i-- {
val, err = cb.gs[i](val, err)
}
cb.f(val, err)
}