Merge pull request #193 from peick/http-header-webhook

Sent git hash and branch in the HTTP header of the webhook.
This commit is contained in:
Kubernetes Prow Robot 2019-10-29 09:16:46 -07:00 committed by GitHub
commit 7353b157b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 139 additions and 17 deletions

View File

@ -297,16 +297,17 @@ func main() {
log.V(0).Info("starting up", "args", os.Args) log.V(0).Info("starting up", "args", os.Args)
// Startup webhooks goroutine // Startup webhooks goroutine
webhookTriggerChan := make(chan struct{}, 1) var webhook *Webhook
if *flWebhookURL != "" { if *flWebhookURL != "" {
webhook := Webhook{ webhook = &Webhook{
URL: *flWebhookURL, URL: *flWebhookURL,
Method: *flWebhookMethod, Method: *flWebhookMethod,
Success: *flWebhookStatusSuccess, Success: *flWebhookStatusSuccess,
Timeout: *flWebhookTimeout, Timeout: *flWebhookTimeout,
Backoff: *flWebhookBackoff, Backoff: *flWebhookBackoff,
Data: NewWebhookData(),
} }
go webhook.run(webhookTriggerChan) go webhook.run()
} }
initialSync := true initialSync := true
@ -329,14 +330,10 @@ func main() {
cancel() cancel()
time.Sleep(waitTime(*flWait)) time.Sleep(waitTime(*flWait))
continue continue
} else if changed { } else if changed && webhook != nil {
// Trigger webhooks to be called. We do a non-blocking write to the channel as we err := triggerWebhook(ctx, webhook, *flRev, *flRoot, *flDest)
// don't want to backup the syncing locally because we can't complete a webhook call. if err != nil {
// Since the channel has a buffer of 1 we ensure that it is called for a change, but log.Error(err, "triggering webhook failed")
// this allows us to de-dupe calls if they happen before the webhook call completes.
select {
case webhookTriggerChan <- struct{}{}:
default:
} }
} }
syncDuration.WithLabelValues("success").Observe(time.Since(start).Seconds()) syncDuration.WithLabelValues("success").Observe(time.Since(start).Seconds())
@ -746,3 +743,16 @@ func setupGitCookieFile(ctx context.Context) error {
return nil return nil
} }
func triggerWebhook(ctx context.Context, webhook *Webhook, rev, gitRoot, dest string) error {
target := path.Join(gitRoot, dest)
hash, err := hashForRev(ctx, rev, target)
if err != nil {
return err
}
webhook.Data.UpdateAndTrigger(hash)
return nil
}

View File

@ -4,6 +4,7 @@ import (
"context" "context"
"fmt" "fmt"
"net/http" "net/http"
"sync"
"time" "time"
) )
@ -20,10 +21,50 @@ type Webhook struct {
Timeout time.Duration Timeout time.Duration
// Backoff for failed webhook calls // Backoff for failed webhook calls
Backoff time.Duration Backoff time.Duration
Data *webhookData
} }
func (w *Webhook) Do() error { type webhookData struct {
ch chan struct{}
mutex sync.Mutex
hash string
}
func NewWebhookData() *webhookData {
return &webhookData{
ch: make(chan struct{}, 1),
}
}
func (d *webhookData) Events() chan struct{} {
return d.ch
}
func (d *webhookData) update(newHash string) {
d.mutex.Lock()
defer d.mutex.Unlock()
d.hash = newHash
}
func (d *webhookData) UpdateAndTrigger(newHash string) {
d.update(newHash)
select {
case d.ch <- struct{}{}:
default:
}
}
func (d *webhookData) Hash() string {
d.mutex.Lock()
defer d.mutex.Unlock()
return d.hash
}
func (w *Webhook) Do(hash string) error {
req, err := http.NewRequest(w.Method, w.URL, nil) req, err := http.NewRequest(w.Method, w.URL, nil)
req.Header.Set("Gitsync-Hash", hash)
if err != nil { if err != nil {
return err return err
} }
@ -47,17 +88,24 @@ func (w *Webhook) Do() error {
} }
// Wait for trigger events from the channel, and send webhooks when triggered // Wait for trigger events from the channel, and send webhooks when triggered
func (w *Webhook) run(ch chan struct{}) { func (w *Webhook) run() {
for { var lastHash string
// Wait for trigger
<-ch // Wait for trigger from webhookData.UpdateAndTrigger
for range w.Data.Events() {
for { for {
if err := w.Do(); err != nil { hash := w.Data.Hash()
if hash == lastHash {
break
}
if err := w.Do(hash); err != nil {
log.Error(err, "error calling webhook", "url", w.URL) log.Error(err, "error calling webhook", "url", w.URL)
time.Sleep(w.Backoff) time.Sleep(w.Backoff)
} else { } else {
log.V(0).Info("success calling webhook", "url", w.URL) log.V(0).Info("success calling webhook", "url", w.URL)
lastHash = hash
break break
} }
} }

View File

@ -0,0 +1,64 @@
package main
import (
"fmt"
"testing"
)
const (
hash1 = "1111111111111111111111111111111111111111"
hash2 = "2222222222222222222222222222222222222222"
)
func TestWebhookData(t *testing.T) {
t.Run("webhhook consumes first hash value", func(t *testing.T) {
whd := NewWebhookData()
whd.UpdateAndTrigger(hash1)
<-whd.Events()
hash := whd.Hash()
if hash1 != hash {
t.Fatalf("expected hash %s but got %s", hash1, hash)
}
})
t.Run("last update wins when channel buffer is full", func(t *testing.T) {
whd := NewWebhookData()
for i := 0; i < 10; i++ {
h := fmt.Sprintf("111111111111111111111111111111111111111%d", i)
whd.UpdateAndTrigger(h)
}
whd.UpdateAndTrigger(hash2)
<-whd.Events()
hash := whd.Hash()
if hash2 != hash {
t.Fatalf("expected hash %s but got %s", hash2, hash)
}
})
t.Run("same hash value", func(t *testing.T) {
whd := NewWebhookData()
events := whd.Events()
whd.UpdateAndTrigger(hash1)
<-events
hash := whd.Hash()
if hash1 != hash {
t.Fatalf("expected hash %s but got %s", hash1, hash)
}
whd.UpdateAndTrigger(hash1)
<-events
hash = whd.Hash()
if hash1 != hash {
t.Fatalf("expected hash %s but got %s", hash1, hash)
}
})
}