diff --git a/plugin/pkg/audit/dynamic/dynamic.go b/plugin/pkg/audit/dynamic/dynamic.go index 80004a885..1eac3ace0 100644 --- a/plugin/pkg/audit/dynamic/dynamic.go +++ b/plugin/pkg/audit/dynamic/dynamic.go @@ -123,6 +123,7 @@ func NewBackend(c *Config) (audit.Backend, error) { config: c, delegates: atomic.Value{}, delegateUpdateMutex: sync.Mutex{}, + stopped: false, webhookClientManager: cm, recorder: recorder, } @@ -159,6 +160,7 @@ func NewBackend(c *Config) (audit.Backend, error) { type backend struct { // delegateUpdateMutex holds an update lock on the delegates delegateUpdateMutex sync.Mutex + stopped bool config *Config delegates atomic.Value webhookClientManager webhook.ClientManager @@ -201,6 +203,11 @@ func (b *backend) Run(stopCh <-chan struct{}) error { // the primary stopChan to the current delegate map. func (b *backend) stopAllDelegates() { b.delegateUpdateMutex.Lock() + defer b.delegateUpdateMutex.Unlock() + if b.stopped { + return + } + b.stopped = true for _, d := range b.GetDelegates() { close(d.stopChan) } @@ -237,6 +244,11 @@ func (b *backend) setDelegates(delegates syncedDelegates) { func (b *backend) addSink(sink *auditregv1alpha1.AuditSink) { b.delegateUpdateMutex.Lock() defer b.delegateUpdateMutex.Unlock() + if b.stopped { + msg := fmt.Sprintf("Could not add audit sink %q uid: %s. Update to all delegates is stopped.", sink.Name, sink.UID) + klog.Error(msg) + return + } delegates := b.copyDelegates() if _, ok := delegates[sink.UID]; ok { klog.Errorf("Audit sink %q uid: %s already exists, could not readd", sink.Name, sink.UID) @@ -262,6 +274,11 @@ func (b *backend) addSink(sink *auditregv1alpha1.AuditSink) { func (b *backend) updateSink(oldSink, newSink *auditregv1alpha1.AuditSink) { b.delegateUpdateMutex.Lock() defer b.delegateUpdateMutex.Unlock() + if b.stopped { + msg := fmt.Sprintf("Could not update old audit sink %q to new audit sink %q. Update to all delegates is stopped.", oldSink.Name, newSink.Name) + klog.Error(msg) + return + } delegates := b.copyDelegates() oldDelegate, ok := delegates[oldSink.UID] if !ok { @@ -300,6 +317,11 @@ func (b *backend) updateSink(oldSink, newSink *auditregv1alpha1.AuditSink) { func (b *backend) deleteSink(sink *auditregv1alpha1.AuditSink) { b.delegateUpdateMutex.Lock() defer b.delegateUpdateMutex.Unlock() + if b.stopped { + msg := fmt.Sprintf("Could not delete audit sink %q uid: %s. Update to all delegates is stopped.", sink.Name, sink.UID) + klog.Warning(msg) + return + } delegates := b.copyDelegates() delegate, ok := delegates[sink.UID] if !ok {