From 9ecc67fb3afadcbdad64058b6f8f0c0b5811a57d Mon Sep 17 00:00:00 2001 From: Michael Peick Date: Fri, 25 Oct 2019 10:42:37 +0200 Subject: [PATCH 1/7] Sent git hash and branch in the HTTP header of the webhook. --- cmd/git-sync/main.go | 36 ++++++++++++++++++++++++++++-------- cmd/git-sync/webhook.go | 15 +++++++++++---- 2 files changed, 39 insertions(+), 12 deletions(-) diff --git a/cmd/git-sync/main.go b/cmd/git-sync/main.go index e40258d..361729c 100644 --- a/cmd/git-sync/main.go +++ b/cmd/git-sync/main.go @@ -269,7 +269,7 @@ func main() { log.V(0).Info("starting up", "args", os.Args) // Startup webhooks goroutine - webhookTriggerChan := make(chan struct{}, 1) + webhookTriggerChan := make(chan webhookRepoInfo, 1) if *flWebhookURL != "" { webhook := Webhook{ URL: *flWebhookURL, @@ -302,13 +302,9 @@ func main() { time.Sleep(waitTime(*flWait)) continue } else if changed { - // Trigger webhooks to be called. We do a non-blocking write to the channel as we - // don't want to backup the syncing locally because we can't complete a webhook call. - // Since the channel has a buffer of 1 we ensure that it is called for a change, but - // this allows us to de-dupe calls if they happen before the webhook call completes. - select { - case webhookTriggerChan <- struct{}{}: - default: + err := triggerWebhook(ctx, webhookTriggerChan, *flBranch, *flRev, *flRoot) + if err != nil { + log.Error(err, "triggering webhook failed") } } syncDuration.WithLabelValues("success").Observe(time.Now().Sub(start).Seconds()) @@ -695,3 +691,27 @@ func setupGitCookieFile() error { return nil } + +func triggerWebhook(ctx context.Context, ch chan webhookRepoInfo, branch, rev, gitRoot string) error { + info := webhookRepoInfo{ + Branch: branch, + } + + hash, err := hashForRev(ctx, rev, gitRoot) + if err != nil { + return err + } + + info.Hash = hash + + // Trigger webhooks to be called. We do a non-blocking write to the channel as we + // don't want to backup the syncing locally because we can't complete a webhook call. + // Since the channel has a buffer of 1 we ensure that it is called for a change, but + // this allows us to de-dupe calls if they happen before the webhook call completes. + select { + case ch <- info: + default: + } + + return nil +} diff --git a/cmd/git-sync/webhook.go b/cmd/git-sync/webhook.go index 7d634f5..64a8126 100644 --- a/cmd/git-sync/webhook.go +++ b/cmd/git-sync/webhook.go @@ -22,8 +22,15 @@ type Webhook struct { Backoff time.Duration } -func (w *Webhook) Do() error { +type webhookRepoInfo struct { + Hash string + Branch string +} + +func (w *Webhook) Do(info webhookRepoInfo) error { req, err := http.NewRequest(w.Method, w.URL, nil) + req.Header.Set("Git-Sync-Branch", info.Branch) + req.Header.Set("Git-Sync-Hash", info.Hash) if err != nil { return err } @@ -47,13 +54,13 @@ func (w *Webhook) Do() error { } // Wait for trigger events from the channel, and send webhooks when triggered -func (w *Webhook) run(ch chan struct{}) { +func (w *Webhook) run(ch chan webhookRepoInfo) { for { // Wait for trigger - <-ch + info := <-ch for { - if err := w.Do(); err != nil { + if err := w.Do(info); err != nil { log.Error(err, "error calling webhook", "url", w.URL) time.Sleep(w.Backoff) } else { From 1eb223e2ccf1fd65b182ad6b55d79433dabdbb29 Mon Sep 17 00:00:00 2001 From: Michael Peick Date: Sat, 26 Oct 2019 13:34:16 +0200 Subject: [PATCH 2/7] Do not send branch information as http header. --- cmd/git-sync/main.go | 8 +++----- cmd/git-sync/webhook.go | 4 +--- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/cmd/git-sync/main.go b/cmd/git-sync/main.go index 361729c..ec815df 100644 --- a/cmd/git-sync/main.go +++ b/cmd/git-sync/main.go @@ -302,7 +302,7 @@ func main() { time.Sleep(waitTime(*flWait)) continue } else if changed { - err := triggerWebhook(ctx, webhookTriggerChan, *flBranch, *flRev, *flRoot) + err := triggerWebhook(ctx, webhookTriggerChan, *flRev, *flRoot) if err != nil { log.Error(err, "triggering webhook failed") } @@ -692,10 +692,8 @@ func setupGitCookieFile() error { return nil } -func triggerWebhook(ctx context.Context, ch chan webhookRepoInfo, branch, rev, gitRoot string) error { - info := webhookRepoInfo{ - Branch: branch, - } +func triggerWebhook(ctx context.Context, ch chan webhookRepoInfo, rev, gitRoot string) error { + info := webhookRepoInfo{} hash, err := hashForRev(ctx, rev, gitRoot) if err != nil { diff --git a/cmd/git-sync/webhook.go b/cmd/git-sync/webhook.go index 64a8126..e3d8a2c 100644 --- a/cmd/git-sync/webhook.go +++ b/cmd/git-sync/webhook.go @@ -23,13 +23,11 @@ type Webhook struct { } type webhookRepoInfo struct { - Hash string - Branch string + Hash string } func (w *Webhook) Do(info webhookRepoInfo) error { req, err := http.NewRequest(w.Method, w.URL, nil) - req.Header.Set("Git-Sync-Branch", info.Branch) req.Header.Set("Git-Sync-Hash", info.Hash) if err != nil { return err From e214aeb4d6947d7a771933b751cd7f3abfd5a27a Mon Sep 17 00:00:00 2001 From: Michael Peick Date: Sat, 26 Oct 2019 13:32:32 +0200 Subject: [PATCH 3/7] Rename http header "Git-Sync-Hash" into "Gitsync-Hash". --- cmd/git-sync/webhook.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/git-sync/webhook.go b/cmd/git-sync/webhook.go index e3d8a2c..516be14 100644 --- a/cmd/git-sync/webhook.go +++ b/cmd/git-sync/webhook.go @@ -28,7 +28,7 @@ type webhookRepoInfo struct { func (w *Webhook) Do(info webhookRepoInfo) error { req, err := http.NewRequest(w.Method, w.URL, nil) - req.Header.Set("Git-Sync-Hash", info.Hash) + req.Header.Set("Gitsync-Hash", info.Hash) if err != nil { return err } From f0ec409582de67619eb832adba816d5e257a2c6e Mon Sep 17 00:00:00 2001 From: Michael Peick Date: Sun, 27 Oct 2019 14:33:40 +0100 Subject: [PATCH 4/7] Populate last hash value in case webhook is slower than the main loop. --- cmd/git-sync/main.go | 28 +++++++-------- cmd/git-sync/webhook.go | 70 +++++++++++++++++++++++++++++++----- cmd/git-sync/webhook_test.go | 61 +++++++++++++++++++++++++++++++ 3 files changed, 135 insertions(+), 24 deletions(-) create mode 100644 cmd/git-sync/webhook_test.go diff --git a/cmd/git-sync/main.go b/cmd/git-sync/main.go index ec815df..373234c 100644 --- a/cmd/git-sync/main.go +++ b/cmd/git-sync/main.go @@ -269,16 +269,17 @@ func main() { log.V(0).Info("starting up", "args", os.Args) // Startup webhooks goroutine - webhookTriggerChan := make(chan webhookRepoInfo, 1) + var webhook *Webhook if *flWebhookURL != "" { - webhook := Webhook{ + webhook = &Webhook{ URL: *flWebhookURL, Method: *flWebhookMethod, Success: *flWebhookStatusSuccess, Timeout: *flWebhookTimeout, Backoff: *flWebhookBackoff, + Data: NewWebhookData(), } - go webhook.run(webhookTriggerChan) + go webhook.run() } initialSync := true @@ -301,8 +302,8 @@ func main() { cancel() time.Sleep(waitTime(*flWait)) continue - } else if changed { - err := triggerWebhook(ctx, webhookTriggerChan, *flRev, *flRoot) + } else if changed && webhook != nil { + err := triggerWebhook(ctx, webhook, *flRev, *flRoot, *flDest) if err != nil { log.Error(err, "triggering webhook failed") } @@ -692,24 +693,19 @@ func setupGitCookieFile() error { return nil } -func triggerWebhook(ctx context.Context, ch chan webhookRepoInfo, rev, gitRoot string) error { - info := webhookRepoInfo{} +func triggerWebhook(ctx context.Context, webhook *Webhook, rev, gitRoot, dest string) error { + target := path.Join(gitRoot, dest) - hash, err := hashForRev(ctx, rev, gitRoot) + hash, err := hashForRev(ctx, rev, target) if err != nil { return err } - info.Hash = hash - // Trigger webhooks to be called. We do a non-blocking write to the channel as we // don't want to backup the syncing locally because we can't complete a webhook call. - // Since the channel has a buffer of 1 we ensure that it is called for a change, but - // this allows us to de-dupe calls if they happen before the webhook call completes. - select { - case ch <- info: - default: - } + // Since the channel has a buffer of 1 we ensure that it is called for a change. Only + // the event of the last git hash will be send in case the webhook is not fast enough. + webhook.Data.UpdateAndTrigger(hash) return nil } diff --git a/cmd/git-sync/webhook.go b/cmd/git-sync/webhook.go index 516be14..35ce964 100644 --- a/cmd/git-sync/webhook.go +++ b/cmd/git-sync/webhook.go @@ -20,15 +20,68 @@ type Webhook struct { Timeout time.Duration // Backoff for failed webhook calls Backoff time.Duration + + Data *webhookData } -type webhookRepoInfo struct { - Hash string +type webhookData struct { + ch chan struct{} + + newHash string + curHash string } -func (w *Webhook) Do(info webhookRepoInfo) error { +func NewWebhookData() *webhookData { + return &webhookData{ + ch: make(chan struct{}, 1), + } +} + +func (d *webhookData) UpdateAndTrigger(newHash string) { + d.newHash = newHash + + select { + case d.ch <- struct{}{}: + default: + } +} + +func (d *webhookData) updateState() bool { + newHash := d.newHash + if newHash != d.curHash { + d.curHash = newHash + return true + } + return false +} + +func (d *webhookData) Hash() string { + d.updateState() + return d.curHash +} + +func (d *webhookData) Wait() bool { + // wait for message from UpdateAndTrigger + <-d.ch + + changed := d.updateState() + + return changed +} + +func (d *webhookData) WaitForChange() { + for { + changed := d.Wait() + + if changed { + return + } + } +} + +func (w *Webhook) Do(hash string) error { req, err := http.NewRequest(w.Method, w.URL, nil) - req.Header.Set("Gitsync-Hash", info.Hash) + req.Header.Set("Gitsync-Hash", hash) if err != nil { return err } @@ -52,13 +105,14 @@ func (w *Webhook) Do(info webhookRepoInfo) error { } // Wait for trigger events from the channel, and send webhooks when triggered -func (w *Webhook) run(ch chan webhookRepoInfo) { +func (w *Webhook) run() { for { - // Wait for trigger - info := <-ch + // Wait for trigger and changed hash value + w.Data.WaitForChange() for { - if err := w.Do(info); err != nil { + hash := w.Data.Hash() + if err := w.Do(hash); err != nil { log.Error(err, "error calling webhook", "url", w.URL) time.Sleep(w.Backoff) } else { diff --git a/cmd/git-sync/webhook_test.go b/cmd/git-sync/webhook_test.go new file mode 100644 index 0000000..c9e21e1 --- /dev/null +++ b/cmd/git-sync/webhook_test.go @@ -0,0 +1,61 @@ +package main + +import ( + "testing" +) + +const ( + hash1 = "1111111111111111111111111111111111111111" + hash2 = "2222222222222222222222222222222222222222" + hash3 = "3333333333333333333333333333333333333333" +) + +func TestWebhookData(t *testing.T) { + t.Run("webhhook consumes first hash value", func(t *testing.T) { + whd := NewWebhookData() + + whd.UpdateAndTrigger(hash1) + whd.WaitForChange() + + hash := whd.Hash() + if hash1 != hash { + t.Fatalf("expected hash %s but got %s", hash1, hash) + } + }) + + t.Run("second update wins", func(t *testing.T) { + whd := NewWebhookData() + + whd.UpdateAndTrigger(hash1) + whd.UpdateAndTrigger(hash2) + whd.WaitForChange() + + hash := whd.Hash() + if hash2 != hash { + t.Fatalf("expected hash %s but got %s", hash2, hash) + } + }) + + t.Run("same hash value does not lead to an update", func(t *testing.T) { + whd := NewWebhookData() + + whd.UpdateAndTrigger(hash1) + whd.WaitForChange() + hash := whd.Hash() + if hash1 != hash { + t.Fatalf("expected hash %s but got %s", hash1, hash) + } + + whd.UpdateAndTrigger(hash1) + changed := whd.Wait() + + if changed { + t.Fatalf("no change expected") + } + + hash = whd.Hash() + if hash1 != hash { + t.Fatalf("expected hash %s but got %s", hash1, hash) + } + }) +} From c3d026bf2bf26e3ce7e594d2f9c9f2ffe6d632bb Mon Sep 17 00:00:00 2001 From: Michael Peick Date: Sun, 27 Oct 2019 14:47:18 +0100 Subject: [PATCH 5/7] Cleanup test. --- cmd/git-sync/webhook_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/cmd/git-sync/webhook_test.go b/cmd/git-sync/webhook_test.go index c9e21e1..d1205bc 100644 --- a/cmd/git-sync/webhook_test.go +++ b/cmd/git-sync/webhook_test.go @@ -7,7 +7,6 @@ import ( const ( hash1 = "1111111111111111111111111111111111111111" hash2 = "2222222222222222222222222222222222222222" - hash3 = "3333333333333333333333333333333333333333" ) func TestWebhookData(t *testing.T) { From 440eac730efebf19679da32fbdf5981bbc59fce7 Mon Sep 17 00:00:00 2001 From: Michael Peick Date: Mon, 28 Oct 2019 21:57:03 +0100 Subject: [PATCH 6/7] Make hash value updates and reads atomic using mutex. --- cmd/git-sync/main.go | 2 -- cmd/git-sync/webhook.go | 65 +++++++++++++++--------------------- cmd/git-sync/webhook_test.go | 26 +++++++++------ 3 files changed, 42 insertions(+), 51 deletions(-) diff --git a/cmd/git-sync/main.go b/cmd/git-sync/main.go index 373234c..643364b 100644 --- a/cmd/git-sync/main.go +++ b/cmd/git-sync/main.go @@ -703,8 +703,6 @@ func triggerWebhook(ctx context.Context, webhook *Webhook, rev, gitRoot, dest st // Trigger webhooks to be called. We do a non-blocking write to the channel as we // don't want to backup the syncing locally because we can't complete a webhook call. - // Since the channel has a buffer of 1 we ensure that it is called for a change. Only - // the event of the last git hash will be send in case the webhook is not fast enough. webhook.Data.UpdateAndTrigger(hash) return nil diff --git a/cmd/git-sync/webhook.go b/cmd/git-sync/webhook.go index 35ce964..a5995f9 100644 --- a/cmd/git-sync/webhook.go +++ b/cmd/git-sync/webhook.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net/http" + "sync" "time" ) @@ -25,10 +26,9 @@ type Webhook struct { } type webhookData struct { - ch chan struct{} - - newHash string - curHash string + ch chan struct{} + mutex sync.Mutex + hash string } func NewWebhookData() *webhookData { @@ -37,8 +37,18 @@ func NewWebhookData() *webhookData { } } +func (d *webhookData) Events() chan struct{} { + return d.ch +} + +func (d *webhookData) update(newHash string) { + d.mutex.Lock() + defer d.mutex.Unlock() + d.hash = newHash +} + func (d *webhookData) UpdateAndTrigger(newHash string) { - d.newHash = newHash + d.update(newHash) select { case d.ch <- struct{}{}: @@ -46,37 +56,10 @@ func (d *webhookData) UpdateAndTrigger(newHash string) { } } -func (d *webhookData) updateState() bool { - newHash := d.newHash - if newHash != d.curHash { - d.curHash = newHash - return true - } - return false -} - func (d *webhookData) Hash() string { - d.updateState() - return d.curHash -} - -func (d *webhookData) Wait() bool { - // wait for message from UpdateAndTrigger - <-d.ch - - changed := d.updateState() - - return changed -} - -func (d *webhookData) WaitForChange() { - for { - changed := d.Wait() - - if changed { - return - } - } + d.mutex.Lock() + defer d.mutex.Unlock() + return d.hash } func (w *Webhook) Do(hash string) error { @@ -106,17 +89,23 @@ func (w *Webhook) Do(hash string) error { // Wait for trigger events from the channel, and send webhooks when triggered func (w *Webhook) run() { - for { - // Wait for trigger and changed hash value - w.Data.WaitForChange() + var lastHash string + + // Wait for trigger from webhookData.UpdateAndTrigger + for range w.Data.Events() { for { hash := w.Data.Hash() + if hash == lastHash { + break + } + if err := w.Do(hash); err != nil { log.Error(err, "error calling webhook", "url", w.URL) time.Sleep(w.Backoff) } else { log.V(0).Info("success calling webhook", "url", w.URL) + lastHash = hash break } } diff --git a/cmd/git-sync/webhook_test.go b/cmd/git-sync/webhook_test.go index d1205bc..13d9411 100644 --- a/cmd/git-sync/webhook_test.go +++ b/cmd/git-sync/webhook_test.go @@ -1,6 +1,7 @@ package main import ( + "fmt" "testing" ) @@ -14,7 +15,8 @@ func TestWebhookData(t *testing.T) { whd := NewWebhookData() whd.UpdateAndTrigger(hash1) - whd.WaitForChange() + + <-whd.Events() hash := whd.Hash() if hash1 != hash { @@ -22,12 +24,16 @@ func TestWebhookData(t *testing.T) { } }) - t.Run("second update wins", func(t *testing.T) { + t.Run("last update wins when channel buffer is full", func(t *testing.T) { whd := NewWebhookData() - whd.UpdateAndTrigger(hash1) + for i := 0; i < 10; i++ { + h := fmt.Sprintf("111111111111111111111111111111111111111%d", i) + whd.UpdateAndTrigger(h) + } whd.UpdateAndTrigger(hash2) - whd.WaitForChange() + + <-whd.Events() hash := whd.Hash() if hash2 != hash { @@ -35,22 +41,20 @@ func TestWebhookData(t *testing.T) { } }) - t.Run("same hash value does not lead to an update", func(t *testing.T) { + t.Run("same hash value", func(t *testing.T) { whd := NewWebhookData() + events := whd.Events() whd.UpdateAndTrigger(hash1) - whd.WaitForChange() + <-events + hash := whd.Hash() if hash1 != hash { t.Fatalf("expected hash %s but got %s", hash1, hash) } whd.UpdateAndTrigger(hash1) - changed := whd.Wait() - - if changed { - t.Fatalf("no change expected") - } + <-events hash = whd.Hash() if hash1 != hash { From 5cb2198689c92f9a29eaba5eaa2267a34e5440e6 Mon Sep 17 00:00:00 2001 From: Michael Peick Date: Mon, 28 Oct 2019 21:58:44 +0100 Subject: [PATCH 7/7] Remove unnecessary comment. --- cmd/git-sync/main.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/cmd/git-sync/main.go b/cmd/git-sync/main.go index 643364b..20ecbf1 100644 --- a/cmd/git-sync/main.go +++ b/cmd/git-sync/main.go @@ -701,8 +701,6 @@ func triggerWebhook(ctx context.Context, webhook *Webhook, rev, gitRoot, dest st return err } - // Trigger webhooks to be called. We do a non-blocking write to the channel as we - // don't want to backup the syncing locally because we can't complete a webhook call. webhook.Data.UpdateAndTrigger(hash) return nil