Populate last hash value in case webhook is slower than the main loop.
This commit is contained in:
parent
e214aeb4d6
commit
f0ec409582
|
|
@ -269,16 +269,17 @@ func main() {
|
||||||
log.V(0).Info("starting up", "args", os.Args)
|
log.V(0).Info("starting up", "args", os.Args)
|
||||||
|
|
||||||
// Startup webhooks goroutine
|
// Startup webhooks goroutine
|
||||||
webhookTriggerChan := make(chan webhookRepoInfo, 1)
|
var webhook *Webhook
|
||||||
if *flWebhookURL != "" {
|
if *flWebhookURL != "" {
|
||||||
webhook := Webhook{
|
webhook = &Webhook{
|
||||||
URL: *flWebhookURL,
|
URL: *flWebhookURL,
|
||||||
Method: *flWebhookMethod,
|
Method: *flWebhookMethod,
|
||||||
Success: *flWebhookStatusSuccess,
|
Success: *flWebhookStatusSuccess,
|
||||||
Timeout: *flWebhookTimeout,
|
Timeout: *flWebhookTimeout,
|
||||||
Backoff: *flWebhookBackoff,
|
Backoff: *flWebhookBackoff,
|
||||||
|
Data: NewWebhookData(),
|
||||||
}
|
}
|
||||||
go webhook.run(webhookTriggerChan)
|
go webhook.run()
|
||||||
}
|
}
|
||||||
|
|
||||||
initialSync := true
|
initialSync := true
|
||||||
|
|
@ -301,8 +302,8 @@ func main() {
|
||||||
cancel()
|
cancel()
|
||||||
time.Sleep(waitTime(*flWait))
|
time.Sleep(waitTime(*flWait))
|
||||||
continue
|
continue
|
||||||
} else if changed {
|
} else if changed && webhook != nil {
|
||||||
err := triggerWebhook(ctx, webhookTriggerChan, *flRev, *flRoot)
|
err := triggerWebhook(ctx, webhook, *flRev, *flRoot, *flDest)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error(err, "triggering webhook failed")
|
log.Error(err, "triggering webhook failed")
|
||||||
}
|
}
|
||||||
|
|
@ -692,24 +693,19 @@ func setupGitCookieFile() error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func triggerWebhook(ctx context.Context, ch chan webhookRepoInfo, rev, gitRoot string) error {
|
func triggerWebhook(ctx context.Context, webhook *Webhook, rev, gitRoot, dest string) error {
|
||||||
info := webhookRepoInfo{}
|
target := path.Join(gitRoot, dest)
|
||||||
|
|
||||||
hash, err := hashForRev(ctx, rev, gitRoot)
|
hash, err := hashForRev(ctx, rev, target)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
info.Hash = hash
|
|
||||||
|
|
||||||
// Trigger webhooks to be called. We do a non-blocking write to the channel as we
|
// Trigger webhooks to be called. We do a non-blocking write to the channel as we
|
||||||
// don't want to backup the syncing locally because we can't complete a webhook call.
|
// don't want to backup the syncing locally because we can't complete a webhook call.
|
||||||
// Since the channel has a buffer of 1 we ensure that it is called for a change, but
|
// Since the channel has a buffer of 1 we ensure that it is called for a change. Only
|
||||||
// this allows us to de-dupe calls if they happen before the webhook call completes.
|
// the event of the last git hash will be send in case the webhook is not fast enough.
|
||||||
select {
|
webhook.Data.UpdateAndTrigger(hash)
|
||||||
case ch <- info:
|
|
||||||
default:
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -20,15 +20,68 @@ type Webhook struct {
|
||||||
Timeout time.Duration
|
Timeout time.Duration
|
||||||
// Backoff for failed webhook calls
|
// Backoff for failed webhook calls
|
||||||
Backoff time.Duration
|
Backoff time.Duration
|
||||||
|
|
||||||
|
Data *webhookData
|
||||||
}
|
}
|
||||||
|
|
||||||
type webhookRepoInfo struct {
|
type webhookData struct {
|
||||||
Hash string
|
ch chan struct{}
|
||||||
|
|
||||||
|
newHash string
|
||||||
|
curHash string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *Webhook) Do(info webhookRepoInfo) error {
|
func NewWebhookData() *webhookData {
|
||||||
|
return &webhookData{
|
||||||
|
ch: make(chan struct{}, 1),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *webhookData) UpdateAndTrigger(newHash string) {
|
||||||
|
d.newHash = newHash
|
||||||
|
|
||||||
|
select {
|
||||||
|
case d.ch <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *webhookData) updateState() bool {
|
||||||
|
newHash := d.newHash
|
||||||
|
if newHash != d.curHash {
|
||||||
|
d.curHash = newHash
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *webhookData) Hash() string {
|
||||||
|
d.updateState()
|
||||||
|
return d.curHash
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *webhookData) Wait() bool {
|
||||||
|
// wait for message from UpdateAndTrigger
|
||||||
|
<-d.ch
|
||||||
|
|
||||||
|
changed := d.updateState()
|
||||||
|
|
||||||
|
return changed
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *webhookData) WaitForChange() {
|
||||||
|
for {
|
||||||
|
changed := d.Wait()
|
||||||
|
|
||||||
|
if changed {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *Webhook) Do(hash string) error {
|
||||||
req, err := http.NewRequest(w.Method, w.URL, nil)
|
req, err := http.NewRequest(w.Method, w.URL, nil)
|
||||||
req.Header.Set("Gitsync-Hash", info.Hash)
|
req.Header.Set("Gitsync-Hash", hash)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -52,13 +105,14 @@ func (w *Webhook) Do(info webhookRepoInfo) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait for trigger events from the channel, and send webhooks when triggered
|
// Wait for trigger events from the channel, and send webhooks when triggered
|
||||||
func (w *Webhook) run(ch chan webhookRepoInfo) {
|
func (w *Webhook) run() {
|
||||||
for {
|
for {
|
||||||
// Wait for trigger
|
// Wait for trigger and changed hash value
|
||||||
info := <-ch
|
w.Data.WaitForChange()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
if err := w.Do(info); err != nil {
|
hash := w.Data.Hash()
|
||||||
|
if err := w.Do(hash); err != nil {
|
||||||
log.Error(err, "error calling webhook", "url", w.URL)
|
log.Error(err, "error calling webhook", "url", w.URL)
|
||||||
time.Sleep(w.Backoff)
|
time.Sleep(w.Backoff)
|
||||||
} else {
|
} else {
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,61 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"testing"
|
||||||
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
hash1 = "1111111111111111111111111111111111111111"
|
||||||
|
hash2 = "2222222222222222222222222222222222222222"
|
||||||
|
hash3 = "3333333333333333333333333333333333333333"
|
||||||
|
)
|
||||||
|
|
||||||
|
func TestWebhookData(t *testing.T) {
|
||||||
|
t.Run("webhhook consumes first hash value", func(t *testing.T) {
|
||||||
|
whd := NewWebhookData()
|
||||||
|
|
||||||
|
whd.UpdateAndTrigger(hash1)
|
||||||
|
whd.WaitForChange()
|
||||||
|
|
||||||
|
hash := whd.Hash()
|
||||||
|
if hash1 != hash {
|
||||||
|
t.Fatalf("expected hash %s but got %s", hash1, hash)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("second update wins", func(t *testing.T) {
|
||||||
|
whd := NewWebhookData()
|
||||||
|
|
||||||
|
whd.UpdateAndTrigger(hash1)
|
||||||
|
whd.UpdateAndTrigger(hash2)
|
||||||
|
whd.WaitForChange()
|
||||||
|
|
||||||
|
hash := whd.Hash()
|
||||||
|
if hash2 != hash {
|
||||||
|
t.Fatalf("expected hash %s but got %s", hash2, hash)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.Run("same hash value does not lead to an update", func(t *testing.T) {
|
||||||
|
whd := NewWebhookData()
|
||||||
|
|
||||||
|
whd.UpdateAndTrigger(hash1)
|
||||||
|
whd.WaitForChange()
|
||||||
|
hash := whd.Hash()
|
||||||
|
if hash1 != hash {
|
||||||
|
t.Fatalf("expected hash %s but got %s", hash1, hash)
|
||||||
|
}
|
||||||
|
|
||||||
|
whd.UpdateAndTrigger(hash1)
|
||||||
|
changed := whd.Wait()
|
||||||
|
|
||||||
|
if changed {
|
||||||
|
t.Fatalf("no change expected")
|
||||||
|
}
|
||||||
|
|
||||||
|
hash = whd.Hash()
|
||||||
|
if hash1 != hash {
|
||||||
|
t.Fatalf("expected hash %s but got %s", hash1, hash)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue