apf: refactor promise to use a context
Kubernetes-commit: 0039f24d74d0f57c8ba868ae361821d37fd908d6
This commit is contained in:
parent
89074f262b
commit
27772523df
|
@ -17,12 +17,13 @@ limitations under the License.
|
|||
package promise
|
||||
|
||||
import (
|
||||
"context"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// promise implements the WriteOnce interface.
|
||||
type promise struct {
|
||||
doneCh <-chan struct{}
|
||||
doneCtx context.Context
|
||||
doneVal interface{}
|
||||
setCh chan struct{}
|
||||
onceler sync.Once
|
||||
|
@ -35,12 +36,12 @@ var _ WriteOnce = &promise{}
|
|||
//
|
||||
// If `initial` is non-nil then that value is Set at creation time.
|
||||
//
|
||||
// If a `Get` is waiting soon after `doneCh` becomes selectable (which
|
||||
// never happens for the nil channel) then `Set(doneVal)` effectively
|
||||
// happens at that time.
|
||||
func NewWriteOnce(initial interface{}, doneCh <-chan struct{}, doneVal interface{}) WriteOnce {
|
||||
// If a `Get` is waiting soon after the channel associated with the
|
||||
// `doneCtx` becomes selectable (which never happens for the nil
|
||||
// channel) then `Set(doneVal)` effectively happens at that time.
|
||||
func NewWriteOnce(initial interface{}, doneCtx context.Context, doneVal interface{}) WriteOnce {
|
||||
p := &promise{
|
||||
doneCh: doneCh,
|
||||
doneCtx: doneCtx,
|
||||
doneVal: doneVal,
|
||||
setCh: make(chan struct{}),
|
||||
}
|
||||
|
@ -53,7 +54,7 @@ func NewWriteOnce(initial interface{}, doneCh <-chan struct{}, doneVal interface
|
|||
func (p *promise) Get() interface{} {
|
||||
select {
|
||||
case <-p.setCh:
|
||||
case <-p.doneCh:
|
||||
case <-p.doneCtx.Done():
|
||||
p.Set(p.doneVal)
|
||||
}
|
||||
return p.value
|
||||
|
|
|
@ -28,7 +28,7 @@ func TestWriteOnceSet(t *testing.T) {
|
|||
oldTime := time.Now()
|
||||
cval := &oldTime
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
wr := NewWriteOnce(nil, ctx.Done(), cval)
|
||||
wr := NewWriteOnce(nil, ctx, cval)
|
||||
gots := make(chan interface{})
|
||||
goGetExpectNotYet(t, wr, gots, "Set")
|
||||
now := time.Now()
|
||||
|
@ -53,7 +53,7 @@ func TestWriteOnceCancel(t *testing.T) {
|
|||
oldTime := time.Now()
|
||||
cval := &oldTime
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
wr := NewWriteOnce(nil, ctx.Done(), cval)
|
||||
wr := NewWriteOnce(nil, ctx, cval)
|
||||
gots := make(chan interface{})
|
||||
goGetExpectNotYet(t, wr, gots, "cancel")
|
||||
cancel()
|
||||
|
@ -73,7 +73,7 @@ func TestWriteOnceInitial(t *testing.T) {
|
|||
ctx, cancel := context.WithCancel(context.Background())
|
||||
now := time.Now()
|
||||
aval := &now
|
||||
wr := NewWriteOnce(aval, ctx.Done(), cval)
|
||||
wr := NewWriteOnce(aval, ctx, cval)
|
||||
gots := make(chan interface{})
|
||||
goGetAndExpect(t, wr, gots, aval)
|
||||
later := time.Now()
|
||||
|
|
|
@ -53,7 +53,7 @@ type queueSetFactory struct {
|
|||
// - whose Set method is invoked with the queueSet locked, and
|
||||
// - whose Get method is invoked with the queueSet not locked.
|
||||
// The parameters are the same as for `promise.NewWriteOnce`.
|
||||
type promiseFactory func(initial interface{}, doneCh <-chan struct{}, doneVal interface{}) promise.WriteOnce
|
||||
type promiseFactory func(initial interface{}, doneCtx context.Context, doneVal interface{}) promise.WriteOnce
|
||||
|
||||
// promiseFactoryFactory returns the promiseFactory to use for the given queueSet
|
||||
type promiseFactoryFactory func(*queueSet) promiseFactory
|
||||
|
@ -584,7 +584,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(ctx context.Conte
|
|||
fsName: fsName,
|
||||
flowDistinguisher: flowDistinguisher,
|
||||
ctx: ctx,
|
||||
decision: qs.promiseFactory(nil, ctx.Done(), decisionCancel),
|
||||
decision: qs.promiseFactory(nil, ctx, decisionCancel),
|
||||
arrivalTime: qs.clock.Now(),
|
||||
arrivalR: qs.currentR,
|
||||
queue: queue,
|
||||
|
@ -725,7 +725,7 @@ func (qs *queueSet) dispatchSansQueueLocked(ctx context.Context, workEstimate *f
|
|||
flowDistinguisher: flowDistinguisher,
|
||||
ctx: ctx,
|
||||
startTime: now,
|
||||
decision: qs.promiseFactory(decisionExecute, ctx.Done(), decisionCancel),
|
||||
decision: qs.promiseFactory(decisionExecute, ctx, decisionCancel),
|
||||
arrivalTime: now,
|
||||
arrivalR: qs.currentR,
|
||||
descr1: descr1,
|
||||
|
|
|
@ -1223,8 +1223,8 @@ func TestContextCancel(t *testing.T) {
|
|||
|
||||
func countingPromiseFactoryFactory(activeCounter counter.GoRoutineCounter) promiseFactoryFactory {
|
||||
return func(qs *queueSet) promiseFactory {
|
||||
return func(initial interface{}, doneCh <-chan struct{}, doneVal interface{}) promise.WriteOnce {
|
||||
return testpromise.NewCountingWriteOnce(activeCounter, &qs.lock, initial, doneCh, doneVal)
|
||||
return func(initial interface{}, doneCtx context.Context, doneVal interface{}) promise.WriteOnce {
|
||||
return testpromise.NewCountingWriteOnce(activeCounter, &qs.lock, initial, doneCtx.Done(), doneVal)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue