Reduce to a single webhook call
This commit is contained in:
parent
96714ebed2
commit
fdc9b49de2
|
|
@ -21,7 +21,6 @@ package main // import "k8s.io/git-sync/cmd/git-sync"
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
|
||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
|
@ -63,8 +62,14 @@ var flMaxSyncFailures = flag.Int("max-sync-failures", envInt("GIT_SYNC_MAX_SYNC_
|
||||||
var flChmod = flag.Int("change-permissions", envInt("GIT_SYNC_PERMISSIONS", 0),
|
var flChmod = flag.Int("change-permissions", envInt("GIT_SYNC_PERMISSIONS", 0),
|
||||||
"the file permissions to apply to the checked-out files")
|
"the file permissions to apply to the checked-out files")
|
||||||
|
|
||||||
var flWebhooks = flag.String("webhook", envString("GIT_SYNC_WEBHOOK", ""),
|
var flWebhookURL = flag.String("webhook-url", envString("GIT_SYNC_WEBHOOK_URL", ""),
|
||||||
"the JSON formatted array of webhooks to be sent when git is synced")
|
"the URL for the webook to send to. Default is \"\" which disables the webook.")
|
||||||
|
var flWebhookMethod = flag.String("webhook-method", envString("GIT_SYNC_WEBHOOK_METHOD", "POST"),
|
||||||
|
"the method for the webook to send with")
|
||||||
|
var flWebhookStatusSuccess = flag.Int("webhook-success-status", envInt("GIT_SYNC_WEBHOOK_SUCCESS_STATUS", 200),
|
||||||
|
"the status code which indicates a successful webhook call")
|
||||||
|
var flWebhookTimeout = flag.Duration("webhook-timeout-duration", envDuration("GIT_SYNC_WEBHOOK_TIMEOUT_DURATION", time.Second),
|
||||||
|
"the timeout used when communicating with the webhook target")
|
||||||
|
|
||||||
var flUsername = flag.String("username", envString("GIT_SYNC_USERNAME", ""),
|
var flUsername = flag.String("username", envString("GIT_SYNC_USERNAME", ""),
|
||||||
"the username to use")
|
"the username to use")
|
||||||
|
|
@ -140,6 +145,18 @@ func envFloat(key string, def float64) float64 {
|
||||||
return def
|
return def
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func envDuration(key string, def time.Duration) time.Duration {
|
||||||
|
if env := os.Getenv(key); env != "" {
|
||||||
|
val, err := time.ParseDuration(env)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("invalid value for %q: using default: %v", key, def)
|
||||||
|
return def
|
||||||
|
}
|
||||||
|
return val
|
||||||
|
}
|
||||||
|
return def
|
||||||
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
setFlagDefaults()
|
setFlagDefaults()
|
||||||
|
|
||||||
|
|
@ -175,13 +192,6 @@ func main() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if *flWebhooks != "" {
|
|
||||||
if err := json.Unmarshal([]byte(*flWebhooks), &WebhookArray); err != nil {
|
|
||||||
fmt.Fprintf(os.Stderr, "Error parsing webhooks JSON: %v\n", err)
|
|
||||||
os.Exit(1)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if *flSSH {
|
if *flSSH {
|
||||||
if err := setupGitSSH(*flSSHKnownHosts); err != nil {
|
if err := setupGitSSH(*flSSHKnownHosts); err != nil {
|
||||||
fmt.Fprintf(os.Stderr, "ERROR: can't configure SSH: %v\n", err)
|
fmt.Fprintf(os.Stderr, "ERROR: can't configure SSH: %v\n", err)
|
||||||
|
|
@ -200,8 +210,16 @@ func main() {
|
||||||
log.V(0).Infof("starting up: %q", os.Args)
|
log.V(0).Infof("starting up: %q", os.Args)
|
||||||
|
|
||||||
// Startup webhooks goroutine
|
// Startup webhooks goroutine
|
||||||
webhookTriggerChan := make(chan struct{})
|
webhookTriggerChan := make(chan struct{}, 1)
|
||||||
go ServeWebhooks(webhookTriggerChan)
|
if *flWebhookURL != "" {
|
||||||
|
webhook := Webhook{
|
||||||
|
URL: *flWebhookURL,
|
||||||
|
Method: *flWebhookMethod,
|
||||||
|
Success: flWebhookStatusSuccess,
|
||||||
|
Timeout: *flWebhookTimeout,
|
||||||
|
}
|
||||||
|
go webhook.run(webhookTriggerChan)
|
||||||
|
}
|
||||||
|
|
||||||
initialSync := true
|
initialSync := true
|
||||||
failCount := 0
|
failCount := 0
|
||||||
|
|
@ -220,8 +238,14 @@ func main() {
|
||||||
time.Sleep(waitTime(*flWait))
|
time.Sleep(waitTime(*flWait))
|
||||||
continue
|
continue
|
||||||
} else if changed {
|
} else if changed {
|
||||||
// Trigger webhooks to be called
|
// Trigger webhooks to be called. We do a non-blocking write to the channel as we
|
||||||
webhookTriggerChan <- struct{}{}
|
// don't want to backup the syncing locally because we can't complete a webhook call.
|
||||||
|
// Since the channel has a buffer of 1 we ensure that it is called for a change, but
|
||||||
|
// this allows us to de-dupe calls if they happen before the webhook call completes.
|
||||||
|
select {
|
||||||
|
case webhookTriggerChan <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
}
|
}
|
||||||
if initialSync {
|
if initialSync {
|
||||||
if *flOneTime {
|
if *flOneTime {
|
||||||
|
|
|
||||||
|
|
@ -2,38 +2,22 @@ package main
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Webhook collection
|
|
||||||
var WebhookArray = []Webhook{}
|
|
||||||
|
|
||||||
// WebHook structure
|
// WebHook structure
|
||||||
type Webhook struct {
|
type Webhook struct {
|
||||||
// URL for the http/s request
|
// URL for the http/s request
|
||||||
URL string `json:"url"`
|
URL string
|
||||||
// Method for the http/s request
|
// Method for the http/s request
|
||||||
Method string `json:"method"`
|
Method string
|
||||||
// Code to look for when determining if the request was successful.
|
// Code to look for when determining if the request was successful.
|
||||||
// If this is not specified, request is sent and forgotten about.
|
// If this is not specified, request is sent and forgotten about.
|
||||||
Success *int `json:"success"`
|
Success *int
|
||||||
// Timeout for the http/s request
|
// Timeout for the http/s request
|
||||||
Timeout time.Duration `json:"timeout"`
|
Timeout time.Duration
|
||||||
}
|
|
||||||
|
|
||||||
func (w *Webhook) UnmarshalJSON(data []byte) error {
|
|
||||||
type testAlias Webhook
|
|
||||||
test := &testAlias{
|
|
||||||
Timeout: time.Second * 5,
|
|
||||||
}
|
|
||||||
|
|
||||||
_ = json.Unmarshal(data, test)
|
|
||||||
|
|
||||||
*w = Webhook(*test)
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (w *Webhook) Do() error {
|
func (w *Webhook) Do() error {
|
||||||
|
|
@ -62,20 +46,18 @@ func (w *Webhook) Do() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Wait for trigger events from the channel, and send webhooks when triggered
|
// Wait for trigger events from the channel, and send webhooks when triggered
|
||||||
func ServeWebhooks(ch chan struct{}) {
|
func (w *Webhook) run(ch chan struct{}) {
|
||||||
for {
|
for {
|
||||||
// Wait for trigger
|
// Wait for trigger
|
||||||
<-ch
|
<-ch
|
||||||
|
|
||||||
// Calling webhook - one after another
|
for {
|
||||||
for _, v := range WebhookArray {
|
if err := w.Do(); err != nil {
|
||||||
log.V(0).Infof("calling webhook %v\n", v.URL)
|
log.Errorf("error calling webhook %v: %v", w.URL, err)
|
||||||
if err := v.Do(); err != nil {
|
|
||||||
log.Errorf("error calling webhook %v: %v", v.URL, err)
|
|
||||||
} else {
|
} else {
|
||||||
log.V(0).Infof("calling webhook %v was: OK\n", v.URL)
|
log.V(0).Infof("calling webhook %v was: OK\n", w.URL)
|
||||||
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
Loading…
Reference in New Issue