Merge pull request #4080 from ctripcloud/remove-max-retry

remove unnecessary maxRetry for AsyncWorker
This commit is contained in:
karmada-bot 2023-10-10 17:29:19 +08:00 committed by GitHub
commit 70e2e1ca71
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 7 additions and 65 deletions

2
go.mod
View File

@ -23,7 +23,6 @@ require (
github.com/stretchr/testify v1.8.3
github.com/vektra/mockery/v2 v2.10.0
github.com/yuin/gopher-lua v0.0.0-20220504180219-658193537a64
go.uber.org/atomic v1.9.0
golang.org/x/net v0.12.0
golang.org/x/term v0.10.0
golang.org/x/text v0.11.0
@ -163,6 +162,7 @@ require (
go.opentelemetry.io/otel/trace v1.10.0 // indirect
go.opentelemetry.io/proto/otlp v0.19.0 // indirect
go.starlark.net v0.0.0-20200306205701-8dd3e2ee1dd5 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/multierr v1.8.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/crypto v0.11.0 // indirect

View File

@ -12,15 +12,6 @@ import (
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
)
const (
// maxRetries is the number of times a resource will be retried before it is dropped out of the queue.
// With the current rate-limiter in use (5ms*2^(maxRetries-1)) the following numbers represent the times
// a resource is going to be re-queued:
//
// 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, 10.2s, 20.4s, 41s, 82s
maxRetries = 15
)
// AsyncWorker maintains a rate limiting queue and the items in the queue will be reconciled by a "ReconcileFunc".
// The item will be re-queued if "ReconcileFunc" returns an error, maximum re-queue times defined by "maxRetries" above,
// after that the item will be discarded from the queue.
@ -113,21 +104,6 @@ func (w *asyncWorker) AddAfter(item interface{}, duration time.Duration) {
w.queue.AddAfter(item, duration)
}
func (w *asyncWorker) handleError(err error, key interface{}) {
if err == nil {
w.queue.Forget(key)
return
}
if w.queue.NumRequeues(key) < maxRetries {
w.queue.AddRateLimited(key)
return
}
klog.V(2).Infof("Dropping resource %q out of the queue: %v", key, err)
w.queue.Forget(key)
}
func (w *asyncWorker) worker() {
key, quit := w.queue.Get()
if quit {
@ -136,7 +112,12 @@ func (w *asyncWorker) worker() {
defer w.queue.Done(key)
err := w.reconcileFunc(key)
w.handleError(err, key)
if err != nil {
w.queue.AddRateLimited(key)
return
}
w.queue.Forget(key)
}
func (w *asyncWorker) Run(workerNumber int, stopChan <-chan struct{}) {

View File

@ -7,7 +7,6 @@ import (
"testing"
"time"
"go.uber.org/atomic"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -147,44 +146,6 @@ func Test_asyncWorker_Run(t *testing.T) {
}
}
type asyncWorkerReconciler2 struct {
receivedTimes atomic.Int64
}
func (a *asyncWorkerReconciler2) ReconcileFunc(_ QueueKey) error {
a.receivedTimes.Inc()
return errors.New("always fail")
}
func Test_asyncWorker_drop_resource(t *testing.T) {
const name = "fake_node"
const wantReceivedTimes = maxRetries + 1
reconcile := new(asyncWorkerReconciler2)
worker := newTestAsyncWorker(reconcile.ReconcileFunc)
stopChan := make(chan struct{})
defer close(stopChan)
worker.Run(5, stopChan)
worker.Add(name)
err := assertUntil(20*time.Second, func() error {
receivedTimes := reconcile.receivedTimes.Load()
if receivedTimes != wantReceivedTimes {
return fmt.Errorf("receivedTimes = %v, want = %v", receivedTimes, wantReceivedTimes)
}
return nil
})
if err != nil {
t.Error(err.Error())
}
}
// Block running assertion func periodically to check if condition match.
// Fail if: maxDuration is reached
// Success if: assertion return nil error