Merge pull request #200 from thockin/followup-pr-193-webhook-cleanup

Clean up webhook a bit
This commit is contained in:
Kubernetes Prow Robot 2019-11-19 07:43:42 -08:00 committed by GitHub
commit 6e604b38b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 47 additions and 52 deletions

View File

@ -315,7 +315,7 @@ func main() {
for { for {
start := time.Now() start := time.Now()
ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(*flSyncTimeout)) ctx, cancel := context.WithTimeout(context.Background(), time.Second*time.Duration(*flSyncTimeout))
if changed, err := syncRepo(ctx, *flRepo, *flBranch, *flRev, *flDepth, *flRoot, *flDest); err != nil { if changed, hash, err := syncRepo(ctx, *flRepo, *flBranch, *flRev, *flDepth, *flRoot, *flDest); err != nil {
syncDuration.WithLabelValues("error").Observe(time.Since(start).Seconds()) syncDuration.WithLabelValues("error").Observe(time.Since(start).Seconds())
syncCount.WithLabelValues("error").Inc() syncCount.WithLabelValues("error").Inc()
if *flMaxSyncFailures != -1 && failCount >= *flMaxSyncFailures { if *flMaxSyncFailures != -1 && failCount >= *flMaxSyncFailures {
@ -331,10 +331,7 @@ func main() {
time.Sleep(waitTime(*flWait)) time.Sleep(waitTime(*flWait))
continue continue
} else if changed && webhook != nil { } else if changed && webhook != nil {
err := triggerWebhook(ctx, webhook, *flRev, *flRoot, *flDest) webhook.Send(hash)
if err != nil {
log.Error(err, "triggering webhook failed")
}
} }
syncDuration.WithLabelValues("success").Observe(time.Since(start).Seconds()) syncDuration.WithLabelValues("success").Observe(time.Since(start).Seconds())
syncCount.WithLabelValues("success").Inc() syncCount.WithLabelValues("success").Inc()
@ -549,8 +546,8 @@ func revIsHash(ctx context.Context, rev, gitRoot string) (bool, error) {
} }
// syncRepo syncs the branch of a given repository to the destination at the given rev. // syncRepo syncs the branch of a given repository to the destination at the given rev.
// returns (1) whether a change occured and (2) an error if one happened // returns (1) whether a change occured, (2) the new hash, and (3) an error if one happened
func syncRepo(ctx context.Context, repo, branch, rev string, depth int, gitRoot, dest string) (bool, error) { func syncRepo(ctx context.Context, repo, branch, rev string, depth int, gitRoot, dest string) (bool, string, error) {
target := path.Join(gitRoot, dest) target := path.Join(gitRoot, dest)
gitRepoPath := path.Join(target, ".git") gitRepoPath := path.Join(target, ".git")
var hash string var hash string
@ -559,29 +556,29 @@ func syncRepo(ctx context.Context, repo, branch, rev string, depth int, gitRoot,
case os.IsNotExist(err): case os.IsNotExist(err):
err = cloneRepo(ctx, repo, branch, rev, depth, gitRoot) err = cloneRepo(ctx, repo, branch, rev, depth, gitRoot)
if err != nil { if err != nil {
return false, err return false, "", err
} }
hash, err = hashForRev(ctx, rev, gitRoot) hash, err = hashForRev(ctx, rev, gitRoot)
if err != nil { if err != nil {
return false, err return false, "", err
} }
case err != nil: case err != nil:
return false, fmt.Errorf("error checking if repo exists %q: %v", gitRepoPath, err) return false, "", fmt.Errorf("error checking if repo exists %q: %v", gitRepoPath, err)
default: default:
local, remote, err := getRevs(ctx, target, branch, rev) local, remote, err := getRevs(ctx, target, branch, rev)
if err != nil { if err != nil {
return false, err return false, "", err
} }
log.V(2).Info("git state", "local", local, "remote", remote) log.V(2).Info("git state", "local", local, "remote", remote)
if local == remote { if local == remote {
log.V(1).Info("no update required") log.V(1).Info("no update required")
return false, nil return false, "", nil
} }
log.V(0).Info("update required") log.V(0).Info("update required")
hash = remote hash = remote
} }
return true, addWorktreeAndSwap(ctx, gitRoot, dest, branch, rev, depth, hash) return true, hash, addWorktreeAndSwap(ctx, gitRoot, dest, branch, rev, depth, hash)
} }
// getRevs returns the local and upstream hashes for rev. // getRevs returns the local and upstream hashes for rev.
@ -743,16 +740,3 @@ func setupGitCookieFile(ctx context.Context) error {
return nil return nil
} }
func triggerWebhook(ctx context.Context, webhook *Webhook, rev, gitRoot, dest string) error {
target := path.Join(gitRoot, dest)
hash, err := hashForRev(ctx, rev, target)
if err != nil {
return err
}
webhook.Data.UpdateAndTrigger(hash)
return nil
}

View File

@ -22,6 +22,7 @@ type Webhook struct {
// Backoff for failed webhook calls // Backoff for failed webhook calls
Backoff time.Duration Backoff time.Duration
// Holds the data as it crosses from producer to consumer.
Data *webhookData Data *webhookData
} }
@ -37,29 +38,36 @@ func NewWebhookData() *webhookData {
} }
} }
func (d *webhookData) Events() chan struct{} { func (d *webhookData) events() chan struct{} {
return d.ch return d.ch
} }
func (d *webhookData) update(newHash string) { func (d *webhookData) get() string {
d.mutex.Lock()
defer d.mutex.Unlock()
return d.hash
}
func (d *webhookData) set(newHash string) {
d.mutex.Lock() d.mutex.Lock()
defer d.mutex.Unlock() defer d.mutex.Unlock()
d.hash = newHash d.hash = newHash
} }
func (d *webhookData) UpdateAndTrigger(newHash string) { func (d *webhookData) send(newHash string) {
d.update(newHash) d.set(newHash)
// Non-blocking write. If the channel is full, the consumer will see the
// newest value. If the channel was not full, the consumer will get another
// event.
select { select {
case d.ch <- struct{}{}: case d.ch <- struct{}{}:
default: default:
} }
} }
func (d *webhookData) Hash() string { func (w *Webhook) Send(hash string) {
d.mutex.Lock() w.Data.send(hash)
defer d.mutex.Unlock()
return d.hash
} }
func (w *Webhook) Do(hash string) error { func (w *Webhook) Do(hash string) error {
@ -73,6 +81,7 @@ func (w *Webhook) Do(hash string) error {
defer cancel() defer cancel()
req = req.WithContext(ctx) req = req.WithContext(ctx)
log.V(0).Info("sending webhook", "hash", hash, "url", w.URL, "method", w.Method, "timeout", w.Timeout)
resp, err := http.DefaultClient.Do(req) resp, err := http.DefaultClient.Do(req)
if err != nil { if err != nil {
return err return err
@ -91,20 +100,22 @@ func (w *Webhook) Do(hash string) error {
func (w *Webhook) run() { func (w *Webhook) run() {
var lastHash string var lastHash string
// Wait for trigger from webhookData.UpdateAndTrigger // Wait for trigger from webhookData.Send
for range w.Data.Events() { for range w.Data.events() {
// Retry in case of error
for { for {
hash := w.Data.Hash() // Always get the latest value, in case we fail-and-retry and the
// value changed in the meantime. This means that we might not send
// every single hash.
hash := w.Data.get()
if hash == lastHash { if hash == lastHash {
break break
} }
if err := w.Do(hash); err != nil { if err := w.Do(hash); err != nil {
log.Error(err, "error calling webhook", "url", w.URL) log.Error(err, "webhook failed", "url", w.URL, "method", w.Method, "timeout", w.Timeout)
time.Sleep(w.Backoff) time.Sleep(w.Backoff)
} else { } else {
log.V(0).Info("success calling webhook", "url", w.URL)
lastHash = hash lastHash = hash
break break
} }

View File

@ -14,11 +14,11 @@ func TestWebhookData(t *testing.T) {
t.Run("webhhook consumes first hash value", func(t *testing.T) { t.Run("webhhook consumes first hash value", func(t *testing.T) {
whd := NewWebhookData() whd := NewWebhookData()
whd.UpdateAndTrigger(hash1) whd.send(hash1)
<-whd.Events() <-whd.events()
hash := whd.Hash() hash := whd.get()
if hash1 != hash { if hash1 != hash {
t.Fatalf("expected hash %s but got %s", hash1, hash) t.Fatalf("expected hash %s but got %s", hash1, hash)
} }
@ -29,13 +29,13 @@ func TestWebhookData(t *testing.T) {
for i := 0; i < 10; i++ { for i := 0; i < 10; i++ {
h := fmt.Sprintf("111111111111111111111111111111111111111%d", i) h := fmt.Sprintf("111111111111111111111111111111111111111%d", i)
whd.UpdateAndTrigger(h) whd.send(h)
} }
whd.UpdateAndTrigger(hash2) whd.send(hash2)
<-whd.Events() <-whd.events()
hash := whd.Hash() hash := whd.get()
if hash2 != hash { if hash2 != hash {
t.Fatalf("expected hash %s but got %s", hash2, hash) t.Fatalf("expected hash %s but got %s", hash2, hash)
} }
@ -43,20 +43,20 @@ func TestWebhookData(t *testing.T) {
t.Run("same hash value", func(t *testing.T) { t.Run("same hash value", func(t *testing.T) {
whd := NewWebhookData() whd := NewWebhookData()
events := whd.Events() events := whd.events()
whd.UpdateAndTrigger(hash1) whd.send(hash1)
<-events <-events
hash := whd.Hash() hash := whd.get()
if hash1 != hash { if hash1 != hash {
t.Fatalf("expected hash %s but got %s", hash1, hash) t.Fatalf("expected hash %s but got %s", hash1, hash)
} }
whd.UpdateAndTrigger(hash1) whd.send(hash1)
<-events <-events
hash = whd.Hash() hash = whd.get()
if hash1 != hash { if hash1 != hash {
t.Fatalf("expected hash %s but got %s", hash1, hash) t.Fatalf("expected hash %s but got %s", hash1, hash)
} }