From 27772523dfdaed81a5c5cf34bd1cb7af0b693a8e Mon Sep 17 00:00:00 2001 From: Abu Kashem Date: Mon, 21 Aug 2023 15:19:31 -0400 Subject: [PATCH] apf: refactor promise to use a context Kubernetes-commit: 0039f24d74d0f57c8ba868ae361821d37fd908d6 --- .../flowcontrol/fairqueuing/promise/promise.go | 15 ++++++++------- .../fairqueuing/promise/promise_test.go | 6 +++--- .../flowcontrol/fairqueuing/queueset/queueset.go | 6 +++--- .../fairqueuing/queueset/queueset_test.go | 4 ++-- 4 files changed, 16 insertions(+), 15 deletions(-) diff --git a/pkg/util/flowcontrol/fairqueuing/promise/promise.go b/pkg/util/flowcontrol/fairqueuing/promise/promise.go index d3bda40aa..79d19d136 100644 --- a/pkg/util/flowcontrol/fairqueuing/promise/promise.go +++ b/pkg/util/flowcontrol/fairqueuing/promise/promise.go @@ -17,12 +17,13 @@ limitations under the License. package promise import ( + "context" "sync" ) // promise implements the WriteOnce interface. type promise struct { - doneCh <-chan struct{} + doneCtx context.Context doneVal interface{} setCh chan struct{} onceler sync.Once @@ -35,12 +36,12 @@ var _ WriteOnce = &promise{} // // If `initial` is non-nil then that value is Set at creation time. // -// If a `Get` is waiting soon after `doneCh` becomes selectable (which -// never happens for the nil channel) then `Set(doneVal)` effectively -// happens at that time. -func NewWriteOnce(initial interface{}, doneCh <-chan struct{}, doneVal interface{}) WriteOnce { +// If a `Get` is waiting soon after the channel associated with the +// `doneCtx` becomes selectable (which never happens for the nil +// channel) then `Set(doneVal)` effectively happens at that time. +func NewWriteOnce(initial interface{}, doneCtx context.Context, doneVal interface{}) WriteOnce { p := &promise{ - doneCh: doneCh, + doneCtx: doneCtx, doneVal: doneVal, setCh: make(chan struct{}), } @@ -53,7 +54,7 @@ func NewWriteOnce(initial interface{}, doneCh <-chan struct{}, doneVal interface func (p *promise) Get() interface{} { select { case <-p.setCh: - case <-p.doneCh: + case <-p.doneCtx.Done(): p.Set(p.doneVal) } return p.value diff --git a/pkg/util/flowcontrol/fairqueuing/promise/promise_test.go b/pkg/util/flowcontrol/fairqueuing/promise/promise_test.go index e3864c2df..0e8ac347d 100644 --- a/pkg/util/flowcontrol/fairqueuing/promise/promise_test.go +++ b/pkg/util/flowcontrol/fairqueuing/promise/promise_test.go @@ -28,7 +28,7 @@ func TestWriteOnceSet(t *testing.T) { oldTime := time.Now() cval := &oldTime ctx, cancel := context.WithCancel(context.Background()) - wr := NewWriteOnce(nil, ctx.Done(), cval) + wr := NewWriteOnce(nil, ctx, cval) gots := make(chan interface{}) goGetExpectNotYet(t, wr, gots, "Set") now := time.Now() @@ -53,7 +53,7 @@ func TestWriteOnceCancel(t *testing.T) { oldTime := time.Now() cval := &oldTime ctx, cancel := context.WithCancel(context.Background()) - wr := NewWriteOnce(nil, ctx.Done(), cval) + wr := NewWriteOnce(nil, ctx, cval) gots := make(chan interface{}) goGetExpectNotYet(t, wr, gots, "cancel") cancel() @@ -73,7 +73,7 @@ func TestWriteOnceInitial(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) now := time.Now() aval := &now - wr := NewWriteOnce(aval, ctx.Done(), cval) + wr := NewWriteOnce(aval, ctx, cval) gots := make(chan interface{}) goGetAndExpect(t, wr, gots, aval) later := time.Now() diff --git a/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go b/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go index d955be765..99397ecba 100644 --- a/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go +++ b/pkg/util/flowcontrol/fairqueuing/queueset/queueset.go @@ -53,7 +53,7 @@ type queueSetFactory struct { // - whose Set method is invoked with the queueSet locked, and // - whose Get method is invoked with the queueSet not locked. // The parameters are the same as for `promise.NewWriteOnce`. -type promiseFactory func(initial interface{}, doneCh <-chan struct{}, doneVal interface{}) promise.WriteOnce +type promiseFactory func(initial interface{}, doneCtx context.Context, doneVal interface{}) promise.WriteOnce // promiseFactoryFactory returns the promiseFactory to use for the given queueSet type promiseFactoryFactory func(*queueSet) promiseFactory @@ -584,7 +584,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte fsName: fsName, flowDistinguisher: flowDistinguisher, ctx: ctx, - decision: qs.promiseFactory(nil, ctx.Done(), decisionCancel), + decision: qs.promiseFactory(nil, ctx, decisionCancel), arrivalTime: qs.clock.Now(), arrivalR: qs.currentR, queue: queue, @@ -725,7 +725,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f flowDistinguisher: flowDistinguisher, ctx: ctx, startTime: now, - decision: qs.promiseFactory(decisionExecute, ctx.Done(), decisionCancel), + decision: qs.promiseFactory(decisionExecute, ctx, decisionCancel), arrivalTime: now, arrivalR: qs.currentR, descr1: descr1, diff --git a/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go b/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go index 1d3d5e5b2..ab43b54c4 100644 --- a/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go +++ b/pkg/util/flowcontrol/fairqueuing/queueset/queueset_test.go @@ -1223,8 +1223,8 @@ func TestContextCancel(t *testing.T) { func countingPromiseFactoryFactory(activeCounter counter.GoRoutineCounter) promiseFactoryFactory { return func(qs *queueSet) promiseFactory { - return func(initial interface{}, doneCh <-chan struct{}, doneVal interface{}) promise.WriteOnce { - return testpromise.NewCountingWriteOnce(activeCounter, &qs.lock, initial, doneCh, doneVal) + return func(initial interface{}, doneCtx context.Context, doneVal interface{}) promise.WriteOnce { + return testpromise.NewCountingWriteOnce(activeCounter, &qs.lock, initial, doneCtx.Done(), doneVal) } } }