234 lines
6.0 KiB
Go
234 lines
6.0 KiB
Go
package routing
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
|
|
"golang.org/x/sync/errgroup"
|
|
"k8s.io/apimachinery/pkg/types"
|
|
"k8s.io/client-go/tools/cache"
|
|
|
|
httpv1alpha1 "github.com/kedacore/http-add-on/operator/apis/http/v1alpha1"
|
|
"github.com/kedacore/http-add-on/operator/generated/informers/externalversions"
|
|
informershttpv1alpha1 "github.com/kedacore/http-add-on/operator/generated/informers/externalversions/http/v1alpha1"
|
|
"github.com/kedacore/http-add-on/pkg/k8s"
|
|
"github.com/kedacore/http-add-on/pkg/queue"
|
|
"github.com/kedacore/http-add-on/pkg/util"
|
|
)
|
|
|
|
// Sentinel errors reported by the routing table.
var (
	// errUnknownSharedIndexInformer is returned by NewTable when the
	// HTTPScaledObject informer does not satisfy sharedIndexInformer.
	errUnknownSharedIndexInformer = errors.New("informer is not cache.sharedIndexInformer")

	// errStartedSharedIndexInformer is returned by runInformer when the
	// informer reports that it has already been started.
	errStartedSharedIndexInformer = errors.New("sharedIndexInformer has started, run more than once is not allowed")

	// errStoppedSharedIndexInformer is returned by runInformer when the
	// informer's Run returns although the context is not done.
	errStoppedSharedIndexInformer = errors.New("sharedIndexInformer has stopped")

	// errNotSyncedTable is returned by HealthCheck while HasSynced is false.
	errNotSyncedTable = errors.New("table has not synced")
)
|
|
|
|
type Table interface {
|
|
util.HealthChecker
|
|
|
|
Start(ctx context.Context) error
|
|
Route(req *http.Request) *httpv1alpha1.HTTPScaledObject
|
|
HasSynced() bool
|
|
}
|
|
|
|
type table struct {
|
|
httpScaledObjectInformer sharedIndexInformer
|
|
httpScaledObjectEventHandlerRegistration cache.ResourceEventHandlerRegistration
|
|
httpScaledObjects map[types.NamespacedName]*httpv1alpha1.HTTPScaledObject
|
|
httpScaledObjectsMutex sync.RWMutex
|
|
memoryHolder util.AtomicValue[TableMemory]
|
|
memorySignaler util.Signaler
|
|
queueCounter queue.Counter
|
|
}
|
|
|
|
func NewTable(sharedInformerFactory externalversions.SharedInformerFactory, namespace string, counter queue.Counter) (Table, error) {
|
|
httpScaledObjects := informershttpv1alpha1.New(sharedInformerFactory, namespace, nil).HTTPScaledObjects()
|
|
|
|
t := table{
|
|
httpScaledObjects: make(map[types.NamespacedName]*httpv1alpha1.HTTPScaledObject),
|
|
memorySignaler: util.NewSignaler(),
|
|
}
|
|
|
|
informer, ok := httpScaledObjects.Informer().(sharedIndexInformer)
|
|
if !ok {
|
|
return nil, errUnknownSharedIndexInformer
|
|
}
|
|
t.httpScaledObjectInformer = informer
|
|
|
|
registration, err := informer.AddEventHandler(&t)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
t.httpScaledObjectEventHandlerRegistration = registration
|
|
t.queueCounter = counter
|
|
return &t, nil
|
|
}
|
|
|
|
func (t *table) runInformer(ctx context.Context) error {
|
|
if t.httpScaledObjectInformer.HasStarted() {
|
|
return errStartedSharedIndexInformer
|
|
}
|
|
|
|
t.httpScaledObjectInformer.Run(ctx.Done())
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
default:
|
|
return errStoppedSharedIndexInformer
|
|
}
|
|
}
|
|
|
|
func (t *table) refreshMemory(ctx context.Context) error {
|
|
// wait for event handler to be synced before first computation of routes
|
|
for !t.httpScaledObjectEventHandlerRegistration.HasSynced() {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-time.After(time.Second):
|
|
continue
|
|
}
|
|
}
|
|
|
|
for {
|
|
m := t.newMemoryFromHTTPSOs()
|
|
t.memoryHolder.Set(m)
|
|
|
|
if err := t.memorySignaler.Wait(ctx); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
func (t *table) newMemoryFromHTTPSOs() TableMemory {
|
|
t.httpScaledObjectsMutex.RLock()
|
|
defer t.httpScaledObjectsMutex.RUnlock()
|
|
|
|
tm := NewTableMemory()
|
|
for _, newHTTPSO := range t.httpScaledObjects {
|
|
tm = tm.Remember(newHTTPSO)
|
|
}
|
|
|
|
return tm
|
|
}
|
|
|
|
// Compile-time check that *table implements Table.
var _ Table = (*table)(nil)
|
|
|
|
func (t *table) Start(ctx context.Context) error {
|
|
eg, ctx := errgroup.WithContext(ctx)
|
|
eg.Go(util.ApplyContext(t.runInformer, ctx))
|
|
eg.Go(util.ApplyContext(t.refreshMemory, ctx))
|
|
return eg.Wait()
|
|
}
|
|
|
|
func (t *table) Route(req *http.Request) *httpv1alpha1.HTTPScaledObject {
|
|
if req == nil {
|
|
return nil
|
|
}
|
|
|
|
tm := t.memoryHolder.Get()
|
|
if tm == nil {
|
|
return nil
|
|
}
|
|
|
|
key := NewKeyFromRequest(req)
|
|
return tm.Route(key)
|
|
}
|
|
|
|
func (t *table) HasSynced() bool {
|
|
tm := t.memoryHolder.Get()
|
|
return tm != nil
|
|
}
|
|
|
|
// Compile-time check that *table implements cache.ResourceEventHandler.
var _ cache.ResourceEventHandler = (*table)(nil)
|
|
|
|
func (t *table) OnAdd(obj interface{}, _ bool) {
|
|
httpScaledObject, ok := obj.(*httpv1alpha1.HTTPScaledObject)
|
|
if !ok {
|
|
return
|
|
}
|
|
key := *k8s.NamespacedNameFromObject(httpScaledObject)
|
|
|
|
window := time.Minute
|
|
granualrity := time.Second
|
|
if httpScaledObject.Spec.ScalingMetric != nil &&
|
|
httpScaledObject.Spec.ScalingMetric.Rate != nil {
|
|
window = httpScaledObject.Spec.ScalingMetric.Rate.Window.Duration
|
|
granualrity = httpScaledObject.Spec.ScalingMetric.Rate.Granularity.Duration
|
|
}
|
|
t.queueCounter.EnsureKey(key.String(), window, granualrity)
|
|
|
|
defer t.memorySignaler.Signal()
|
|
|
|
t.httpScaledObjectsMutex.Lock()
|
|
defer t.httpScaledObjectsMutex.Unlock()
|
|
|
|
t.httpScaledObjects[key] = httpScaledObject
|
|
}
|
|
|
|
func (t *table) OnUpdate(oldObj interface{}, newObj interface{}) {
|
|
oldHTTPSO, ok := oldObj.(*httpv1alpha1.HTTPScaledObject)
|
|
if !ok {
|
|
return
|
|
}
|
|
oldKey := *k8s.NamespacedNameFromObject(oldHTTPSO)
|
|
|
|
newHTTPSO, ok := newObj.(*httpv1alpha1.HTTPScaledObject)
|
|
if !ok {
|
|
return
|
|
}
|
|
newKey := *k8s.NamespacedNameFromObject(newHTTPSO)
|
|
|
|
window := time.Minute
|
|
granualrity := time.Second
|
|
if newHTTPSO.Spec.ScalingMetric != nil &&
|
|
newHTTPSO.Spec.ScalingMetric.Rate != nil {
|
|
window = newHTTPSO.Spec.ScalingMetric.Rate.Window.Duration
|
|
granualrity = newHTTPSO.Spec.ScalingMetric.Rate.Granularity.Duration
|
|
}
|
|
t.queueCounter.UpdateBuckets(newKey.String(), window, granualrity)
|
|
|
|
mustDelete := oldKey != newKey
|
|
defer t.memorySignaler.Signal()
|
|
|
|
t.httpScaledObjectsMutex.Lock()
|
|
defer t.httpScaledObjectsMutex.Unlock()
|
|
|
|
t.httpScaledObjects[newKey] = newHTTPSO
|
|
|
|
if mustDelete {
|
|
delete(t.httpScaledObjects, oldKey)
|
|
t.queueCounter.RemoveKey(oldKey.String())
|
|
}
|
|
}
|
|
|
|
func (t *table) OnDelete(obj interface{}) {
|
|
httpScaledObject, ok := obj.(*httpv1alpha1.HTTPScaledObject)
|
|
if !ok {
|
|
return
|
|
}
|
|
key := *k8s.NamespacedNameFromObject(httpScaledObject)
|
|
|
|
defer t.memorySignaler.Signal()
|
|
|
|
t.httpScaledObjectsMutex.Lock()
|
|
defer t.httpScaledObjectsMutex.Unlock()
|
|
|
|
delete(t.httpScaledObjects, key)
|
|
|
|
t.queueCounter.RemoveKey(key.String())
|
|
}
|
|
|
|
// Compile-time check that *table implements util.HealthChecker.
var _ util.HealthChecker = (*table)(nil)
|
|
|
|
func (t *table) HealthCheck(_ context.Context) error {
|
|
if !t.HasSynced() {
|
|
return errNotSyncedTable
|
|
}
|
|
|
|
return nil
|
|
}
|