Merge pull request #103830 from MikeSpreitzer/new-event-clock
Introduce event clocks based on k8s.io/utils/clock Kubernetes-commit: 5a92b78dd299b29aca6f7408b187532cb8876852
This commit is contained in:
commit
6b27cd1081
4
go.mod
4
go.mod
|
|
@ -43,7 +43,7 @@ require (
|
|||
google.golang.org/grpc v1.38.0
|
||||
gopkg.in/natefinch/lumberjack.v2 v2.0.0
|
||||
gopkg.in/square/go-jose.v2 v2.2.2
|
||||
k8s.io/api v0.0.0-20210805120319-7a35d40d43ac
|
||||
k8s.io/api v0.0.0-20210806000318-c02a408946ed
|
||||
k8s.io/apimachinery v0.0.0-20210805051055-f7769293e6f1
|
||||
k8s.io/client-go v0.0.0-20210805080552-fa98c048508e
|
||||
k8s.io/component-base v0.0.0-20210805120716-92bebfd2c985
|
||||
|
|
@ -56,7 +56,7 @@ require (
|
|||
)
|
||||
|
||||
replace (
|
||||
k8s.io/api => k8s.io/api v0.0.0-20210805120319-7a35d40d43ac
|
||||
k8s.io/api => k8s.io/api v0.0.0-20210806000318-c02a408946ed
|
||||
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20210805051055-f7769293e6f1
|
||||
k8s.io/client-go => k8s.io/client-go v0.0.0-20210805080552-fa98c048508e
|
||||
k8s.io/component-base => k8s.io/component-base v0.0.0-20210805120716-92bebfd2c985
|
||||
|
|
|
|||
4
go.sum
4
go.sum
|
|
@ -780,8 +780,8 @@ honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWh
|
|||
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
|
||||
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
|
||||
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
|
||||
k8s.io/api v0.0.0-20210805120319-7a35d40d43ac h1:Wu1nE5Lz+WstOMn+yT1unlEh/w2eXTQ+cWeD8m/YQYE=
|
||||
k8s.io/api v0.0.0-20210805120319-7a35d40d43ac/go.mod h1:9J6nkHavSazyXmPeuA4f1YO9Ztdjw7nDibPjT4P+wsY=
|
||||
k8s.io/api v0.0.0-20210806000318-c02a408946ed h1:U6TxkwwwxzupFa00nt11QxtHhIeSlZ9QpSEfjQLJmr8=
|
||||
k8s.io/api v0.0.0-20210806000318-c02a408946ed/go.mod h1:9J6nkHavSazyXmPeuA4f1YO9Ztdjw7nDibPjT4P+wsY=
|
||||
k8s.io/apimachinery v0.0.0-20210805051055-f7769293e6f1 h1:cVpwhaGeh/tNPBeYbFff3tjx5AxwG5zwImhz+eusG3k=
|
||||
k8s.io/apimachinery v0.0.0-20210805051055-f7769293e6f1/go.mod h1:O3oNtNadZdeOMxHFVxOreoznohCpy0z6mocxbZr7oJ0=
|
||||
k8s.io/client-go v0.0.0-20210805080552-fa98c048508e h1:0RaerCWCkguHMpXmdiPlW+wLFkV2oojh4F/QwThEqtI=
|
||||
|
|
|
|||
|
|
@ -0,0 +1,47 @@
|
|||
/*
|
||||
Copyright 2021 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package clock
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
baseclock "k8s.io/utils/clock"
|
||||
)
|
||||
|
||||
// EventFunc does some work that needs to be done at or after the
|
||||
// given time.
|
||||
type EventFunc func(time.Time)
|
||||
|
||||
// EventClock is an active clock abstraction for use in code that is
|
||||
// testable with a fake clock that itself determines how time may be
|
||||
// advanced. The timing paradigm is invoking EventFuncs rather than
|
||||
// synchronizing through channels, so that the fake clock has a handle
|
||||
// on when associated activity is done.
|
||||
type EventClock interface {
|
||||
baseclock.PassiveClock
|
||||
|
||||
// Sleep returns after the given duration (or more).
|
||||
Sleep(d time.Duration)
|
||||
|
||||
// EventAfterDuration invokes the given EventFunc after the given duration (or more),
|
||||
// passing the time when the invocation was launched.
|
||||
EventAfterDuration(f EventFunc, d time.Duration)
|
||||
|
||||
// EventAfterTime invokes the given EventFunc at the given time or later,
|
||||
// passing the time when the invocation was launched.
|
||||
EventAfterTime(f EventFunc, t time.Time)
|
||||
}
|
||||
|
|
@ -0,0 +1,42 @@
|
|||
/*
|
||||
Copyright 2021 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package clock
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"k8s.io/utils/clock"
|
||||
)
|
||||
|
||||
// RealEventClock fires event on real world time
|
||||
type RealEventClock struct {
|
||||
clock.RealClock
|
||||
}
|
||||
|
||||
// EventAfterDuration schedules an EventFunc
|
||||
func (RealEventClock) EventAfterDuration(f EventFunc, d time.Duration) {
|
||||
ch := time.After(d)
|
||||
go func() {
|
||||
t := <-ch
|
||||
f(t)
|
||||
}()
|
||||
}
|
||||
|
||||
// EventAfterTime schedules an EventFunc
|
||||
func (r RealEventClock) EventAfterTime(f EventFunc, t time.Time) {
|
||||
r.EventAfterDuration(f, time.Until(t))
|
||||
}
|
||||
|
|
@ -0,0 +1,68 @@
|
|||
/*
|
||||
Copyright 2021 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package clock
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestRealEventClock(t *testing.T) {
|
||||
ec := RealEventClock{}
|
||||
var numDone int32
|
||||
now := ec.Now()
|
||||
const batchSize = 100
|
||||
times := make(chan time.Time, batchSize+1)
|
||||
try := func(abs bool, d time.Duration) {
|
||||
f := func(u time.Time) {
|
||||
realD := ec.Since(now)
|
||||
atomic.AddInt32(&numDone, 1)
|
||||
times <- u
|
||||
if realD < d {
|
||||
t.Errorf("Asked for %v, got %v", d, realD)
|
||||
}
|
||||
}
|
||||
if abs {
|
||||
ec.EventAfterTime(f, now.Add(d))
|
||||
} else {
|
||||
ec.EventAfterDuration(f, d)
|
||||
}
|
||||
}
|
||||
try(true, time.Millisecond*3300)
|
||||
for i := 0; i < batchSize; i++ {
|
||||
d := time.Duration(rand.Intn(30)-3) * time.Millisecond * 100
|
||||
try(i%2 == 0, d)
|
||||
}
|
||||
time.Sleep(time.Second * 4)
|
||||
if atomic.LoadInt32(&numDone) != batchSize+1 {
|
||||
t.Errorf("Got only %v events", numDone)
|
||||
}
|
||||
lastTime := now
|
||||
for i := 0; i <= batchSize; i++ {
|
||||
nextTime := <-times
|
||||
if nextTime.Before(now) {
|
||||
continue
|
||||
}
|
||||
dt := nextTime.Sub(lastTime) / (50 * time.Millisecond)
|
||||
if dt < 0 {
|
||||
t.Errorf("Got %s after %s", nextTime, lastTime)
|
||||
}
|
||||
lastTime = nextTime
|
||||
}
|
||||
}
|
||||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
Copyright 2019 The Kubernetes Authors.
|
||||
Copyright 2021 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
|
|
@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
|
|||
limitations under the License.
|
||||
*/
|
||||
|
||||
package clock
|
||||
package testing
|
||||
|
||||
import (
|
||||
"container/heap"
|
||||
|
|
@ -24,50 +24,14 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
baseclocktest "k8s.io/utils/clock/testing"
|
||||
|
||||
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
|
||||
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/clock"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
// EventFunc does some work that needs to be done at or after the
|
||||
// given time. After this function returns, associated work may continue
|
||||
// on other goroutines only if they are counted by the GoRoutineCounter
|
||||
// of the FakeEventClock handling this EventFunc.
|
||||
type EventFunc func(time.Time)
|
||||
|
||||
// EventClock fires event on time
|
||||
type EventClock interface {
|
||||
clock.PassiveClock
|
||||
EventAfterDuration(f EventFunc, d time.Duration)
|
||||
EventAfterTime(f EventFunc, t time.Time)
|
||||
}
|
||||
|
||||
// RealEventClock fires event on real world time
|
||||
type RealEventClock struct {
|
||||
clock.RealClock
|
||||
}
|
||||
|
||||
// EventAfterDuration schedules an EventFunc
|
||||
func (RealEventClock) EventAfterDuration(f EventFunc, d time.Duration) {
|
||||
ch := time.After(d)
|
||||
go func() {
|
||||
t := <-ch
|
||||
f(t)
|
||||
}()
|
||||
}
|
||||
|
||||
// EventAfterTime schedules an EventFunc
|
||||
func (r RealEventClock) EventAfterTime(f EventFunc, t time.Time) {
|
||||
now := time.Now()
|
||||
d := t.Sub(now)
|
||||
if d <= 0 {
|
||||
go f(now)
|
||||
} else {
|
||||
r.EventAfterDuration(f, d)
|
||||
}
|
||||
}
|
||||
|
||||
// waitGroupCounter is a wait group used for a GoRoutine Counter. This private
|
||||
// waitGroupCounter is a wait group used for a GoRoutineCounter. This private
|
||||
// type is used to disallow direct waitGroup access
|
||||
type waitGroupCounter struct {
|
||||
wg sync.WaitGroup
|
||||
|
|
@ -102,9 +66,20 @@ func (wgc *waitGroupCounter) Wait() {
|
|||
}
|
||||
|
||||
// FakeEventClock is one whose time does not pass implicitly but
|
||||
// rather is explicitly set by invocations of its SetTime method
|
||||
// rather is explicitly set by invocations of its SetTime method.
|
||||
// Each FakeEventClock has an associated GoRoutineCounter that is
|
||||
// used to track associated activity.
|
||||
// For the EventAfterDuration and EventAfterTime methods,
|
||||
// the clock itself counts the start and stop of the EventFunc
|
||||
// and the client is responsible for counting any suspend and
|
||||
// resume internal to the EventFunc.
|
||||
// The Sleep method must only be invoked from a goroutine that is
|
||||
// counted in that GoRoutineCounter.
|
||||
// The SetTime method does not return until all the triggered
|
||||
// EventFuncs return. Consequently, an EventFunc given to a method
|
||||
// of this clock must not wait for this clock to advance.
|
||||
type FakeEventClock struct {
|
||||
clock.FakePassiveClock
|
||||
baseclocktest.FakePassiveClock
|
||||
|
||||
// waiters is a heap of waiting work, sorted by time
|
||||
waiters eventWaiterHeap
|
||||
|
|
@ -131,7 +106,7 @@ var _ heap.Interface = (*eventWaiterHeap)(nil)
|
|||
|
||||
type eventWaiter struct {
|
||||
targetTime time.Time
|
||||
f EventFunc
|
||||
f clock.EventFunc
|
||||
}
|
||||
|
||||
// NewFakeEventClock constructor. The given `r *rand.Rand` must
|
||||
|
|
@ -149,7 +124,7 @@ func NewFakeEventClock(t time.Time, fuzz time.Duration, r *rand.Rand) (*FakeEven
|
|||
r.Uint64()
|
||||
}
|
||||
return &FakeEventClock{
|
||||
FakePassiveClock: *clock.NewFakePassiveClock(t),
|
||||
FakePassiveClock: *baseclocktest.NewFakePassiveClock(t),
|
||||
clientWG: grc,
|
||||
fuzz: fuzz,
|
||||
rand: r,
|
||||
|
|
@ -169,8 +144,9 @@ func (fec *FakeEventClock) GetNextTime() (time.Time, bool) {
|
|||
|
||||
// Run runs all the events scheduled, and all the events they
|
||||
// schedule, and so on, until there are none scheduled or the limit is not
|
||||
// nil and the next time would exceed the limit. The clientWG given in
|
||||
// the constructor gates each advance of time.
|
||||
// nil and the next time would exceed the limit. The associated
|
||||
// GoRoutineCounter gates the advancing of time. That is,
|
||||
// time is not advanced until all the associated work is finished.
|
||||
func (fec *FakeEventClock) Run(limit *time.Time) {
|
||||
for {
|
||||
fec.clientWG.Wait()
|
||||
|
|
@ -200,7 +176,7 @@ func (fec *FakeEventClock) SetTime(t time.Time) {
|
|||
for len(fec.waiters) > 0 && !now.Before(fec.waiters[0].targetTime) {
|
||||
ew := heap.Pop(&fec.waiters).(eventWaiter)
|
||||
wg.Add(1)
|
||||
go func(f EventFunc) { f(now); wg.Done() }(ew.f)
|
||||
go func(f clock.EventFunc) { f(now); wg.Done() }(ew.f)
|
||||
foundSome = true
|
||||
}
|
||||
wg.Wait()
|
||||
|
|
@ -211,9 +187,24 @@ func (fec *FakeEventClock) SetTime(t time.Time) {
|
|||
}
|
||||
}
|
||||
|
||||
// Sleep returns after the given duration has passed.
|
||||
// Sleep must only be invoked in a goroutine that is counted
|
||||
// in the FakeEventClock's associated GoRoutineCounter.
|
||||
// Unlike the base FakeClock's Sleep, this method does not itself advance the clock
|
||||
// but rather leaves that up to other actors (e.g., Run).
|
||||
func (fec *FakeEventClock) Sleep(duration time.Duration) {
|
||||
doneCh := make(chan struct{})
|
||||
fec.EventAfterDuration(func(time.Time) {
|
||||
fec.clientWG.Add(1)
|
||||
close(doneCh)
|
||||
}, duration)
|
||||
fec.clientWG.Add(-1)
|
||||
<-doneCh
|
||||
}
|
||||
|
||||
// EventAfterDuration schedules the given function to be invoked once
|
||||
// the given duration has passed.
|
||||
func (fec *FakeEventClock) EventAfterDuration(f EventFunc, d time.Duration) {
|
||||
func (fec *FakeEventClock) EventAfterDuration(f clock.EventFunc, d time.Duration) {
|
||||
fec.waitersLock.Lock()
|
||||
defer fec.waitersLock.Unlock()
|
||||
now := fec.Now()
|
||||
|
|
@ -223,7 +214,7 @@ func (fec *FakeEventClock) EventAfterDuration(f EventFunc, d time.Duration) {
|
|||
|
||||
// EventAfterTime schedules the given function to be invoked once
|
||||
// the given time has arrived.
|
||||
func (fec *FakeEventClock) EventAfterTime(f EventFunc, t time.Time) {
|
||||
func (fec *FakeEventClock) EventAfterTime(f clock.EventFunc, t time.Time) {
|
||||
fec.waitersLock.Lock()
|
||||
defer fec.waitersLock.Unlock()
|
||||
fd := time.Duration(float32(fec.fuzz) * fec.rand.Float32())
|
||||
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
Copyright 2019 The Kubernetes Authors.
|
||||
Copyright 2021 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
|
|
@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
|
|||
limitations under the License.
|
||||
*/
|
||||
|
||||
package clock
|
||||
package testing
|
||||
|
||||
import (
|
||||
"math/rand"
|
||||
|
|
@ -22,23 +22,17 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/clock"
|
||||
)
|
||||
|
||||
type TestableEventClock interface {
|
||||
EventClock
|
||||
clock.EventClock
|
||||
SetTime(time.Time)
|
||||
Run(*time.Time)
|
||||
}
|
||||
|
||||
// settablePassiveClock allows setting current time of a passive clock
|
||||
type settablePassiveClock interface {
|
||||
clock.PassiveClock
|
||||
SetTime(time.Time)
|
||||
}
|
||||
|
||||
func exerciseTestableEventClock(t *testing.T, ec TestableEventClock, fuzz time.Duration) {
|
||||
exercisePassiveClock(t, ec)
|
||||
exerciseSettablePassiveClock(t, ec)
|
||||
var numDone int32
|
||||
now := ec.Now()
|
||||
strictable := true
|
||||
|
|
@ -104,7 +98,8 @@ func exerciseTestableEventClock(t *testing.T, ec TestableEventClock, fuzz time.D
|
|||
}
|
||||
}
|
||||
|
||||
func exercisePassiveClock(t *testing.T, pc settablePassiveClock) {
|
||||
// copied from baseclocktest, because it is not public
|
||||
func exerciseSettablePassiveClock(t *testing.T, pc TestableEventClock) {
|
||||
t1 := time.Now()
|
||||
t2 := t1.Add(time.Hour)
|
||||
pc.SetTime(t1)
|
||||
|
|
@ -134,50 +129,3 @@ func TestFakeEventClock(t *testing.T) {
|
|||
fec, _ = NewFakeEventClock(startTime, time.Second, nil)
|
||||
exerciseTestableEventClock(t, fec, time.Second)
|
||||
}
|
||||
|
||||
func exerciseEventClock(t *testing.T, ec EventClock, relax func(time.Duration)) {
|
||||
var numDone int32
|
||||
now := ec.Now()
|
||||
const batchSize = 100
|
||||
times := make(chan time.Time, batchSize+1)
|
||||
try := func(abs bool, d time.Duration) {
|
||||
f := func(u time.Time) {
|
||||
realD := ec.Since(now)
|
||||
atomic.AddInt32(&numDone, 1)
|
||||
times <- u
|
||||
if realD < d {
|
||||
t.Errorf("Asked for %v, got %v", d, realD)
|
||||
}
|
||||
}
|
||||
if abs {
|
||||
ec.EventAfterTime(f, now.Add(d))
|
||||
} else {
|
||||
ec.EventAfterDuration(f, d)
|
||||
}
|
||||
}
|
||||
try(true, time.Millisecond*3300)
|
||||
for i := 0; i < batchSize; i++ {
|
||||
d := time.Duration(rand.Intn(30)-3) * time.Millisecond * 100
|
||||
try(i%2 == 0, d)
|
||||
}
|
||||
relax(time.Second * 4)
|
||||
if atomic.LoadInt32(&numDone) != batchSize+1 {
|
||||
t.Errorf("Got only %v events", numDone)
|
||||
}
|
||||
lastTime := now
|
||||
for i := 0; i <= batchSize; i++ {
|
||||
nextTime := <-times
|
||||
if nextTime.Before(now) {
|
||||
continue
|
||||
}
|
||||
dt := nextTime.Sub(lastTime) / (50 * time.Millisecond)
|
||||
if dt < 0 {
|
||||
t.Errorf("Got %s after %s", nextTime, lastTime)
|
||||
}
|
||||
lastTime = nextTime
|
||||
}
|
||||
}
|
||||
|
||||
func TestRealEventClock(t *testing.T) {
|
||||
exerciseEventClock(t, RealEventClock{}, func(d time.Duration) { time.Sleep(d) })
|
||||
}
|
||||
|
|
@ -22,12 +22,12 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock"
|
||||
testclock "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/clock/testing"
|
||||
)
|
||||
|
||||
func TestLockingWriteOnce(t *testing.T) {
|
||||
now := time.Now()
|
||||
clock, counter := clock.NewFakeEventClock(now, 0, nil)
|
||||
clock, counter := testclock.NewFakeEventClock(now, 0, nil)
|
||||
var lock sync.Mutex
|
||||
wr := NewWriteOnce(&lock, counter)
|
||||
var gots int32
|
||||
|
|
|
|||
|
|
@ -23,7 +23,8 @@ import (
|
|||
"sync"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
"k8s.io/utils/clock"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
|
||||
"k8s.io/apiserver/pkg/util/flowcontrol/debug"
|
||||
|
|
@ -33,6 +34,12 @@ import (
|
|||
fqrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
|
||||
"k8s.io/apiserver/pkg/util/shufflesharding"
|
||||
"k8s.io/klog/v2"
|
||||
|
||||
// The following hack is needed to work around a tooling deficiency.
|
||||
// Packages imported only for test code are not included in vendor.
|
||||
// See https://kubernetes.slack.com/archives/C0EG7JC6T/p1626985671458800?thread_ts=1626983387.450800&cid=C0EG7JC6T
|
||||
// The need for this hack will be removed when we make queueset use an EventClock rather than a PassiveClock.
|
||||
_ "k8s.io/utils/clock/testing"
|
||||
)
|
||||
|
||||
const nsTimeFmt = "2006-01-02 15:04:05.000000000"
|
||||
|
|
@ -299,7 +306,7 @@ func (qs *queueSet) StartRequest(ctx context.Context, workEstimate *fqrequest.Wo
|
|||
go func() {
|
||||
defer runtime.HandleCrash()
|
||||
qs.goroutineDoneOrBlocked()
|
||||
_ = <-doneCh
|
||||
<-doneCh
|
||||
// Whatever goroutine unblocked the preceding receive MUST
|
||||
// have already either (a) incremented qs.counter or (b)
|
||||
// known that said counter is not actually counting or (c)
|
||||
|
|
|
|||
|
|
@ -27,11 +27,12 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/clock"
|
||||
"k8s.io/utils/clock"
|
||||
|
||||
"k8s.io/apiserver/pkg/util/flowcontrol/counter"
|
||||
fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing"
|
||||
fqclocktest "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/clock/testing"
|
||||
test "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing"
|
||||
testclock "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/testing/clock"
|
||||
"k8s.io/apiserver/pkg/util/flowcontrol/metrics"
|
||||
fcrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
|
||||
"k8s.io/klog/v2"
|
||||
|
|
@ -132,7 +133,7 @@ type uniformScenario struct {
|
|||
expectAllRequests bool
|
||||
evalInqueueMetrics, evalExecutingMetrics bool
|
||||
rejectReason string
|
||||
clk *testclock.FakeEventClock
|
||||
clk *fqclocktest.FakeEventClock
|
||||
counter counter.GoRoutineCounter
|
||||
}
|
||||
|
||||
|
|
@ -245,7 +246,7 @@ func (ust *uniformScenarioThread) callK(k int) {
|
|||
atomic.AddInt32(&ust.uss.executions[ust.i], 1)
|
||||
ust.igr.Add(1)
|
||||
ust.uss.clk.EventAfterDuration(ust.genCallK(k+1), ust.uc.execDuration+ust.uc.thinkDuration)
|
||||
ClockWait(ust.uss.clk, ust.uss.counter, ust.uc.execDuration)
|
||||
ust.uss.clk.Sleep(ust.uc.execDuration)
|
||||
ust.igr.Add(-1)
|
||||
})
|
||||
ust.uss.t.Logf("%s: %d, %d, %d got executed=%v, idle2=%v", ust.uss.clk.Now().Format(nsTimeFmt), ust.i, ust.j, k, executed, idle2)
|
||||
|
|
@ -358,16 +359,6 @@ func (uss *uniformScenarioState) finalReview() {
|
|||
}
|
||||
}
|
||||
|
||||
func ClockWait(clk *testclock.FakeEventClock, counter counter.GoRoutineCounter, duration time.Duration) {
|
||||
dunch := make(chan struct{})
|
||||
clk.EventAfterDuration(func(time.Time) {
|
||||
counter.Add(1)
|
||||
close(dunch)
|
||||
}, duration)
|
||||
counter.Add(-1)
|
||||
<-dunch
|
||||
}
|
||||
|
||||
func init() {
|
||||
klog.InitFlags(nil)
|
||||
}
|
||||
|
|
@ -376,7 +367,7 @@ func init() {
|
|||
func TestNoRestraint(t *testing.T) {
|
||||
metrics.Register()
|
||||
now := time.Now()
|
||||
clk, counter := testclock.NewFakeEventClock(now, 0, nil)
|
||||
clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil)
|
||||
nrc, err := test.NewNoRestraintFactory().BeginConstruction(fq.QueuingConfig{}, newObserverPair(clk))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
|
|
@ -402,7 +393,7 @@ func TestUniformFlowsHandSize1(t *testing.T) {
|
|||
metrics.Register()
|
||||
now := time.Now()
|
||||
|
||||
clk, counter := testclock.NewFakeEventClock(now, 0, nil)
|
||||
clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil)
|
||||
qsf := NewQueueSetFactory(clk, counter)
|
||||
qCfg := fq.QueuingConfig{
|
||||
Name: "TestUniformFlowsHandSize1",
|
||||
|
|
@ -439,7 +430,7 @@ func TestUniformFlowsHandSize3(t *testing.T) {
|
|||
metrics.Register()
|
||||
now := time.Now()
|
||||
|
||||
clk, counter := testclock.NewFakeEventClock(now, 0, nil)
|
||||
clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil)
|
||||
qsf := NewQueueSetFactory(clk, counter)
|
||||
qCfg := fq.QueuingConfig{
|
||||
Name: "TestUniformFlowsHandSize3",
|
||||
|
|
@ -475,7 +466,7 @@ func TestDifferentFlowsExpectEqual(t *testing.T) {
|
|||
metrics.Register()
|
||||
now := time.Now()
|
||||
|
||||
clk, counter := testclock.NewFakeEventClock(now, 0, nil)
|
||||
clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil)
|
||||
qsf := NewQueueSetFactory(clk, counter)
|
||||
qCfg := fq.QueuingConfig{
|
||||
Name: "DiffFlowsExpectEqual",
|
||||
|
|
@ -512,7 +503,7 @@ func TestDifferentFlowsExpectUnequal(t *testing.T) {
|
|||
metrics.Register()
|
||||
now := time.Now()
|
||||
|
||||
clk, counter := testclock.NewFakeEventClock(now, 0, nil)
|
||||
clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil)
|
||||
qsf := NewQueueSetFactory(clk, counter)
|
||||
qCfg := fq.QueuingConfig{
|
||||
Name: "DiffFlowsExpectUnequal",
|
||||
|
|
@ -549,7 +540,7 @@ func TestWindup(t *testing.T) {
|
|||
metrics.Register()
|
||||
now := time.Now()
|
||||
|
||||
clk, counter := testclock.NewFakeEventClock(now, 0, nil)
|
||||
clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil)
|
||||
qsf := NewQueueSetFactory(clk, counter)
|
||||
qCfg := fq.QueuingConfig{
|
||||
Name: "TestWindup",
|
||||
|
|
@ -585,7 +576,7 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) {
|
|||
metrics.Register()
|
||||
now := time.Now()
|
||||
|
||||
clk, counter := testclock.NewFakeEventClock(now, 0, nil)
|
||||
clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil)
|
||||
qsf := NewQueueSetFactory(clk, counter)
|
||||
qCfg := fq.QueuingConfig{
|
||||
Name: "TestDifferentFlowsWithoutQueuing",
|
||||
|
|
@ -618,7 +609,7 @@ func TestTimeout(t *testing.T) {
|
|||
metrics.Register()
|
||||
now := time.Now()
|
||||
|
||||
clk, counter := testclock.NewFakeEventClock(now, 0, nil)
|
||||
clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil)
|
||||
qsf := NewQueueSetFactory(clk, counter)
|
||||
qCfg := fq.QueuingConfig{
|
||||
Name: "TestTimeout",
|
||||
|
|
@ -654,7 +645,7 @@ func TestContextCancel(t *testing.T) {
|
|||
metrics.Register()
|
||||
metrics.Reset()
|
||||
now := time.Now()
|
||||
clk, counter := testclock.NewFakeEventClock(now, 0, nil)
|
||||
clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil)
|
||||
qsf := NewQueueSetFactory(clk, counter)
|
||||
qCfg := fq.QueuingConfig{
|
||||
Name: "TestContextCancel",
|
||||
|
|
@ -733,7 +724,7 @@ func TestTotalRequestsExecutingWithPanic(t *testing.T) {
|
|||
metrics.Register()
|
||||
metrics.Reset()
|
||||
now := time.Now()
|
||||
clk, counter := testclock.NewFakeEventClock(now, 0, nil)
|
||||
clk, counter := fqclocktest.NewFakeEventClock(now, 0, nil)
|
||||
qsf := NewQueueSetFactory(clk, counter)
|
||||
qCfg := fq.QueuingConfig{
|
||||
Name: "TestTotalRequestsExecutingWithPanic",
|
||||
|
|
|
|||
Loading…
Reference in New Issue