From 440eac730efebf19679da32fbdf5981bbc59fce7 Mon Sep 17 00:00:00 2001 From: Michael Peick Date: Mon, 28 Oct 2019 21:57:03 +0100 Subject: [PATCH] Make hash value updates and reads atomic using mutex. --- cmd/git-sync/main.go | 2 -- cmd/git-sync/webhook.go | 65 +++++++++++++++--------------------- cmd/git-sync/webhook_test.go | 26 +++++++++------ 3 files changed, 42 insertions(+), 51 deletions(-) diff --git a/cmd/git-sync/main.go b/cmd/git-sync/main.go index 373234c..643364b 100644 --- a/cmd/git-sync/main.go +++ b/cmd/git-sync/main.go @@ -703,8 +703,6 @@ func triggerWebhook(ctx context.Context, webhook *Webhook, rev, gitRoot, dest st // Trigger webhooks to be called. We do a non-blocking write to the channel as we // don't want to backup the syncing locally because we can't complete a webhook call. - // Since the channel has a buffer of 1 we ensure that it is called for a change. Only - // the event of the last git hash will be send in case the webhook is not fast enough. webhook.Data.UpdateAndTrigger(hash) return nil diff --git a/cmd/git-sync/webhook.go b/cmd/git-sync/webhook.go index 35ce964..a5995f9 100644 --- a/cmd/git-sync/webhook.go +++ b/cmd/git-sync/webhook.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net/http" + "sync" "time" ) @@ -25,10 +26,9 @@ type Webhook struct { } type webhookData struct { - ch chan struct{} - - newHash string - curHash string + ch chan struct{} + mutex sync.Mutex + hash string } func NewWebhookData() *webhookData { @@ -37,8 +37,18 @@ func NewWebhookData() *webhookData { } } +func (d *webhookData) Events() chan struct{} { + return d.ch +} + +func (d *webhookData) update(newHash string) { + d.mutex.Lock() + defer d.mutex.Unlock() + d.hash = newHash +} + func (d *webhookData) UpdateAndTrigger(newHash string) { - d.newHash = newHash + d.update(newHash) select { case d.ch <- struct{}{}: @@ -46,37 +56,10 @@ func (d *webhookData) UpdateAndTrigger(newHash string) { } } -func (d *webhookData) updateState() bool { - newHash := d.newHash - if newHash != d.curHash { - d.curHash = newHash - return true - } - return false -} - func (d *webhookData) Hash() string { - d.updateState() - return d.curHash -} - -func (d *webhookData) Wait() bool { - // wait for message from UpdateAndTrigger - <-d.ch - - changed := d.updateState() - - return changed -} - -func (d *webhookData) WaitForChange() { - for { - changed := d.Wait() - - if changed { - return - } - } + d.mutex.Lock() + defer d.mutex.Unlock() + return d.hash } func (w *Webhook) Do(hash string) error { @@ -106,17 +89,23 @@ func (w *Webhook) Do(hash string) error { // Wait for trigger events from the channel, and send webhooks when triggered func (w *Webhook) run() { - for { - // Wait for trigger and changed hash value - w.Data.WaitForChange() + var lastHash string + + // Wait for trigger from webhookData.UpdateAndTrigger + for range w.Data.Events() { for { hash := w.Data.Hash() + if hash == lastHash { + break + } + if err := w.Do(hash); err != nil { log.Error(err, "error calling webhook", "url", w.URL) time.Sleep(w.Backoff) } else { log.V(0).Info("success calling webhook", "url", w.URL) + lastHash = hash break } } diff --git a/cmd/git-sync/webhook_test.go b/cmd/git-sync/webhook_test.go index d1205bc..13d9411 100644 --- a/cmd/git-sync/webhook_test.go +++ b/cmd/git-sync/webhook_test.go @@ -1,6 +1,7 @@ package main import ( + "fmt" "testing" ) @@ -14,7 +15,8 @@ func TestWebhookData(t *testing.T) { whd := NewWebhookData() whd.UpdateAndTrigger(hash1) - whd.WaitForChange() + + <-whd.Events() hash := whd.Hash() if hash1 != hash { @@ -22,12 +24,16 @@ func TestWebhookData(t *testing.T) { } }) - t.Run("second update wins", func(t *testing.T) { + t.Run("last update wins when channel buffer is full", func(t *testing.T) { whd := NewWebhookData() - whd.UpdateAndTrigger(hash1) + for i := 0; i < 10; i++ { + h := fmt.Sprintf("111111111111111111111111111111111111111%d", i) + whd.UpdateAndTrigger(h) + } whd.UpdateAndTrigger(hash2) - whd.WaitForChange() + + <-whd.Events() hash := whd.Hash() if hash2 != hash { @@ -35,22 +41,20 @@ func TestWebhookData(t *testing.T) { } }) - t.Run("same hash value does not lead to an update", func(t *testing.T) { + t.Run("same hash value", func(t *testing.T) { whd := NewWebhookData() + events := whd.Events() whd.UpdateAndTrigger(hash1) - whd.WaitForChange() + <-events + hash := whd.Hash() if hash1 != hash { t.Fatalf("expected hash %s but got %s", hash1, hash) } whd.UpdateAndTrigger(hash1) - changed := whd.Wait() - - if changed { - t.Fatalf("no change expected") - } + <-events hash = whd.Hash() if hash1 != hash {