diff --git a/cmd/git-sync/main.go b/cmd/git-sync/main.go index 4f8a982..d639a6b 100644 --- a/cmd/git-sync/main.go +++ b/cmd/git-sync/main.go @@ -297,16 +297,17 @@ func main() { log.V(0).Info("starting up", "args", os.Args) // Startup webhooks goroutine - webhookTriggerChan := make(chan struct{}, 1) + var webhook *Webhook if *flWebhookURL != "" { - webhook := Webhook{ + webhook = &Webhook{ URL: *flWebhookURL, Method: *flWebhookMethod, Success: *flWebhookStatusSuccess, Timeout: *flWebhookTimeout, Backoff: *flWebhookBackoff, + Data: NewWebhookData(), } - go webhook.run(webhookTriggerChan) + go webhook.run() } initialSync := true @@ -329,14 +330,10 @@ func main() { cancel() time.Sleep(waitTime(*flWait)) continue - } else if changed { - // Trigger webhooks to be called. We do a non-blocking write to the channel as we - // don't want to backup the syncing locally because we can't complete a webhook call. - // Since the channel has a buffer of 1 we ensure that it is called for a change, but - // this allows us to de-dupe calls if they happen before the webhook call completes. - select { - case webhookTriggerChan <- struct{}{}: - default: + } else if changed && webhook != nil { + err := triggerWebhook(ctx, webhook, *flRev, *flRoot, *flDest) + if err != nil { + log.Error(err, "triggering webhook failed") } } syncDuration.WithLabelValues("success").Observe(time.Since(start).Seconds()) @@ -746,3 +743,16 @@ func setupGitCookieFile(ctx context.Context) error { return nil } + +func triggerWebhook(ctx context.Context, webhook *Webhook, rev, gitRoot, dest string) error { + target := path.Join(gitRoot, dest) + + hash, err := hashForRev(ctx, rev, target) + if err != nil { + return err + } + + webhook.Data.UpdateAndTrigger(hash) + + return nil +} diff --git a/cmd/git-sync/webhook.go b/cmd/git-sync/webhook.go index 7d634f5..a5995f9 100644 --- a/cmd/git-sync/webhook.go +++ b/cmd/git-sync/webhook.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "net/http" + "sync" "time" ) @@ -20,10 +21,50 @@ type Webhook struct { Timeout time.Duration // Backoff for failed webhook calls Backoff time.Duration + + Data *webhookData } -func (w *Webhook) Do() error { +type webhookData struct { + ch chan struct{} + mutex sync.Mutex + hash string +} + +func NewWebhookData() *webhookData { + return &webhookData{ + ch: make(chan struct{}, 1), + } +} + +func (d *webhookData) Events() chan struct{} { + return d.ch +} + +func (d *webhookData) update(newHash string) { + d.mutex.Lock() + defer d.mutex.Unlock() + d.hash = newHash +} + +func (d *webhookData) UpdateAndTrigger(newHash string) { + d.update(newHash) + + select { + case d.ch <- struct{}{}: + default: + } +} + +func (d *webhookData) Hash() string { + d.mutex.Lock() + defer d.mutex.Unlock() + return d.hash +} + +func (w *Webhook) Do(hash string) error { req, err := http.NewRequest(w.Method, w.URL, nil) + req.Header.Set("Gitsync-Hash", hash) if err != nil { return err } @@ -47,17 +88,24 @@ func (w *Webhook) Do() error { } // Wait for trigger events from the channel, and send webhooks when triggered -func (w *Webhook) run(ch chan struct{}) { - for { - // Wait for trigger - <-ch +func (w *Webhook) run() { + var lastHash string + + // Wait for trigger from webhookData.UpdateAndTrigger + for range w.Data.Events() { for { - if err := w.Do(); err != nil { + hash := w.Data.Hash() + if hash == lastHash { + break + } + + if err := w.Do(hash); err != nil { log.Error(err, "error calling webhook", "url", w.URL) time.Sleep(w.Backoff) } else { log.V(0).Info("success calling webhook", "url", w.URL) + lastHash = hash break } } diff --git a/cmd/git-sync/webhook_test.go b/cmd/git-sync/webhook_test.go new file mode 100644 index 0000000..13d9411 --- /dev/null +++ b/cmd/git-sync/webhook_test.go @@ -0,0 +1,64 @@ +package main + +import ( + "fmt" + "testing" +) + +const ( + hash1 = "1111111111111111111111111111111111111111" + hash2 = "2222222222222222222222222222222222222222" +) + +func TestWebhookData(t *testing.T) { + t.Run("webhhook consumes first hash value", func(t *testing.T) { + whd := NewWebhookData() + + whd.UpdateAndTrigger(hash1) + + <-whd.Events() + + hash := whd.Hash() + if hash1 != hash { + t.Fatalf("expected hash %s but got %s", hash1, hash) + } + }) + + t.Run("last update wins when channel buffer is full", func(t *testing.T) { + whd := NewWebhookData() + + for i := 0; i < 10; i++ { + h := fmt.Sprintf("111111111111111111111111111111111111111%d", i) + whd.UpdateAndTrigger(h) + } + whd.UpdateAndTrigger(hash2) + + <-whd.Events() + + hash := whd.Hash() + if hash2 != hash { + t.Fatalf("expected hash %s but got %s", hash2, hash) + } + }) + + t.Run("same hash value", func(t *testing.T) { + whd := NewWebhookData() + events := whd.Events() + + whd.UpdateAndTrigger(hash1) + <-events + + hash := whd.Hash() + if hash1 != hash { + t.Fatalf("expected hash %s but got %s", hash1, hash) + } + + whd.UpdateAndTrigger(hash1) + <-events + + hash = whd.Hash() + if hash1 != hash { + t.Fatalf("expected hash %s but got %s", hash1, hash) + } + }) +}