mirror of https://github.com/knative/pkg.git
Implement Slowlane/FastLane queues (#1512)
* Fast/slow queue implementation/ * more3
This commit is contained in:
parent
0f78f8a8cc
commit
97e2175a17
|
@ -0,0 +1,159 @@
|
|||
/*
|
||||
Copyright 2020 The Knative Authors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package controller
|
||||
|
||||
import (
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
)
|
||||
|
||||
// twoLaneQueue is a rate limited queue that wraps around two queues
|
||||
// -- fast queue (anonymously aliased), whose contents are processed with priority.
|
||||
// -- slow queue (slowLane queue), whose contents are processed if fast queue has no items.
|
||||
// All the default methods operate on the fast queue, unless noted otherwise.
|
||||
type twoLaneQueue struct {
|
||||
workqueue.RateLimitingInterface
|
||||
slowLane workqueue.RateLimitingInterface
|
||||
// consumerQueue is necessary to ensure that we're not reconciling
|
||||
// the same object at the exact same time (e.g. if it had been enqueued
|
||||
// in both fast and slow and is the only object there).
|
||||
consumerQueue workqueue.Interface
|
||||
|
||||
name string
|
||||
|
||||
fastChan chan interface{}
|
||||
slowChan chan interface{}
|
||||
}
|
||||
|
||||
// Creates a new twoLaneQueue.
|
||||
func newTwoLaneWorkQueue(name string) *twoLaneQueue {
|
||||
rl := workqueue.DefaultControllerRateLimiter()
|
||||
tlq := &twoLaneQueue{
|
||||
RateLimitingInterface: workqueue.NewNamedRateLimitingQueue(
|
||||
rl,
|
||||
name+"-fast",
|
||||
),
|
||||
slowLane: workqueue.NewNamedRateLimitingQueue(
|
||||
rl,
|
||||
name+"-slow",
|
||||
),
|
||||
consumerQueue: workqueue.NewNamed(name + "-consumer"),
|
||||
name: name,
|
||||
fastChan: make(chan interface{}),
|
||||
slowChan: make(chan interface{}),
|
||||
}
|
||||
// Run consumer thread.
|
||||
go tlq.runConsumer()
|
||||
// Run producer threads.
|
||||
go process(tlq.RateLimitingInterface, tlq.fastChan)
|
||||
go process(tlq.slowLane, tlq.slowChan)
|
||||
return tlq
|
||||
}
|
||||
|
||||
func process(q workqueue.Interface, ch chan interface{}) {
|
||||
// Sender closes the channel
|
||||
defer close(ch)
|
||||
for {
|
||||
i, d := q.Get()
|
||||
// If the queue is empty and we're shutting down — stop the loop.
|
||||
if d {
|
||||
break
|
||||
}
|
||||
ch <- i
|
||||
q.Done(i)
|
||||
}
|
||||
}
|
||||
|
||||
func (tlq *twoLaneQueue) runConsumer() {
|
||||
// Shutdown flags.
|
||||
fast, slow := true, true
|
||||
// When both producer queues are shutdown stop the consumerQueue.
|
||||
defer tlq.consumerQueue.ShutDown()
|
||||
// While any of the queues is still running, try to read off of them.
|
||||
for fast || slow {
|
||||
// By default drain the fast lane.
|
||||
// Channels in select are picked random, so first
|
||||
// we have a select that only looks at the fast lane queue.
|
||||
if fast {
|
||||
select {
|
||||
case item, ok := <-tlq.fastChan:
|
||||
if !ok {
|
||||
// This queue is shutdown and drained. Stop looking at it.
|
||||
fast = false
|
||||
continue
|
||||
}
|
||||
tlq.consumerQueue.Add(item)
|
||||
continue
|
||||
default:
|
||||
// This immediately exits the wait if the fast chan is empty.
|
||||
}
|
||||
}
|
||||
|
||||
// If the fast lane queue had no items, we can select from both.
|
||||
// Obviously if suddenly both are populated at the same time there's a
|
||||
// 50% chance that the slow would be picked first, but this should be
|
||||
// a rare occasion not to really worry about it.
|
||||
select {
|
||||
case item, ok := <-tlq.fastChan:
|
||||
if !ok {
|
||||
// This queue is shutdown and drained. Stop looking at it.
|
||||
fast = false
|
||||
continue
|
||||
}
|
||||
tlq.consumerQueue.Add(item)
|
||||
case item, ok := <-tlq.slowChan:
|
||||
if !ok {
|
||||
// This queue is shutdown and drained. Stop looking at it.
|
||||
slow = false
|
||||
continue
|
||||
}
|
||||
tlq.consumerQueue.Add(item)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Shutdown implements workqueue.Interace.
|
||||
// Shutdown shuts down both queues.
|
||||
func (tlq *twoLaneQueue) ShutDown() {
|
||||
tlq.RateLimitingInterface.ShutDown()
|
||||
tlq.slowLane.ShutDown()
|
||||
}
|
||||
|
||||
// Done implements workqueue.Interface.
|
||||
// Done marks the item as completed in all the queues.
|
||||
// NB: this will just re-enqueue the object on the queue that
|
||||
// didn't originate the object.
|
||||
func (tlq *twoLaneQueue) Done(i interface{}) {
|
||||
tlq.consumerQueue.Done(i)
|
||||
}
|
||||
|
||||
// Get implements workqueue.Interface.
|
||||
// It gets the item from fast lane if it has anything, alternatively
|
||||
// the slow lane.
|
||||
func (tlq *twoLaneQueue) Get() (interface{}, bool) {
|
||||
return tlq.consumerQueue.Get()
|
||||
}
|
||||
|
||||
// Len returns the sum of lengths.
|
||||
// NB: actual _number_ of unique object might be less than this sum.
|
||||
func (tlq *twoLaneQueue) Len() int {
|
||||
return tlq.RateLimitingInterface.Len() + tlq.slowLane.Len() + tlq.consumerQueue.Len()
|
||||
}
|
||||
|
||||
// SlowLane gives direct access to the slow queue.
|
||||
func (tlq *twoLaneQueue) SlowLane() workqueue.RateLimitingInterface {
|
||||
return tlq.slowLane
|
||||
}
|
|
@ -0,0 +1,133 @@
|
|||
/*
|
||||
Copyright 2020 The Knative Authors
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package controller
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func TestSlowQueue(t *testing.T) {
|
||||
q := newTwoLaneWorkQueue("live-in-the-fast-lane")
|
||||
q.SlowLane().Add("1")
|
||||
if got, want := q.Len(), 1; got != want {
|
||||
t.Errorf("Len = %d, want: 1", got)
|
||||
}
|
||||
k, done := q.Get()
|
||||
if got, want := k.(string), "1"; got != want {
|
||||
t.Errorf(`Got = %q, want: "1"`, got)
|
||||
}
|
||||
if done {
|
||||
t.Error("The queue is unexpectedly shutdown")
|
||||
}
|
||||
q.Done(k)
|
||||
q.ShutDown()
|
||||
if !q.SlowLane().ShuttingDown() {
|
||||
t.Error("ShutDown did not propagate to the slow queue")
|
||||
}
|
||||
if _, done := q.Get(); !done {
|
||||
t.Error("Get did not return positive shutdown signal")
|
||||
}
|
||||
}
|
||||
|
||||
func TestDoubleKey(t *testing.T) {
|
||||
// Verifies that we don't get double concurrent processing of the same key.
|
||||
q := newTwoLaneWorkQueue("live-in-the-fast-lane")
|
||||
q.Add("1")
|
||||
q.SlowLane().Add("1")
|
||||
t.Cleanup(q.ShutDown)
|
||||
|
||||
k, done := q.Get()
|
||||
if got, want := k.(string), "1"; got != want {
|
||||
t.Errorf(`Got = %q, want: "1"`, got)
|
||||
}
|
||||
if done {
|
||||
t.Error("The queue is unexpectedly shutdown")
|
||||
}
|
||||
|
||||
sentinel := make(chan struct{})
|
||||
go func() {
|
||||
defer close(sentinel)
|
||||
k, done := q.Get()
|
||||
if got, want := k.(string), "1"; got != want {
|
||||
t.Errorf(`2nd time got = %q, want: "1"`, got)
|
||||
}
|
||||
if done {
|
||||
t.Error("The queue is unexpectedly shutdown")
|
||||
}
|
||||
q.Done(k)
|
||||
}()
|
||||
select {
|
||||
case <-sentinel:
|
||||
t.Error("The sentinel should not have fired")
|
||||
case <-time.After(200 * time.Millisecond):
|
||||
// Expected.
|
||||
}
|
||||
// This should permit the re-reading of the same key.
|
||||
q.Done(k)
|
||||
select {
|
||||
case <-sentinel:
|
||||
// Expected.
|
||||
case <-time.After(200 * time.Millisecond):
|
||||
t.Error("The item was not processed as expected")
|
||||
}
|
||||
}
|
||||
|
||||
func TestOrder(t *testing.T) {
|
||||
// Verifies that we read from the fast queue first.
|
||||
q := newTwoLaneWorkQueue("live-in-the-fast-lane")
|
||||
stop := make(chan struct{})
|
||||
t.Cleanup(func() {
|
||||
close(stop)
|
||||
q.ShutDown()
|
||||
// Drain the rest.
|
||||
for q.Len() > 0 {
|
||||
q.Get()
|
||||
}
|
||||
})
|
||||
|
||||
go func() {
|
||||
for i := 1; ; i++ {
|
||||
q.Add(strconv.Itoa(i))
|
||||
// Get fewer of those, to ensure the first priority select wins.
|
||||
if i%2 == 0 {
|
||||
q.SlowLane().Add("slow" + strconv.Itoa(i))
|
||||
}
|
||||
select {
|
||||
case <-stop:
|
||||
return
|
||||
default:
|
||||
}
|
||||
}
|
||||
}()
|
||||
done := time.After(300 * time.Millisecond)
|
||||
for {
|
||||
select {
|
||||
case <-done:
|
||||
return
|
||||
default:
|
||||
}
|
||||
v, sd := q.Get()
|
||||
if sd {
|
||||
t.Error("Got shutdown signal")
|
||||
} else if v.(string) == "slow" {
|
||||
t.Error("Got item from the slow queue")
|
||||
}
|
||||
q.Done(v)
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue