Refactored QueueSet configuration into two phases
So that errors can be detected before resolving concurrency shares into concurrency counts. Kubernetes-commit: 1e170637c3ce6c4ccd378275d9e52192f4be12b7
This commit is contained in:
		
							parent
							
								
									b17b8b7d8c
								
							
						
					
					
						commit
						4fead639b7
					
				|  | @ -21,9 +21,22 @@ import ( | ||||||
| 	"time" | 	"time" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| // QueueSetFactory is used to create QueueSet objects.
 | // QueueSetFactory is used to create QueueSet objects.  Creation, like
 | ||||||
|  | // config update, is done in two phases: the first phase consumes the
 | ||||||
|  | // QueuingConfig and the second consumes the DispatchingConfig.  They
 | ||||||
|  | // are separated so that errors from the first phase can be found
 | ||||||
|  | // before committing to a concurrency allotment for the second.
 | ||||||
| type QueueSetFactory interface { | type QueueSetFactory interface { | ||||||
| 	NewQueueSet(config QueueSetConfig) (QueueSet, error) | 	// QualifyQueuingConfig does the first phase of creating a QueueSet
 | ||||||
|  | 	QualifyQueuingConfig(QueuingConfig) (QueueSetCompleter, error) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // QueueSetCompleter finishes the two-step process of creating or
 | ||||||
|  | // reconfiguring a QueueSet
 | ||||||
|  | type QueueSetCompleter interface { | ||||||
|  | 	// GetQueueSet returns a QueueSet configured by the given
 | ||||||
|  | 	// dispatching configuration.
 | ||||||
|  | 	GetQueueSet(DispatchingConfig) QueueSet | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // QueueSet is the abstraction for the queuing and dispatching
 | // QueueSet is the abstraction for the queuing and dispatching
 | ||||||
|  | @ -34,19 +47,27 @@ type QueueSetFactory interface { | ||||||
| // .  Some day we may have connections between priority levels, but
 | // .  Some day we may have connections between priority levels, but
 | ||||||
| // today is not that day.
 | // today is not that day.
 | ||||||
| type QueueSet interface { | type QueueSet interface { | ||||||
| 	// SetConfiguration updates the configuration
 | 	// QualifyQueuingConfig starts the two-step process of updating
 | ||||||
| 	SetConfiguration(QueueSetConfig) error | 	// the configuration.  No change is made until GetQueueSet is
 | ||||||
|  | 	// called.  If `C := X.QualifyQueuingConfig(q)` then
 | ||||||
|  | 	// `C.GetQueueSet(d)` returns the same value `X`.  If the
 | ||||||
|  | 	// QueuingConfig's DesiredNumQueues field is zero then the other
 | ||||||
|  | 	// queuing-specific config parameters are not changed, so that the
 | ||||||
|  | 	// queues continue draining as before.
 | ||||||
|  | 	QualifyQueuingConfig(QueuingConfig) (QueueSetCompleter, error) | ||||||
| 
 | 
 | ||||||
| 	// Quiesce controls whether the QueueSet is operating normally or is quiescing.
 | 	// Quiesce controls whether the QueueSet is operating normally or
 | ||||||
| 	// A quiescing QueueSet drains as normal but does not admit any
 | 	// is quiescing.  A quiescing QueueSet drains as normal but does
 | ||||||
| 	// new requests. Passing a non-nil handler means the system should
 | 	// not admit any new requests. Passing a non-nil handler means the
 | ||||||
| 	// be quiescing, a nil handler means the system should operate
 | 	// system should be quiescing, a nil handler means the system
 | ||||||
| 	// normally. A call to Wait while the system is quiescing
 | 	// should operate normally. A call to Wait while the system is
 | ||||||
| 	// will be rebuffed by returning tryAnother=true. If all the
 | 	// quiescing will be rebuffed by returning tryAnother=true. If all
 | ||||||
| 	// queues have no requests waiting nor executing while the system
 | 	// the queues have no requests waiting nor executing while the
 | ||||||
| 	// is quiescing then the handler will eventually be called with no
 | 	// system is quiescing then the handler will eventually be called
 | ||||||
| 	// locks held (even if the system becomes non-quiescing between the
 | 	// with no locks held (even if the system becomes non-quiescing
 | ||||||
| 	// triggering state and the required call).
 | 	// between the triggering state and the required call).  In Go
 | ||||||
|  | 	// Memory Model terms, the triggering state happens before the
 | ||||||
|  | 	// call to the EmptyHandler.
 | ||||||
| 	Quiesce(EmptyHandler) | 	Quiesce(EmptyHandler) | ||||||
| 
 | 
 | ||||||
| 	// Wait uses the given hashValue as the source of entropy as it
 | 	// Wait uses the given hashValue as the source of entropy as it
 | ||||||
|  | @ -56,34 +77,44 @@ type QueueSet interface { | ||||||
| 	// tryAnother==true at return then the QueueSet has become
 | 	// tryAnother==true at return then the QueueSet has become
 | ||||||
| 	// undesirable and the client should try to find a different
 | 	// undesirable and the client should try to find a different
 | ||||||
| 	// QueueSet to use; execute and afterExecution are irrelevant in
 | 	// QueueSet to use; execute and afterExecution are irrelevant in
 | ||||||
| 	// this case.  Otherwise, if execute then the client should start
 | 	// this case.  In the terms of the Go Memory Model, there was a
 | ||||||
| 	// executing the request and, once the request finishes execution
 | 	// call to Quiesce with a non-nil handler that happened before
 | ||||||
| 	// or is canceled, call afterExecution().  Otherwise the client
 | 	// this return from Wait.  Otherwise, if execute then the client
 | ||||||
| 	// should not execute the request and afterExecution is
 | 	// should start executing the request and, once the request
 | ||||||
| 	// irrelevant.
 | 	// finishes execution or is canceled, call afterExecution().
 | ||||||
|  | 	// Otherwise the client should not execute the request and
 | ||||||
|  | 	// afterExecution is irrelevant.
 | ||||||
| 	Wait(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (tryAnother, execute bool, afterExecution func()) | 	Wait(ctx context.Context, hashValue uint64, descr1, descr2 interface{}) (tryAnother, execute bool, afterExecution func()) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // QueueSetConfig defines the configuration of a QueueSet.
 | // QueuingConfig defines the configuration of the queuing aspect of a QueueSet.
 | ||||||
| type QueueSetConfig struct { | type QueuingConfig struct { | ||||||
| 	// Name is used to identify a queue set, allowing for descriptive information about its intended use
 | 	// Name is used to identify a queue set, allowing for descriptive information about its intended use
 | ||||||
| 	Name string | 	Name string | ||||||
| 	// ConcurrencyLimit is the maximum number of requests of this QueueSet that may be executing at a time
 | 
 | ||||||
| 	ConcurrencyLimit int |  | ||||||
| 	// DesiredNumQueues is the number of queues that the API says
 | 	// DesiredNumQueues is the number of queues that the API says
 | ||||||
| 	// should exist now.  This may be zero, in which case
 | 	// should exist now.  This may be zero, in which case
 | ||||||
| 	// QueueLengthLimit, HandSize, and RequestWaitLimit are ignored.
 | 	// QueueLengthLimit, HandSize, and RequestWaitLimit are ignored.
 | ||||||
| 	DesiredNumQueues int | 	DesiredNumQueues int | ||||||
|  | 
 | ||||||
| 	// QueueLengthLimit is the maximum number of requests that may be waiting in a given queue at a time
 | 	// QueueLengthLimit is the maximum number of requests that may be waiting in a given queue at a time
 | ||||||
| 	QueueLengthLimit int | 	QueueLengthLimit int | ||||||
|  | 
 | ||||||
| 	// HandSize is a parameter of shuffle sharding.  Upon arrival of a request, a queue is chosen by randomly
 | 	// HandSize is a parameter of shuffle sharding.  Upon arrival of a request, a queue is chosen by randomly
 | ||||||
| 	// dealing a "hand" of this many queues and then picking one of minimum length.
 | 	// dealing a "hand" of this many queues and then picking one of minimum length.
 | ||||||
| 	HandSize int | 	HandSize int | ||||||
|  | 
 | ||||||
| 	// RequestWaitLimit is the maximum amount of time that a request may wait in a queue.
 | 	// RequestWaitLimit is the maximum amount of time that a request may wait in a queue.
 | ||||||
| 	// If, by the end of that time, the request has not been dispatched then it is rejected.
 | 	// If, by the end of that time, the request has not been dispatched then it is rejected.
 | ||||||
| 	RequestWaitLimit time.Duration | 	RequestWaitLimit time.Duration | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | // DispatchingConfig defines the configuration of the dispatching aspect of a QueueSet.
 | ||||||
|  | type DispatchingConfig struct { | ||||||
|  | 	// ConcurrencyLimit is the maximum number of requests of this QueueSet that may be executing at a time
 | ||||||
|  | 	ConcurrencyLimit int | ||||||
|  | } | ||||||
|  | 
 | ||||||
| // EmptyHandler is used to notify the callee when all the queues
 | // EmptyHandler is used to notify the callee when all the queues
 | ||||||
| // of a QueueSet have been drained.
 | // of a QueueSet have been drained.
 | ||||||
| type EmptyHandler interface { | type EmptyHandler interface { | ||||||
|  |  | ||||||
|  | @ -22,10 +22,10 @@ import ( | ||||||
| 	"sync" | 	"sync" | ||||||
| 	"time" | 	"time" | ||||||
| 
 | 
 | ||||||
| 	"k8s.io/apimachinery/pkg/util/runtime" |  | ||||||
| 
 |  | ||||||
| 	"github.com/pkg/errors" | 	"github.com/pkg/errors" | ||||||
|  | 
 | ||||||
| 	"k8s.io/apimachinery/pkg/util/clock" | 	"k8s.io/apimachinery/pkg/util/clock" | ||||||
|  | 	"k8s.io/apimachinery/pkg/util/runtime" | ||||||
| 	"k8s.io/apiserver/pkg/util/flowcontrol/counter" | 	"k8s.io/apiserver/pkg/util/flowcontrol/counter" | ||||||
| 	fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" | 	fq "k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing" | ||||||
| 	"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise" | 	"k8s.io/apiserver/pkg/util/flowcontrol/fairqueuing/promise/lockingpromise" | ||||||
|  | @ -43,12 +43,13 @@ type queueSetFactory struct { | ||||||
| 	clock   clock.PassiveClock | 	clock   clock.PassiveClock | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // NewQueueSetFactory creates a new QueueSetFactory object
 | // `*queueSetCompleter` implements QueueSetCompleter.  Exactly one of
 | ||||||
| func NewQueueSetFactory(c clock.PassiveClock, counter counter.GoRoutineCounter) fq.QueueSetFactory { | // the fields `factory` and `theSet` is non-nil.
 | ||||||
| 	return &queueSetFactory{ | type queueSetCompleter struct { | ||||||
| 		counter: counter, | 	factory *queueSetFactory | ||||||
| 		clock:   c, | 	theSet  *queueSet | ||||||
| 	} | 	qCfg    fq.QueuingConfig | ||||||
|  | 	dealer  *shufflesharding.Dealer | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // queueSet implements the Fair Queuing for Server Requests technique
 | // queueSet implements the Fair Queuing for Server Requests technique
 | ||||||
|  | @ -65,12 +66,19 @@ type queueSet struct { | ||||||
| 
 | 
 | ||||||
| 	lock sync.Mutex | 	lock sync.Mutex | ||||||
| 
 | 
 | ||||||
| 	// config holds the current configuration.  Its DesiredNumQueues
 | 	// qCfg holds the current queuing configuration.  Its
 | ||||||
| 	// may be less than the current number of queues.  If its
 | 	// DesiredNumQueues may be less than the current number of queues.
 | ||||||
| 	// DesiredNumQueues is zero then its other queuing parameters
 | 	// If its DesiredNumQueues is zero then its other queuing
 | ||||||
| 	// retain the settings they had when DesiredNumQueues was last
 | 	// parameters retain the settings they had when DesiredNumQueues
 | ||||||
| 	// non-zero (if ever).
 | 	// was last non-zero (if ever).
 | ||||||
| 	config fq.QueueSetConfig | 	qCfg fq.QueuingConfig | ||||||
|  | 
 | ||||||
|  | 	// the current dispatching configuration.
 | ||||||
|  | 	dCfg fq.DispatchingConfig | ||||||
|  | 
 | ||||||
|  | 	// If `config.DesiredNumQueues` is non-zero then dealer is not nil
 | ||||||
|  | 	// and is good for `config`.
 | ||||||
|  | 	dealer *shufflesharding.Dealer | ||||||
| 
 | 
 | ||||||
| 	// queues may be longer than the desired number, while the excess
 | 	// queues may be longer than the desired number, while the excess
 | ||||||
| 	// queues are still draining.
 | 	// queues are still draining.
 | ||||||
|  | @ -96,24 +104,55 @@ type queueSet struct { | ||||||
| 	totRequestsExecuting int | 	totRequestsExecuting int | ||||||
| 
 | 
 | ||||||
| 	emptyHandler fq.EmptyHandler | 	emptyHandler fq.EmptyHandler | ||||||
| 	dealer       *shufflesharding.Dealer |  | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // NewQueueSet creates a new QueueSet object.
 | // NewQueueSetFactory creates a new QueueSetFactory object
 | ||||||
| // There is a new QueueSet created for each priority level.
 | func NewQueueSetFactory(c clock.PassiveClock, counter counter.GoRoutineCounter) fq.QueueSetFactory { | ||||||
| func (qsf queueSetFactory) NewQueueSet(config fq.QueueSetConfig) (fq.QueueSet, error) { | 	return &queueSetFactory{ | ||||||
| 	fq := &queueSet{ | 		counter: counter, | ||||||
| 		clock:                qsf.clock, | 		clock:   c, | ||||||
| 		counter:              qsf.counter, |  | ||||||
| 		estimatedServiceTime: 60, |  | ||||||
| 		config:               config, |  | ||||||
| 		lastRealTime:         qsf.clock.Now(), |  | ||||||
| 	} | 	} | ||||||
| 	err := fq.SetConfiguration(config) | } | ||||||
|  | 
 | ||||||
|  | func (qsf *queueSetFactory) QualifyQueuingConfig(qCfg fq.QueuingConfig) (fq.QueueSetCompleter, error) { | ||||||
|  | 	dealer, err := checkConfig(qCfg) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return nil, err | 		return nil, err | ||||||
| 	} | 	} | ||||||
| 	return fq, nil | 	return &queueSetCompleter{ | ||||||
|  | 		factory: qsf, | ||||||
|  | 		qCfg:    qCfg, | ||||||
|  | 		dealer:  dealer}, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // checkConfig returns a non-nil Dealer if the config is valid and
 | ||||||
|  | // calls for one, and returns a non-nil error if the given config is
 | ||||||
|  | // invalid.
 | ||||||
|  | func checkConfig(qCfg fq.QueuingConfig) (*shufflesharding.Dealer, error) { | ||||||
|  | 	if qCfg.DesiredNumQueues == 0 { | ||||||
|  | 		return nil, nil | ||||||
|  | 	} | ||||||
|  | 	dealer, err := shufflesharding.NewDealer(qCfg.DesiredNumQueues, qCfg.HandSize) | ||||||
|  | 	if err != nil { | ||||||
|  | 		err = errors.Wrap(err, "the QueueSetConfig implies an invalid shuffle sharding config (DesiredNumQueues is deckSize)") | ||||||
|  | 	} | ||||||
|  | 	return dealer, err | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (qsc *queueSetCompleter) GetQueueSet(dCfg fq.DispatchingConfig) fq.QueueSet { | ||||||
|  | 	qs := qsc.theSet | ||||||
|  | 	if qs == nil { | ||||||
|  | 		qs = &queueSet{ | ||||||
|  | 			clock:                qsc.factory.clock, | ||||||
|  | 			counter:              qsc.factory.counter, | ||||||
|  | 			estimatedServiceTime: 60, | ||||||
|  | 			qCfg:                 qsc.qCfg, | ||||||
|  | 			virtualTime:          0, | ||||||
|  | 			lastRealTime:         qsc.factory.clock.Now(), | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	qs.setConfiguration(qsc.qCfg, qsc.dealer, dCfg) | ||||||
|  | 	return qs | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // createQueues is a helper method for initializing an array of n queues
 | // createQueues is a helper method for initializing an array of n queues
 | ||||||
|  | @ -125,40 +164,45 @@ func createQueues(n, baseIndex int) []*queue { | ||||||
| 	return fqqueues | 	return fqqueues | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // SetConfiguration is used to set the configuration for a queueSet
 | func (qs *queueSet) QualifyQueuingConfig(qCfg fq.QueuingConfig) (fq.QueueSetCompleter, error) { | ||||||
| // update handling for when fields are updated is handled here as well -
 | 	dealer, err := checkConfig(qCfg) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, err | ||||||
|  | 	} | ||||||
|  | 	return &queueSetCompleter{ | ||||||
|  | 		theSet: qs, | ||||||
|  | 		qCfg:   qCfg, | ||||||
|  | 		dealer: dealer}, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // SetConfiguration is used to set the configuration for a queueSet.
 | ||||||
|  | // Update handling for when fields are updated is handled here as well -
 | ||||||
| // eg: if DesiredNum is increased, SetConfiguration reconciles by
 | // eg: if DesiredNum is increased, SetConfiguration reconciles by
 | ||||||
| // adding more queues.
 | // adding more queues.
 | ||||||
| func (qs *queueSet) SetConfiguration(config fq.QueueSetConfig) error { | func (qs *queueSet) setConfiguration(qCfg fq.QueuingConfig, dealer *shufflesharding.Dealer, dCfg fq.DispatchingConfig) { | ||||||
| 	qs.lockAndSyncTime() | 	qs.lockAndSyncTime() | ||||||
| 	defer qs.lock.Unlock() | 	defer qs.lock.Unlock() | ||||||
| 	var dealer *shufflesharding.Dealer |  | ||||||
| 
 | 
 | ||||||
| 	if config.DesiredNumQueues > 0 { | 	if qCfg.DesiredNumQueues > 0 { | ||||||
| 		var err error |  | ||||||
| 		dealer, err = shufflesharding.NewDealer(config.DesiredNumQueues, config.HandSize) |  | ||||||
| 		if err != nil { |  | ||||||
| 			return errors.Wrap(err, "shuffle sharding dealer creation failed") |  | ||||||
| 		} |  | ||||||
| 		// Adding queues is the only thing that requires immediate action
 | 		// Adding queues is the only thing that requires immediate action
 | ||||||
| 		// Removing queues is handled by omitting indexes >DesiredNum from
 | 		// Removing queues is handled by omitting indexes >DesiredNum from
 | ||||||
| 		// chooseQueueIndexLocked
 | 		// chooseQueueIndexLocked
 | ||||||
| 		numQueues := len(qs.queues) | 		numQueues := len(qs.queues) | ||||||
| 		if config.DesiredNumQueues > numQueues { | 		if qCfg.DesiredNumQueues > numQueues { | ||||||
| 			qs.queues = append(qs.queues, | 			qs.queues = append(qs.queues, | ||||||
| 				createQueues(config.DesiredNumQueues-numQueues, len(qs.queues))...) | 				createQueues(qCfg.DesiredNumQueues-numQueues, len(qs.queues))...) | ||||||
| 		} | 		} | ||||||
| 	} else { | 	} else { | ||||||
| 		config.QueueLengthLimit = qs.config.QueueLengthLimit | 		qCfg.QueueLengthLimit = qs.qCfg.QueueLengthLimit | ||||||
| 		config.HandSize = qs.config.HandSize | 		qCfg.HandSize = qs.qCfg.HandSize | ||||||
| 		config.RequestWaitLimit = qs.config.RequestWaitLimit | 		qCfg.RequestWaitLimit = qs.qCfg.RequestWaitLimit | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	qs.config = config | 	qs.qCfg = qCfg | ||||||
|  | 	qs.dCfg = dCfg | ||||||
| 	qs.dealer = dealer | 	qs.dealer = dealer | ||||||
| 
 | 
 | ||||||
| 	qs.dispatchAsMuchAsPossibleLocked() | 	qs.dispatchAsMuchAsPossibleLocked() | ||||||
| 	return nil |  | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // Quiesce controls whether the QueueSet is operating normally or is quiescing.
 | // Quiesce controls whether the QueueSet is operating normally or is quiescing.
 | ||||||
|  | @ -216,16 +260,16 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 i | ||||||
| 		// A call to Wait while the system is quiescing will be rebuffed by
 | 		// A call to Wait while the system is quiescing will be rebuffed by
 | ||||||
| 		// returning `tryAnother=true`.
 | 		// returning `tryAnother=true`.
 | ||||||
| 		if qs.emptyHandler != nil { | 		if qs.emptyHandler != nil { | ||||||
| 			klog.V(5).Infof("QS(%s): rebuffing request %#+v %#+v with TryAnother", qs.config.Name, descr1, descr2) | 			klog.V(5).Infof("QS(%s): rebuffing request %#+v %#+v with TryAnother", qs.qCfg.Name, descr1, descr2) | ||||||
| 			return decisionTryAnother | 			return decisionTryAnother | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
| 		// ========================================================================
 | 		// ========================================================================
 | ||||||
| 		// Step 0:
 | 		// Step 0:
 | ||||||
| 		// Apply only concurrency limit, if zero queues desired
 | 		// Apply only concurrency limit, if zero queues desired
 | ||||||
| 		if qs.config.DesiredNumQueues < 1 { | 		if qs.qCfg.DesiredNumQueues < 1 { | ||||||
| 			if qs.totRequestsExecuting >= qs.config.ConcurrencyLimit { | 			if qs.totRequestsExecuting >= qs.dCfg.ConcurrencyLimit { | ||||||
| 				klog.V(5).Infof("QS(%s): rejecting request %#+v %#+v because %d are executing and the limit is %d", qs.config.Name, descr1, descr2, qs.totRequestsExecuting, qs.config.ConcurrencyLimit) | 				klog.V(5).Infof("QS(%s): rejecting request %#+v %#+v because %d are executing and the limit is %d", qs.qCfg.Name, descr1, descr2, qs.totRequestsExecuting, qs.dCfg.ConcurrencyLimit) | ||||||
| 				return decisionReject | 				return decisionReject | ||||||
| 			} | 			} | ||||||
| 			req = qs.dispatchSansQueue(descr1, descr2) | 			req = qs.dispatchSansQueue(descr1, descr2) | ||||||
|  | @ -243,8 +287,8 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 i | ||||||
| 		// req == nil means that the request was rejected - no remaining
 | 		// req == nil means that the request was rejected - no remaining
 | ||||||
| 		// concurrency shares and at max queue length already
 | 		// concurrency shares and at max queue length already
 | ||||||
| 		if req == nil { | 		if req == nil { | ||||||
| 			klog.V(5).Infof("QS(%s): rejecting request %#+v %#+v due to queue full", qs.config.Name, descr1, descr2) | 			klog.V(5).Infof("QS(%s): rejecting request %#+v %#+v due to queue full", qs.qCfg.Name, descr1, descr2) | ||||||
| 			metrics.AddReject(qs.config.Name, "queue-full") | 			metrics.AddReject(qs.qCfg.Name, "queue-full") | ||||||
| 			return decisionReject | 			return decisionReject | ||||||
| 		} | 		} | ||||||
| 
 | 
 | ||||||
|  | @ -274,7 +318,7 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 i | ||||||
| 				qs.goroutineDoneOrBlocked() | 				qs.goroutineDoneOrBlocked() | ||||||
| 				select { | 				select { | ||||||
| 				case <-doneCh: | 				case <-doneCh: | ||||||
| 					klog.V(6).Infof("QS(%s): Context of request %#+v %#+v is Done", qs.config.Name, descr1, descr2) | 					klog.V(6).Infof("QS(%s): Context of request %#+v %#+v is Done", qs.qCfg.Name, descr1, descr2) | ||||||
| 					req.decision.Set(decisionCancel) | 					req.decision.Set(decisionCancel) | ||||||
| 				} | 				} | ||||||
| 				qs.goroutineDoneOrBlocked() | 				qs.goroutineDoneOrBlocked() | ||||||
|  | @ -291,18 +335,18 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 i | ||||||
| 		case requestDecision: | 		case requestDecision: | ||||||
| 			decision = dec | 			decision = dec | ||||||
| 		default: | 		default: | ||||||
| 			klog.Errorf("QS(%s): Impossible decision %#+v (of type %T) for request %#+v %#+v", qs.config.Name, decisionAny, decisionAny, descr1, descr2) | 			klog.Errorf("QS(%s): Impossible decision %#+v (of type %T) for request %#+v %#+v", qs.qCfg.Name, decisionAny, decisionAny, descr1, descr2) | ||||||
| 			decision = decisionExecute | 			decision = decisionExecute | ||||||
| 		} | 		} | ||||||
| 		switch decision { | 		switch decision { | ||||||
| 		case decisionReject: | 		case decisionReject: | ||||||
| 			klog.V(5).Infof("QS(%s): request %#+v %#+v timed out after being enqueued\n", qs.config.Name, descr1, descr2) | 			klog.V(5).Infof("QS(%s): request %#+v %#+v timed out after being enqueued\n", qs.qCfg.Name, descr1, descr2) | ||||||
| 			metrics.AddReject(qs.config.Name, "time-out") | 			metrics.AddReject(qs.qCfg.Name, "time-out") | ||||||
| 		case decisionCancel: | 		case decisionCancel: | ||||||
| 			qs.syncTimeLocked() | 			qs.syncTimeLocked() | ||||||
| 			// TODO(aaron-prindle) add metrics to these two cases
 | 			// TODO(aaron-prindle) add metrics to these two cases
 | ||||||
| 			if req.isWaiting { | 			if req.isWaiting { | ||||||
| 				klog.V(5).Infof("QS(%s): Ejecting request %#+v %#+v from its queue", qs.config.Name, descr1, descr2) | 				klog.V(5).Infof("QS(%s): Ejecting request %#+v %#+v from its queue", qs.qCfg.Name, descr1, descr2) | ||||||
| 				// remove the request from the queue as it has timed out
 | 				// remove the request from the queue as it has timed out
 | ||||||
| 				for i := range req.queue.requests { | 				for i := range req.queue.requests { | ||||||
| 					if req == req.queue.requests[i] { | 					if req == req.queue.requests[i] { | ||||||
|  | @ -317,7 +361,7 @@ func (qs *queueSet) Wait(ctx context.Context, hashValue uint64, descr1, descr2 i | ||||||
| 				// then a call to the EmptyHandler should be forked.
 | 				// then a call to the EmptyHandler should be forked.
 | ||||||
| 				qs.maybeForkEmptyHandlerLocked() | 				qs.maybeForkEmptyHandlerLocked() | ||||||
| 			} else { | 			} else { | ||||||
| 				klog.V(5).Infof("QS(%s): request %#+v %#+v canceled shortly after dispatch", qs.config.Name, descr1, descr2) | 				klog.V(5).Infof("QS(%s): request %#+v %#+v canceled shortly after dispatch", qs.qCfg.Name, descr1, descr2) | ||||||
| 			} | 			} | ||||||
| 		} | 		} | ||||||
| 		return decision | 		return decision | ||||||
|  | @ -370,7 +414,7 @@ func (qs *queueSet) getVirtualTimeRatio() float64 { | ||||||
| 	if activeQueues == 0 { | 	if activeQueues == 0 { | ||||||
| 		return 0 | 		return 0 | ||||||
| 	} | 	} | ||||||
| 	return math.Min(float64(reqs), float64(qs.config.ConcurrencyLimit)) / float64(activeQueues) | 	return math.Min(float64(reqs), float64(qs.dCfg.ConcurrencyLimit)) / float64(activeQueues) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // timeoutOldRequestsAndRejectOrEnqueueLocked encapsulates the logic required
 | // timeoutOldRequestsAndRejectOrEnqueueLocked encapsulates the logic required
 | ||||||
|  | @ -404,7 +448,7 @@ func (qs *queueSet) timeoutOldRequestsAndRejectOrEnqueueLocked(hashValue uint64, | ||||||
| 	if ok := qs.rejectOrEnqueueLocked(req); !ok { | 	if ok := qs.rejectOrEnqueueLocked(req); !ok { | ||||||
| 		return nil | 		return nil | ||||||
| 	} | 	} | ||||||
| 	metrics.ObserveQueueLength(qs.config.Name, len(queue.requests)) | 	metrics.ObserveQueueLength(qs.qCfg.Name, len(queue.requests)) | ||||||
| 	return req | 	return req | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | @ -415,13 +459,16 @@ func (qs *queueSet) chooseQueueIndexLocked(hashValue uint64, descr1, descr2 inte | ||||||
| 	bestQueueLen := int(math.MaxInt32) | 	bestQueueLen := int(math.MaxInt32) | ||||||
| 	// the dealer uses the current desired number of queues, which is no larger than the number in `qs.queues`.
 | 	// the dealer uses the current desired number of queues, which is no larger than the number in `qs.queues`.
 | ||||||
| 	qs.dealer.Deal(hashValue, func(queueIdx int) { | 	qs.dealer.Deal(hashValue, func(queueIdx int) { | ||||||
|  | 		if queueIdx < 0 || queueIdx >= len(qs.queues) { | ||||||
|  | 			return | ||||||
|  | 		} | ||||||
| 		thisLen := len(qs.queues[queueIdx].requests) | 		thisLen := len(qs.queues[queueIdx].requests) | ||||||
| 		klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of length %d", qs.config.Name, descr1, descr2, queueIdx, thisLen) | 		klog.V(7).Infof("QS(%s): For request %#+v %#+v considering queue %d of length %d", qs.qCfg.Name, descr1, descr2, queueIdx, thisLen) | ||||||
| 		if thisLen < bestQueueLen { | 		if thisLen < bestQueueLen { | ||||||
| 			bestQueueIdx, bestQueueLen = queueIdx, thisLen | 			bestQueueIdx, bestQueueLen = queueIdx, thisLen | ||||||
| 		} | 		} | ||||||
| 	}) | 	}) | ||||||
| 	klog.V(6).Infof("QS(%s): For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.config.Name, descr1, descr2, bestQueueIdx, bestQueueLen, qs.queues[bestQueueIdx].requestsExecuting) | 	klog.V(6).Infof("QS(%s): For request %#+v %#+v chose queue %d, had %d waiting & %d executing", qs.qCfg.Name, descr1, descr2, bestQueueIdx, bestQueueLen, qs.queues[bestQueueIdx].requestsExecuting) | ||||||
| 	return bestQueueIdx | 	return bestQueueIdx | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | @ -436,7 +483,7 @@ func (qs *queueSet) removeTimedOutRequestsFromQueueLocked(queue *queue) { | ||||||
| 	// as newer requests also will not have timed out
 | 	// as newer requests also will not have timed out
 | ||||||
| 
 | 
 | ||||||
| 	// now - requestWaitLimit = waitLimit
 | 	// now - requestWaitLimit = waitLimit
 | ||||||
| 	waitLimit := now.Add(-qs.config.RequestWaitLimit) | 	waitLimit := now.Add(-qs.qCfg.RequestWaitLimit) | ||||||
| 	for i, req := range reqs { | 	for i, req := range reqs { | ||||||
| 		if waitLimit.After(req.arrivalTime) { | 		if waitLimit.After(req.arrivalTime) { | ||||||
| 			req.decision.SetLocked(decisionReject) | 			req.decision.SetLocked(decisionReject) | ||||||
|  | @ -463,8 +510,8 @@ func (qs *queueSet) rejectOrEnqueueLocked(request *request) bool { | ||||||
| 	queue := request.queue | 	queue := request.queue | ||||||
| 	curQueueLength := len(queue.requests) | 	curQueueLength := len(queue.requests) | ||||||
| 	// rejects the newly arrived request if resource criteria not met
 | 	// rejects the newly arrived request if resource criteria not met
 | ||||||
| 	if qs.totRequestsExecuting >= qs.config.ConcurrencyLimit && | 	if qs.totRequestsExecuting >= qs.dCfg.ConcurrencyLimit && | ||||||
| 		curQueueLength >= qs.config.QueueLengthLimit { | 		curQueueLength >= qs.qCfg.QueueLengthLimit { | ||||||
| 		return false | 		return false | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
|  | @ -479,12 +526,12 @@ func (qs *queueSet) enqueueLocked(request *request) { | ||||||
| 		// the queue’s virtual start time is set to the virtual time.
 | 		// the queue’s virtual start time is set to the virtual time.
 | ||||||
| 		queue.virtualStart = qs.virtualTime | 		queue.virtualStart = qs.virtualTime | ||||||
| 		if klog.V(6) { | 		if klog.V(6) { | ||||||
| 			klog.Infof("QS(%s) at r=%s v=%.9fs: initialized queue %d virtual start time due to request %#+v %#+v", qs.config.Name, qs.clock.Now().Format(nsTimeFmt), queue.virtualStart, queue.index, request.descr1, request.descr2) | 			klog.Infof("QS(%s) at r=%s v=%.9fs: initialized queue %d virtual start time due to request %#+v %#+v", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), queue.virtualStart, queue.index, request.descr1, request.descr2) | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| 	queue.Enqueue(request) | 	queue.Enqueue(request) | ||||||
| 	qs.totRequestsWaiting++ | 	qs.totRequestsWaiting++ | ||||||
| 	metrics.UpdateFlowControlRequestsInQueue(qs.config.Name, qs.totRequestsWaiting) | 	metrics.UpdateFlowControlRequestsInQueue(qs.qCfg.Name, qs.totRequestsWaiting) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // dispatchAsMuchAsPossibleLocked runs a loop, as long as there
 | // dispatchAsMuchAsPossibleLocked runs a loop, as long as there
 | ||||||
|  | @ -494,7 +541,7 @@ func (qs *queueSet) enqueueLocked(request *request) { | ||||||
| // queue, increment the count of the number executing, and send true
 | // queue, increment the count of the number executing, and send true
 | ||||||
| // to the request's channel.
 | // to the request's channel.
 | ||||||
| func (qs *queueSet) dispatchAsMuchAsPossibleLocked() { | func (qs *queueSet) dispatchAsMuchAsPossibleLocked() { | ||||||
| 	for qs.totRequestsWaiting != 0 && qs.totRequestsExecuting < qs.config.ConcurrencyLimit { | 	for qs.totRequestsWaiting != 0 && qs.totRequestsExecuting < qs.dCfg.ConcurrencyLimit { | ||||||
| 		ok := qs.dispatchLocked() | 		ok := qs.dispatchLocked() | ||||||
| 		if !ok { | 		if !ok { | ||||||
| 			break | 			break | ||||||
|  | @ -512,9 +559,9 @@ func (qs *queueSet) dispatchSansQueue(descr1, descr2 interface{}) *request { | ||||||
| 	} | 	} | ||||||
| 	qs.totRequestsExecuting++ | 	qs.totRequestsExecuting++ | ||||||
| 	if klog.V(5) { | 	if klog.V(5) { | ||||||
| 		klog.Infof("QS(%s) at r=%s v=%.9fs: immediate dispatch of request %#+v %#+v, qs will have %d executing", qs.config.Name, now.Format(nsTimeFmt), qs.virtualTime, descr1, descr2, qs.totRequestsExecuting) | 		klog.Infof("QS(%s) at r=%s v=%.9fs: immediate dispatch of request %#+v %#+v, qs will have %d executing", qs.qCfg.Name, now.Format(nsTimeFmt), qs.virtualTime, descr1, descr2, qs.totRequestsExecuting) | ||||||
| 	} | 	} | ||||||
| 	metrics.UpdateFlowControlRequestsExecuting(qs.config.Name, qs.totRequestsExecuting) | 	metrics.UpdateFlowControlRequestsExecuting(qs.qCfg.Name, qs.totRequestsExecuting) | ||||||
| 	return req | 	return req | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | @ -537,11 +584,11 @@ func (qs *queueSet) dispatchLocked() bool { | ||||||
| 	qs.totRequestsExecuting++ | 	qs.totRequestsExecuting++ | ||||||
| 	queue.requestsExecuting++ | 	queue.requestsExecuting++ | ||||||
| 	if klog.V(6) { | 	if klog.V(6) { | ||||||
| 		klog.Infof("QS(%s) at r=%s v=%.9fs: dispatching request %#+v %#+v from queue %d with virtual start time %.9fs, queue will have %d waiting & %d executing", qs.config.Name, request.startTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2, queue.index, queue.virtualStart, len(queue.requests), queue.requestsExecuting) | 		klog.Infof("QS(%s) at r=%s v=%.9fs: dispatching request %#+v %#+v from queue %d with virtual start time %.9fs, queue will have %d waiting & %d executing", qs.qCfg.Name, request.startTime.Format(nsTimeFmt), qs.virtualTime, request.descr1, request.descr2, queue.index, queue.virtualStart, len(queue.requests), queue.requestsExecuting) | ||||||
| 	} | 	} | ||||||
| 	// When a request is dequeued for service -> qs.virtualStart += G
 | 	// When a request is dequeued for service -> qs.virtualStart += G
 | ||||||
| 	queue.virtualStart += qs.estimatedServiceTime | 	queue.virtualStart += qs.estimatedServiceTime | ||||||
| 	metrics.UpdateFlowControlRequestsExecuting(qs.config.Name, qs.totRequestsExecuting) | 	metrics.UpdateFlowControlRequestsExecuting(qs.qCfg.Name, qs.totRequestsExecuting) | ||||||
| 	request.decision.SetLocked(decisionExecute) | 	request.decision.SetLocked(decisionExecute) | ||||||
| 	return ok | 	return ok | ||||||
| } | } | ||||||
|  | @ -590,11 +637,11 @@ func (qs *queueSet) finishRequestAndDispatchAsMuchAsPossible(req *request) { | ||||||
| // callback updates important state in the queueSet
 | // callback updates important state in the queueSet
 | ||||||
| func (qs *queueSet) finishRequestLocked(r *request) { | func (qs *queueSet) finishRequestLocked(r *request) { | ||||||
| 	qs.totRequestsExecuting-- | 	qs.totRequestsExecuting-- | ||||||
| 	metrics.UpdateFlowControlRequestsExecuting(qs.config.Name, qs.totRequestsExecuting) | 	metrics.UpdateFlowControlRequestsExecuting(qs.qCfg.Name, qs.totRequestsExecuting) | ||||||
| 
 | 
 | ||||||
| 	if r.queue == nil { | 	if r.queue == nil { | ||||||
| 		if klog.V(6) { | 		if klog.V(6) { | ||||||
| 			klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, qs will have %d executing", qs.config.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, qs.totRequestsExecuting) | 			klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, qs will have %d executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, qs.totRequestsExecuting) | ||||||
| 		} | 		} | ||||||
| 		return | 		return | ||||||
| 	} | 	} | ||||||
|  | @ -609,12 +656,12 @@ func (qs *queueSet) finishRequestLocked(r *request) { | ||||||
| 	r.queue.requestsExecuting-- | 	r.queue.requestsExecuting-- | ||||||
| 
 | 
 | ||||||
| 	if klog.V(6) { | 	if klog.V(6) { | ||||||
| 		klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing", qs.config.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.queue.index, r.queue.virtualStart, S, len(r.queue.requests), r.queue.requestsExecuting) | 		klog.Infof("QS(%s) at r=%s v=%.9fs: request %#+v %#+v finished, adjusted queue %d virtual start time to %.9fs due to service time %.9fs, queue will have %d waiting & %d executing", qs.qCfg.Name, qs.clock.Now().Format(nsTimeFmt), qs.virtualTime, r.descr1, r.descr2, r.queue.index, r.queue.virtualStart, S, len(r.queue.requests), r.queue.requestsExecuting) | ||||||
| 	} | 	} | ||||||
| 
 | 
 | ||||||
| 	// If there are more queues than desired and this one has no
 | 	// If there are more queues than desired and this one has no
 | ||||||
| 	// requests then remove it
 | 	// requests then remove it
 | ||||||
| 	if len(qs.queues) > qs.config.DesiredNumQueues && | 	if len(qs.queues) > qs.qCfg.DesiredNumQueues && | ||||||
| 		len(r.queue.requests) == 0 && | 		len(r.queue.requests) == 0 && | ||||||
| 		r.queue.requestsExecuting == 0 { | 		r.queue.requestsExecuting == 0 { | ||||||
| 		qs.queues = removeQueueAndUpdateIndexes(qs.queues, r.queue.index) | 		qs.queues = removeQueueAndUpdateIndexes(qs.queues, r.queue.index) | ||||||
|  |  | ||||||
|  | @ -141,12 +141,11 @@ func init() { | ||||||
| func TestNoRestraint(t *testing.T) { | func TestNoRestraint(t *testing.T) { | ||||||
| 	now := time.Now() | 	now := time.Now() | ||||||
| 	clk, counter := clock.NewFakeEventClock(now, 0, nil) | 	clk, counter := clock.NewFakeEventClock(now, 0, nil) | ||||||
| 	nrf := test.NewNoRestraintFactory() | 	nrc, err := test.NewNoRestraintFactory().QualifyQueuingConfig(fq.QueuingConfig{}) | ||||||
| 	config := fq.QueueSetConfig{} |  | ||||||
| 	nr, err := nrf.NewQueueSet(config) |  | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		t.Fatalf("QueueSet creation failed with %v", err) | 		t.Fatal(err) | ||||||
| 	} | 	} | ||||||
|  | 	nr := nrc.GetQueueSet(fq.DispatchingConfig{}) | ||||||
| 	exerciseQueueSetUniformScenario(t, "NoRestraint", nr, []uniformClient{ | 	exerciseQueueSetUniformScenario(t, "NoRestraint", nr, []uniformClient{ | ||||||
| 		{1001001001, 5, 10, time.Second, time.Second}, | 		{1001001001, 5, 10, time.Second, time.Second}, | ||||||
| 		{2002002002, 2, 10, time.Second, time.Second / 2}, | 		{2002002002, 2, 10, time.Second, time.Second / 2}, | ||||||
|  | @ -158,18 +157,18 @@ func TestUniformFlows(t *testing.T) { | ||||||
| 
 | 
 | ||||||
| 	clk, counter := clock.NewFakeEventClock(now, 0, nil) | 	clk, counter := clock.NewFakeEventClock(now, 0, nil) | ||||||
| 	qsf := NewQueueSetFactory(clk, counter) | 	qsf := NewQueueSetFactory(clk, counter) | ||||||
| 	config := fq.QueueSetConfig{ | 	qCfg := fq.QueuingConfig{ | ||||||
| 		Name:             "TestUniformFlows", | 		Name:             "TestUniformFlows", | ||||||
| 		ConcurrencyLimit: 4, |  | ||||||
| 		DesiredNumQueues: 8, | 		DesiredNumQueues: 8, | ||||||
| 		QueueLengthLimit: 6, | 		QueueLengthLimit: 6, | ||||||
| 		HandSize:         3, | 		HandSize:         3, | ||||||
| 		RequestWaitLimit: 10 * time.Minute, | 		RequestWaitLimit: 10 * time.Minute, | ||||||
| 	} | 	} | ||||||
| 	qs, err := qsf.NewQueueSet(config) | 	qsc, err := qsf.QualifyQueuingConfig(qCfg) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		t.Fatalf("QueueSet creation failed with %v", err) | 		t.Fatal(err) | ||||||
| 	} | 	} | ||||||
|  | 	qs := qsc.GetQueueSet(fq.DispatchingConfig{ConcurrencyLimit: 4}) | ||||||
| 
 | 
 | ||||||
| 	exerciseQueueSetUniformScenario(t, "UniformFlows", qs, []uniformClient{ | 	exerciseQueueSetUniformScenario(t, "UniformFlows", qs, []uniformClient{ | ||||||
| 		{1001001001, 5, 10, time.Second, time.Second}, | 		{1001001001, 5, 10, time.Second, time.Second}, | ||||||
|  | @ -182,18 +181,18 @@ func TestDifferentFlows(t *testing.T) { | ||||||
| 
 | 
 | ||||||
| 	clk, counter := clock.NewFakeEventClock(now, 0, nil) | 	clk, counter := clock.NewFakeEventClock(now, 0, nil) | ||||||
| 	qsf := NewQueueSetFactory(clk, counter) | 	qsf := NewQueueSetFactory(clk, counter) | ||||||
| 	config := fq.QueueSetConfig{ | 	qCfg := fq.QueuingConfig{ | ||||||
| 		Name:             "TestDifferentFlows", | 		Name:             "TestDifferentFlows", | ||||||
| 		ConcurrencyLimit: 4, |  | ||||||
| 		DesiredNumQueues: 8, | 		DesiredNumQueues: 8, | ||||||
| 		QueueLengthLimit: 6, | 		QueueLengthLimit: 6, | ||||||
| 		HandSize:         3, | 		HandSize:         3, | ||||||
| 		RequestWaitLimit: 10 * time.Minute, | 		RequestWaitLimit: 10 * time.Minute, | ||||||
| 	} | 	} | ||||||
| 	qs, err := qsf.NewQueueSet(config) | 	qsc, err := qsf.QualifyQueuingConfig(qCfg) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		t.Fatalf("QueueSet creation failed with %v", err) | 		t.Fatal(err) | ||||||
| 	} | 	} | ||||||
|  | 	qs := qsc.GetQueueSet(fq.DispatchingConfig{ConcurrencyLimit: 4}) | ||||||
| 
 | 
 | ||||||
| 	exerciseQueueSetUniformScenario(t, "DifferentFlows", qs, []uniformClient{ | 	exerciseQueueSetUniformScenario(t, "DifferentFlows", qs, []uniformClient{ | ||||||
| 		{1001001001, 6, 10, time.Second, time.Second}, | 		{1001001001, 6, 10, time.Second, time.Second}, | ||||||
|  | @ -206,18 +205,15 @@ func TestDifferentFlowsWithoutQueuing(t *testing.T) { | ||||||
| 
 | 
 | ||||||
| 	clk, counter := clock.NewFakeEventClock(now, 0, nil) | 	clk, counter := clock.NewFakeEventClock(now, 0, nil) | ||||||
| 	qsf := NewQueueSetFactory(clk, counter) | 	qsf := NewQueueSetFactory(clk, counter) | ||||||
| 	config := fq.QueueSetConfig{ | 	qCfg := fq.QueuingConfig{ | ||||||
| 		Name:             "TestDifferentFlowsWithoutQueuing", | 		Name:             "TestDifferentFlowsWithoutQueuing", | ||||||
| 		ConcurrencyLimit: 4, |  | ||||||
| 		DesiredNumQueues: 0, | 		DesiredNumQueues: 0, | ||||||
| 		QueueLengthLimit: 6, |  | ||||||
| 		HandSize:         3, |  | ||||||
| 		RequestWaitLimit: 10 * time.Minute, |  | ||||||
| 	} | 	} | ||||||
| 	qs, err := qsf.NewQueueSet(config) | 	qsc, err := qsf.QualifyQueuingConfig(qCfg) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		t.Fatalf("QueueSet creation failed with %v", err) | 		t.Fatal(err) | ||||||
| 	} | 	} | ||||||
|  | 	qs := qsc.GetQueueSet(fq.DispatchingConfig{ConcurrencyLimit: 4}) | ||||||
| 
 | 
 | ||||||
| 	exerciseQueueSetUniformScenario(t, "DifferentFlowsWithoutQueuing", qs, []uniformClient{ | 	exerciseQueueSetUniformScenario(t, "DifferentFlowsWithoutQueuing", qs, []uniformClient{ | ||||||
| 		{1001001001, 6, 10, time.Second, 57 * time.Millisecond}, | 		{1001001001, 6, 10, time.Second, 57 * time.Millisecond}, | ||||||
|  | @ -230,18 +226,18 @@ func TestTimeout(t *testing.T) { | ||||||
| 
 | 
 | ||||||
| 	clk, counter := clock.NewFakeEventClock(now, 0, nil) | 	clk, counter := clock.NewFakeEventClock(now, 0, nil) | ||||||
| 	qsf := NewQueueSetFactory(clk, counter) | 	qsf := NewQueueSetFactory(clk, counter) | ||||||
| 	config := fq.QueueSetConfig{ | 	qCfg := fq.QueuingConfig{ | ||||||
| 		Name:             "TestTimeout", | 		Name:             "TestTimeout", | ||||||
| 		ConcurrencyLimit: 1, |  | ||||||
| 		DesiredNumQueues: 128, | 		DesiredNumQueues: 128, | ||||||
| 		QueueLengthLimit: 128, | 		QueueLengthLimit: 128, | ||||||
| 		HandSize:         1, | 		HandSize:         1, | ||||||
| 		RequestWaitLimit: 0, | 		RequestWaitLimit: 0, | ||||||
| 	} | 	} | ||||||
| 	qs, err := qsf.NewQueueSet(config) | 	qsc, err := qsf.QualifyQueuingConfig(qCfg) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		t.Fatalf("QueueSet creation failed with %v", err) | 		t.Fatal(err) | ||||||
| 	} | 	} | ||||||
|  | 	qs := qsc.GetQueueSet(fq.DispatchingConfig{ConcurrencyLimit: 1}) | ||||||
| 
 | 
 | ||||||
| 	exerciseQueueSetUniformScenario(t, "Timeout", qs, []uniformClient{ | 	exerciseQueueSetUniformScenario(t, "Timeout", qs, []uniformClient{ | ||||||
| 		{1001001001, 5, 100, time.Second, time.Second}, | 		{1001001001, 5, 100, time.Second, time.Second}, | ||||||
|  |  | ||||||
|  | @ -31,14 +31,20 @@ func NewNoRestraintFactory() fq.QueueSetFactory { | ||||||
| 
 | 
 | ||||||
| type noRestraintFactory struct{} | type noRestraintFactory struct{} | ||||||
| 
 | 
 | ||||||
| func (noRestraintFactory) NewQueueSet(config fq.QueueSetConfig) (fq.QueueSet, error) { | type noRestraintCompeter struct{} | ||||||
| 	return noRestraint{}, nil |  | ||||||
| } |  | ||||||
| 
 | 
 | ||||||
| type noRestraint struct{} | type noRestraint struct{} | ||||||
| 
 | 
 | ||||||
| func (noRestraint) SetConfiguration(config fq.QueueSetConfig) error { | func (noRestraintFactory) QualifyQueuingConfig(qCfg fq.QueuingConfig) (fq.QueueSetCompleter, error) { | ||||||
| 	return nil | 	return noRestraintCompeter{}, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (noRestraintCompeter) GetQueueSet(dCfg fq.DispatchingConfig) fq.QueueSet { | ||||||
|  | 	return noRestraint{} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (noRestraint) QualifyQueuingConfig(qCfg fq.QueuingConfig) (fq.QueueSetCompleter, error) { | ||||||
|  | 	return noRestraintCompeter{}, nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (noRestraint) Quiesce(fq.EmptyHandler) { | func (noRestraint) Quiesce(fq.EmptyHandler) { | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue