Merge pull request #84483 from zxl381/MU_stopAllD

Fix a double lock bug in staging/.../apiserver

Kubernetes-commit: 6dde01d31494ca8a44c92011a2f7f55af465fd3a
This commit is contained in:
Kubernetes Publisher 2019-11-02 04:49:39 -07:00
commit 025332b0ce
1 changed files with 22 additions and 0 deletions

View File

@ -123,6 +123,7 @@ func NewBackend(c *Config) (audit.Backend, error) {
config: c,
delegates: atomic.Value{},
delegateUpdateMutex: sync.Mutex{},
stopped: false,
webhookClientManager: cm,
recorder: recorder,
}
@ -159,6 +160,7 @@ func NewBackend(c *Config) (audit.Backend, error) {
type backend struct {
// delegateUpdateMutex holds an update lock on the delegates
delegateUpdateMutex sync.Mutex
stopped bool
config *Config
delegates atomic.Value
webhookClientManager webhook.ClientManager
@ -201,6 +203,11 @@ func (b *backend) Run(stopCh <-chan struct{}) error {
// the primary stopChan to the current delegate map.
func (b *backend) stopAllDelegates() {
b.delegateUpdateMutex.Lock()
defer b.delegateUpdateMutex.Unlock()
if b.stopped {
return
}
b.stopped = true
for _, d := range b.GetDelegates() {
close(d.stopChan)
}
@ -237,6 +244,11 @@ func (b *backend) setDelegates(delegates syncedDelegates) {
func (b *backend) addSink(sink *auditregv1alpha1.AuditSink) {
b.delegateUpdateMutex.Lock()
defer b.delegateUpdateMutex.Unlock()
if b.stopped {
msg := fmt.Sprintf("Could not add audit sink %q uid: %s. Update to all delegates is stopped.", sink.Name, sink.UID)
klog.Error(msg)
return
}
delegates := b.copyDelegates()
if _, ok := delegates[sink.UID]; ok {
klog.Errorf("Audit sink %q uid: %s already exists, could not readd", sink.Name, sink.UID)
@ -262,6 +274,11 @@ func (b *backend) addSink(sink *auditregv1alpha1.AuditSink) {
func (b *backend) updateSink(oldSink, newSink *auditregv1alpha1.AuditSink) {
b.delegateUpdateMutex.Lock()
defer b.delegateUpdateMutex.Unlock()
if b.stopped {
msg := fmt.Sprintf("Could not update old audit sink %q to new audit sink %q. Update to all delegates is stopped.", oldSink.Name, newSink.Name)
klog.Error(msg)
return
}
delegates := b.copyDelegates()
oldDelegate, ok := delegates[oldSink.UID]
if !ok {
@ -300,6 +317,11 @@ func (b *backend) updateSink(oldSink, newSink *auditregv1alpha1.AuditSink) {
func (b *backend) deleteSink(sink *auditregv1alpha1.AuditSink) {
b.delegateUpdateMutex.Lock()
defer b.delegateUpdateMutex.Unlock()
if b.stopped {
msg := fmt.Sprintf("Could not delete audit sink %q uid: %s. Update to all delegates is stopped.", sink.Name, sink.UID)
klog.Warning(msg)
return
}
delegates := b.copyDelegates()
delegate, ok := delegates[sink.UID]
if !ok {