Make hash value updates and reads atomic using mutex.

This commit is contained in:
Michael Peick 2019-10-28 21:57:03 +01:00
parent c3d026bf2b
commit 440eac730e
3 changed files with 42 additions and 51 deletions

View File

@ -703,8 +703,6 @@ func triggerWebhook(ctx context.Context, webhook *Webhook, rev, gitRoot, dest st
// Trigger webhooks to be called. We do a non-blocking write to the channel as we // Trigger webhooks to be called. We do a non-blocking write to the channel as we
// don't want to backup the syncing locally because we can't complete a webhook call. // don't want to backup the syncing locally because we can't complete a webhook call.
// Since the channel has a buffer of 1 we ensure that it is called for a change. Only
// the event of the last git hash will be send in case the webhook is not fast enough.
webhook.Data.UpdateAndTrigger(hash) webhook.Data.UpdateAndTrigger(hash)
return nil return nil

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"net/http" "net/http"
"sync"
"time" "time"
) )
@ -25,10 +26,9 @@ type Webhook struct {
} }
type webhookData struct { type webhookData struct {
ch chan struct{} ch chan struct{}
mutex sync.Mutex
newHash string hash string
curHash string
} }
func NewWebhookData() *webhookData { func NewWebhookData() *webhookData {
@ -37,8 +37,18 @@ func NewWebhookData() *webhookData {
} }
} }
func (d *webhookData) Events() chan struct{} {
return d.ch
}
func (d *webhookData) update(newHash string) {
d.mutex.Lock()
defer d.mutex.Unlock()
d.hash = newHash
}
func (d *webhookData) UpdateAndTrigger(newHash string) { func (d *webhookData) UpdateAndTrigger(newHash string) {
d.newHash = newHash d.update(newHash)
select { select {
case d.ch <- struct{}{}: case d.ch <- struct{}{}:
@ -46,37 +56,10 @@ func (d *webhookData) UpdateAndTrigger(newHash string) {
} }
} }
func (d *webhookData) updateState() bool {
newHash := d.newHash
if newHash != d.curHash {
d.curHash = newHash
return true
}
return false
}
func (d *webhookData) Hash() string { func (d *webhookData) Hash() string {
d.updateState() d.mutex.Lock()
return d.curHash defer d.mutex.Unlock()
} return d.hash
func (d *webhookData) Wait() bool {
// wait for message from UpdateAndTrigger
<-d.ch
changed := d.updateState()
return changed
}
func (d *webhookData) WaitForChange() {
for {
changed := d.Wait()
if changed {
return
}
}
} }
func (w *Webhook) Do(hash string) error { func (w *Webhook) Do(hash string) error {
@ -106,17 +89,23 @@ func (w *Webhook) Do(hash string) error {
// Wait for trigger events from the channel, and send webhooks when triggered // Wait for trigger events from the channel, and send webhooks when triggered
func (w *Webhook) run() { func (w *Webhook) run() {
for { var lastHash string
// Wait for trigger and changed hash value
w.Data.WaitForChange() // Wait for trigger from webhookData.UpdateAndTrigger
for range w.Data.Events() {
for { for {
hash := w.Data.Hash() hash := w.Data.Hash()
if hash == lastHash {
break
}
if err := w.Do(hash); err != nil { if err := w.Do(hash); err != nil {
log.Error(err, "error calling webhook", "url", w.URL) log.Error(err, "error calling webhook", "url", w.URL)
time.Sleep(w.Backoff) time.Sleep(w.Backoff)
} else { } else {
log.V(0).Info("success calling webhook", "url", w.URL) log.V(0).Info("success calling webhook", "url", w.URL)
lastHash = hash
break break
} }
} }

View File

@ -1,6 +1,7 @@
package main package main
import ( import (
"fmt"
"testing" "testing"
) )
@ -14,7 +15,8 @@ func TestWebhookData(t *testing.T) {
whd := NewWebhookData() whd := NewWebhookData()
whd.UpdateAndTrigger(hash1) whd.UpdateAndTrigger(hash1)
whd.WaitForChange()
<-whd.Events()
hash := whd.Hash() hash := whd.Hash()
if hash1 != hash { if hash1 != hash {
@ -22,12 +24,16 @@ func TestWebhookData(t *testing.T) {
} }
}) })
t.Run("second update wins", func(t *testing.T) { t.Run("last update wins when channel buffer is full", func(t *testing.T) {
whd := NewWebhookData() whd := NewWebhookData()
whd.UpdateAndTrigger(hash1) for i := 0; i < 10; i++ {
h := fmt.Sprintf("111111111111111111111111111111111111111%d", i)
whd.UpdateAndTrigger(h)
}
whd.UpdateAndTrigger(hash2) whd.UpdateAndTrigger(hash2)
whd.WaitForChange()
<-whd.Events()
hash := whd.Hash() hash := whd.Hash()
if hash2 != hash { if hash2 != hash {
@ -35,22 +41,20 @@ func TestWebhookData(t *testing.T) {
} }
}) })
t.Run("same hash value does not lead to an update", func(t *testing.T) { t.Run("same hash value", func(t *testing.T) {
whd := NewWebhookData() whd := NewWebhookData()
events := whd.Events()
whd.UpdateAndTrigger(hash1) whd.UpdateAndTrigger(hash1)
whd.WaitForChange() <-events
hash := whd.Hash() hash := whd.Hash()
if hash1 != hash { if hash1 != hash {
t.Fatalf("expected hash %s but got %s", hash1, hash) t.Fatalf("expected hash %s but got %s", hash1, hash)
} }
whd.UpdateAndTrigger(hash1) whd.UpdateAndTrigger(hash1)
changed := whd.Wait() <-events
if changed {
t.Fatalf("no change expected")
}
hash = whd.Hash() hash = whd.Hash()
if hash1 != hash { if hash1 != hash {