From 5daf3a42f77642879cf5cdd376c378d01435ebf6 Mon Sep 17 00:00:00 2001 From: Tim Hockin Date: Mon, 28 Oct 2019 20:43:32 -0700 Subject: [PATCH] Clean up webhook a bit Return hash so we don't need to recalculate. Rename some funcs. --- cmd/git-sync/main.go | 36 +++++++++------------------------ cmd/git-sync/webhook.go | 39 +++++++++++++++++++++++------------- cmd/git-sync/webhook_test.go | 24 +++++++++++----------- 3 files changed, 47 insertions(+), 52 deletions(-) diff --git a/cmd/git-sync/main.go b/cmd/git-sync/main.go index d639a6b..8236e87 100644 --- a/cmd/git-sync/main.go +++ b/cmd/git-sync/main.go @@ -315,7 +315,7 @@ func main() { for { start := time.Now() ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(*flSyncTimeout)) - if changed, err := syncRepo(ctx, *flRepo, *flBranch, *flRev, *flDepth, *flRoot, *flDest); err != nil { + if changed, hash, err := syncRepo(ctx, *flRepo, *flBranch, *flRev, *flDepth, *flRoot, *flDest); err != nil { syncDuration.WithLabelValues("error").Observe(time.Since(start).Seconds()) syncCount.WithLabelValues("error").Inc() if *flMaxSyncFailures != -1 && failCount >= *flMaxSyncFailures { @@ -331,10 +331,7 @@ func main() { time.Sleep(waitTime(*flWait)) continue } else if changed && webhook != nil { - err := triggerWebhook(ctx, webhook, *flRev, *flRoot, *flDest) - if err != nil { - log.Error(err, "triggering webhook failed") - } + webhook.Send(hash) } syncDuration.WithLabelValues("success").Observe(time.Since(start).Seconds()) syncCount.WithLabelValues("success").Inc() @@ -549,8 +546,8 @@ func revIsHash(ctx context.Context, rev, gitRoot string) (bool, error) { } // syncRepo syncs the branch of a given repository to the destination at the given rev. -// returns (1) whether a change occured and (2) an error if one happened -func syncRepo(ctx context.Context, repo, branch, rev string, depth int, gitRoot, dest string) (bool, error) { +// returns (1) whether a change occured, (2) the new hash, and (3) an error if one happened +func syncRepo(ctx context.Context, repo, branch, rev string, depth int, gitRoot, dest string) (bool, string, error) { target := path.Join(gitRoot, dest) gitRepoPath := path.Join(target, ".git") var hash string @@ -559,29 +556,29 @@ func syncRepo(ctx context.Context, repo, branch, rev string, depth int, gitRoot, case os.IsNotExist(err): err = cloneRepo(ctx, repo, branch, rev, depth, gitRoot) if err != nil { - return false, err + return false, "", err } hash, err = hashForRev(ctx, rev, gitRoot) if err != nil { - return false, err + return false, "", err } case err != nil: - return false, fmt.Errorf("error checking if repo exists %q: %v", gitRepoPath, err) + return false, "", fmt.Errorf("error checking if repo exists %q: %v", gitRepoPath, err) default: local, remote, err := getRevs(ctx, target, branch, rev) if err != nil { - return false, err + return false, "", err } log.V(2).Info("git state", "local", local, "remote", remote) if local == remote { log.V(1).Info("no update required") - return false, nil + return false, "", nil } log.V(0).Info("update required") hash = remote } - return true, addWorktreeAndSwap(ctx, gitRoot, dest, branch, rev, depth, hash) + return true, hash, addWorktreeAndSwap(ctx, gitRoot, dest, branch, rev, depth, hash) } // getRevs returns the local and upstream hashes for rev. @@ -743,16 +740,3 @@ func setupGitCookieFile(ctx context.Context) error { return nil } - -func triggerWebhook(ctx context.Context, webhook *Webhook, rev, gitRoot, dest string) error { - target := path.Join(gitRoot, dest) - - hash, err := hashForRev(ctx, rev, target) - if err != nil { - return err - } - - webhook.Data.UpdateAndTrigger(hash) - - return nil -} diff --git a/cmd/git-sync/webhook.go b/cmd/git-sync/webhook.go index a5995f9..498b6fa 100644 --- a/cmd/git-sync/webhook.go +++ b/cmd/git-sync/webhook.go @@ -22,6 +22,7 @@ type Webhook struct { // Backoff for failed webhook calls Backoff time.Duration + // Holds the data as it crosses from producer to consumer. Data *webhookData } @@ -37,29 +38,36 @@ func NewWebhookData() *webhookData { } } -func (d *webhookData) Events() chan struct{} { +func (d *webhookData) events() chan struct{} { return d.ch } -func (d *webhookData) update(newHash string) { +func (d *webhookData) get() string { + d.mutex.Lock() + defer d.mutex.Unlock() + return d.hash +} + +func (d *webhookData) set(newHash string) { d.mutex.Lock() defer d.mutex.Unlock() d.hash = newHash } -func (d *webhookData) UpdateAndTrigger(newHash string) { - d.update(newHash) +func (d *webhookData) send(newHash string) { + d.set(newHash) + // Non-blocking write. If the channel is full, the consumer will see the + // newest value. If the channel was not full, the consumer will get another + // event. select { case d.ch <- struct{}{}: default: } } -func (d *webhookData) Hash() string { - d.mutex.Lock() - defer d.mutex.Unlock() - return d.hash +func (w *Webhook) Send(hash string) { + w.Data.send(hash) } func (w *Webhook) Do(hash string) error { @@ -73,6 +81,7 @@ func (w *Webhook) Do(hash string) error { defer cancel() req = req.WithContext(ctx) + log.V(0).Info("sending webhook", "hash", hash, "url", w.URL, "method", w.Method, "timeout", w.Timeout) resp, err := http.DefaultClient.Do(req) if err != nil { return err @@ -91,20 +100,22 @@ func (w *Webhook) Do(hash string) error { func (w *Webhook) run() { var lastHash string - // Wait for trigger from webhookData.UpdateAndTrigger - for range w.Data.Events() { - + // Wait for trigger from webhookData.Send + for range w.Data.events() { + // Retry in case of error for { - hash := w.Data.Hash() + // Always get the latest value, in case we fail-and-retry and the + // value changed in the meantime. This means that we might not send + // every single hash. + hash := w.Data.get() if hash == lastHash { break } if err := w.Do(hash); err != nil { - log.Error(err, "error calling webhook", "url", w.URL) + log.Error(err, "webhook failed", "url", w.URL, "method", w.Method, "timeout", w.Timeout) time.Sleep(w.Backoff) } else { - log.V(0).Info("success calling webhook", "url", w.URL) lastHash = hash break } diff --git a/cmd/git-sync/webhook_test.go b/cmd/git-sync/webhook_test.go index 13d9411..04f27bc 100644 --- a/cmd/git-sync/webhook_test.go +++ b/cmd/git-sync/webhook_test.go @@ -14,11 +14,11 @@ func TestWebhookData(t *testing.T) { t.Run("webhhook consumes first hash value", func(t *testing.T) { whd := NewWebhookData() - whd.UpdateAndTrigger(hash1) + whd.send(hash1) - <-whd.Events() + <-whd.events() - hash := whd.Hash() + hash := whd.get() if hash1 != hash { t.Fatalf("expected hash %s but got %s", hash1, hash) } @@ -29,13 +29,13 @@ func TestWebhookData(t *testing.T) { for i := 0; i < 10; i++ { h := fmt.Sprintf("111111111111111111111111111111111111111%d", i) - whd.UpdateAndTrigger(h) + whd.send(h) } - whd.UpdateAndTrigger(hash2) + whd.send(hash2) - <-whd.Events() + <-whd.events() - hash := whd.Hash() + hash := whd.get() if hash2 != hash { t.Fatalf("expected hash %s but got %s", hash2, hash) } @@ -43,20 +43,20 @@ func TestWebhookData(t *testing.T) { t.Run("same hash value", func(t *testing.T) { whd := NewWebhookData() - events := whd.Events() + events := whd.events() - whd.UpdateAndTrigger(hash1) + whd.send(hash1) <-events - hash := whd.Hash() + hash := whd.get() if hash1 != hash { t.Fatalf("expected hash %s but got %s", hash1, hash) } - whd.UpdateAndTrigger(hash1) + whd.send(hash1) <-events - hash = whd.Hash() + hash = whd.get() if hash1 != hash { t.Fatalf("expected hash %s but got %s", hash1, hash) }