Merge pull request #441 from thockin/master

Make exechooks work like webhooks.
This commit is contained in:
Kubernetes Prow Robot 2021-08-18 13:59:45 -07:00 committed by GitHub
commit 80b7774dc0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 904 additions and 472 deletions

View File

@ -19,9 +19,7 @@ limitations under the License.
package main // import "k8s.io/git-sync/cmd/git-sync"
import (
"bytes"
"context"
"encoding/json"
stdflag "flag" // renamed so we don't accidentally use it
"fmt"
"io"
@ -38,11 +36,12 @@ import (
"sync"
"time"
"github.com/go-logr/glogr"
"github.com/go-logr/logr"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/pflag"
"k8s.io/git-sync/pkg/cmd"
"k8s.io/git-sync/pkg/hook"
"k8s.io/git-sync/pkg/logging"
"k8s.io/git-sync/pkg/pid1"
"k8s.io/git-sync/pkg/version"
)
@ -82,11 +81,16 @@ var flMaxSyncFailures = pflag.Int("max-sync-failures", envInt("GIT_SYNC_MAX_SYNC
var flChmod = pflag.Int("change-permissions", envInt("GIT_SYNC_PERMISSIONS", 0),
"optionally change permissions on the checked-out files to the specified mode")
var flSyncHookCommand = pflag.String("sync-hook-command", envString("GIT_SYNC_HOOK_COMMAND", ""),
"an optional command to be executed after syncing a new hash of the remote repository")
var flSparseCheckoutFile = pflag.String("sparse-checkout-file", envString("GIT_SYNC_SPARSE_CHECKOUT_FILE", ""),
"the path to a sparse-checkout file")
var flExechookCommand = pflag.String("exechook-command", envString("GIT_SYNC_EXECHOOK_COMMAND", ""),
"an optional command to be run when syncs complete")
var flExechookTimeout = pflag.Duration("exechook-timeout", envDuration("GIT_SYNC_EXECHOOK_TIMEOUT", time.Second*30),
"the timeout for the exechook")
var flExechookBackoff = pflag.Duration("exechook-backoff", envDuration("GIT_SYNC_EXECHOOK_BACKOFF", time.Second*3),
"the time to wait before retrying a failed exechook")
var flWebhookURL = pflag.String("webhook-url", envString("GIT_SYNC_WEBHOOK_URL", ""),
"a URL for optional webhook notifications when syncs complete")
var flWebhookMethod = pflag.String("webhook-method", envString("GIT_SYNC_WEBHOOK_METHOD", "POST"),
@ -141,15 +145,16 @@ var flTimeout = pflag.Int("timeout", envInt("GIT_SYNC_TIMEOUT", 0),
"DEPRECATED: use --sync-timeout instead")
var flDest = pflag.String("dest", envString("GIT_SYNC_DEST", ""),
"DEPRECATED: use --link instead")
var flSyncHookCommand = pflag.String("sync-hook-command", envString("GIT_SYNC_HOOK_COMMAND", ""),
"DEPRECATED: use --exechook-command instead")
func init() {
pflag.CommandLine.MarkDeprecated("wait", "use --period instead")
pflag.CommandLine.MarkDeprecated("timeout", "use --sync-timeout instead")
pflag.CommandLine.MarkDeprecated("dest", "use --link instead")
pflag.CommandLine.MarkDeprecated("sync-hook-command", "use --exechook-command instead")
}
var log *customLogger
// Total pull/error, summary on pull duration
var (
// TODO: have a marker for "which" servergroup
@ -183,103 +188,6 @@ const (
submodulesOff submodulesMode = "off"
)
type customLogger struct {
logr.Logger
root string
errorFile string
}
func (l customLogger) Error(err error, msg string, kvList ...interface{}) {
l.Logger.Error(err, msg, kvList...)
if l.errorFile == "" {
return
}
payload := struct {
Msg string
Err string
Args map[string]interface{}
}{
Msg: msg,
Err: err.Error(),
Args: map[string]interface{}{},
}
if len(kvList)%2 != 0 {
kvList = append(kvList, "<no-value>")
}
for i := 0; i < len(kvList); i += 2 {
k, ok := kvList[i].(string)
if !ok {
k = fmt.Sprintf("%v", kvList[i])
}
payload.Args[k] = kvList[i+1]
}
jb, err := json.Marshal(payload)
if err != nil {
l.Logger.Error(err, "can't encode error payload")
content := fmt.Sprintf("%v", err)
l.writeContent([]byte(content))
} else {
l.writeContent(jb)
}
}
// exportError exports the error to the error file if --export-error is enabled.
func (l *customLogger) exportError(content string) {
if l.errorFile == "" {
return
}
l.writeContent([]byte(content))
}
// writeContent writes the error content to the error file.
func (l *customLogger) writeContent(content []byte) {
if _, err := os.Stat(l.root); os.IsNotExist(err) {
fileMode := os.FileMode(0755)
if err := os.Mkdir(l.root, fileMode); err != nil {
l.Logger.Error(err, "can't create the root directory", "root", l.root)
return
}
}
tmpFile, err := ioutil.TempFile(l.root, "tmp-err-")
if err != nil {
l.Logger.Error(err, "can't create temporary error-file", "directory", l.root, "prefix", "tmp-err-")
return
}
defer func() {
if err := tmpFile.Close(); err != nil {
l.Logger.Error(err, "can't close temporary error-file", "filename", tmpFile.Name())
}
}()
if _, err = tmpFile.Write(content); err != nil {
l.Logger.Error(err, "can't write to temporary error-file", "filename", tmpFile.Name())
return
}
errorFile := filepath.Join(l.root, l.errorFile)
if err := os.Rename(tmpFile.Name(), errorFile); err != nil {
l.Logger.Error(err, "can't rename to error-file", "temp-file", tmpFile.Name(), "error-file", errorFile)
return
}
if err := os.Chmod(errorFile, 0644); err != nil {
l.Logger.Error(err, "can't change permissions on the error-file", "error-file", errorFile)
}
}
// deleteErrorFile deletes the error file.
func (l *customLogger) deleteErrorFile() {
if l.errorFile == "" {
return
}
errorFile := filepath.Join(l.root, l.errorFile)
if err := os.Remove(errorFile); err != nil {
if os.IsNotExist(err) {
return
}
l.Logger.Error(err, "can't delete the error-file", "filename", errorFile)
}
}
func init() {
prometheus.MustRegister(syncDuration)
prometheus.MustRegister(syncCount)
@ -341,11 +249,11 @@ func envDuration(key string, def time.Duration) time.Duration {
return def
}
func setGlogFlags() {
func setGlogFlags(v int, log *logging.Logger) {
// Force logging to stderr.
stderrFlag := stdflag.Lookup("logtostderr")
if stderrFlag == nil {
handleError(false, "ERROR: can't find glog flag 'logtostderr'")
handleError(log, false, "ERROR: can't find glog flag 'logtostderr'")
}
stderrFlag.Value.Set("true")
@ -355,7 +263,7 @@ func setGlogFlags() {
fmt.Fprintf(os.Stderr, "ERROR: can't find glog flag 'v'\n")
os.Exit(1)
}
vFlag.Value.Set(strconv.Itoa(*flVerbose))
vFlag.Value.Set(strconv.Itoa(v))
}
// repoSync represents the remote repo and the local sync of it.
@ -372,6 +280,8 @@ type repoSync struct {
authURL string // a URL to re-fetch credentials, or ""
sparseFile string // path to a sparse-checkout file
syncHookCmd string // command to run after each sync
log *logging.Logger
run *cmd.Runner
}
func main() {
@ -392,9 +302,12 @@ func main() {
pflag.Parse()
stdflag.CommandLine.Parse(nil) // Otherwise glog complains
setGlogFlags()
log = &customLogger{glogr.New(), *flRoot, *flErrorFile}
// Needs to happen very early for errors to be written to a file.
log := logging.New(*flRoot, *flErrorFile)
cmdRunner := cmd.NewRunner(log)
setGlogFlags(*flVerbose, log)
if *flVersion {
fmt.Println(version.VERSION)
@ -411,21 +324,21 @@ func main() {
}
if *flRepo == "" {
handleError(true, "ERROR: --repo must be specified")
handleError(log, true, "ERROR: --repo must be specified")
}
if *flDepth < 0 { // 0 means "no limit"
handleError(true, "ERROR: --depth must be greater than or equal to 0")
handleError(log, true, "ERROR: --depth must be greater than or equal to 0")
}
switch submodulesMode(*flSubmodules) {
case submodulesRecursive, submodulesShallow, submodulesOff:
default:
handleError(true, "ERROR: --submodules must be one of %q, %q, or %q", submodulesRecursive, submodulesShallow, submodulesOff)
handleError(log, true, "ERROR: --submodules must be one of %q, %q, or %q", submodulesRecursive, submodulesShallow, submodulesOff)
}
if *flRoot == "" {
handleError(true, "ERROR: --root must be specified")
handleError(log, true, "ERROR: --root must be specified")
}
if *flDest != "" {
@ -436,69 +349,81 @@ func main() {
*flLink = parts[len(parts)-1]
}
if strings.Contains(*flLink, "/") {
handleError(true, "ERROR: --link must not contain '/'")
handleError(log, true, "ERROR: --link must not contain '/'")
}
if strings.HasPrefix(*flLink, ".") {
handleError(true, "ERROR: --link must not start with '.'")
handleError(log, true, "ERROR: --link must not start with '.'")
}
if *flWait != 0 {
*flPeriod = time.Duration(int(*flWait*1000)) * time.Millisecond
}
if *flPeriod < 10*time.Millisecond {
handleError(true, "ERROR: --period must be at least 10ms")
handleError(log, true, "ERROR: --period must be at least 10ms")
}
if *flTimeout != 0 {
*flSyncTimeout = time.Duration(*flTimeout) * time.Second
}
if *flSyncTimeout < 10*time.Millisecond {
handleError(true, "ERROR: --sync-timeout must be at least 10ms")
handleError(log, true, "ERROR: --sync-timeout must be at least 10ms")
}
if *flSyncHookCommand != "" {
*flExechookCommand = *flSyncHookCommand
}
if *flExechookCommand != "" {
if *flExechookTimeout < time.Second {
handleError(log, true, "ERROR: --exechook-timeout must be at least 1s")
}
if *flExechookBackoff < time.Second {
handleError(log, true, "ERROR: --exechook-backoff must be at least 1s")
}
}
if *flWebhookURL != "" {
if *flWebhookStatusSuccess < -1 {
handleError(true, "ERROR: --webhook-success-status must be a valid HTTP code or -1")
handleError(log, true, "ERROR: --webhook-success-status must be a valid HTTP code or -1")
}
if *flWebhookTimeout < time.Second {
handleError(true, "ERROR: --webhook-timeout must be at least 1s")
handleError(log, true, "ERROR: --webhook-timeout must be at least 1s")
}
if *flWebhookBackoff < time.Second {
handleError(true, "ERROR: --webhook-backoff must be at least 1s")
handleError(log, true, "ERROR: --webhook-backoff must be at least 1s")
}
}
if *flPassword != "" && *flPasswordFile != "" {
handleError(false, "ERROR: only one of --password and --password-file may be specified")
handleError(log, false, "ERROR: only one of --password and --password-file may be specified")
}
if *flUsername != "" {
if *flPassword == "" && *flPasswordFile == "" {
handleError(true, "ERROR: --password or --password-file must be set when --username is specified")
handleError(log, true, "ERROR: --password or --password-file must be set when --username is specified")
}
}
if *flSSH {
if *flUsername != "" {
handleError(false, "ERROR: only one of --ssh and --username may be specified")
handleError(log, false, "ERROR: only one of --ssh and --username may be specified")
}
if *flPassword != "" {
handleError(false, "ERROR: only one of --ssh and --password may be specified")
handleError(log, false, "ERROR: only one of --ssh and --password may be specified")
}
if *flPasswordFile != "" {
handleError(false, "ERROR: only one of --ssh and --password-file may be specified")
handleError(log, false, "ERROR: only one of --ssh and --password-file may be specified")
}
if *flAskPassURL != "" {
handleError(false, "ERROR: only one of --ssh and --askpass-url may be specified")
handleError(log, false, "ERROR: only one of --ssh and --askpass-url may be specified")
}
if *flCookieFile {
handleError(false, "ERROR: only one of --ssh and --cookie-file may be specified")
handleError(log, false, "ERROR: only one of --ssh and --cookie-file may be specified")
}
if *flSSHKeyFile == "" {
handleError(true, "ERROR: --ssh-key-file must be specified when --ssh is specified")
handleError(log, true, "ERROR: --ssh-key-file must be specified when --ssh is specified")
}
if *flSSHKnownHosts {
if *flSSHKnownHostsFile == "" {
handleError(true, "ERROR: --ssh-known-hosts-file must be specified when --ssh-known-hosts is specified")
handleError(log, true, "ERROR: --ssh-known-hosts-file must be specified when --ssh-known-hosts is specified")
}
}
}
@ -553,6 +478,8 @@ func main() {
authURL: *flAskPassURL,
sparseFile: *flSparseCheckoutFile,
syncHookCmd: *flSyncHookCommand,
log: log,
run: cmdRunner,
}
// This context is used only for git credentials initialization. There are no long-running operations like
@ -575,7 +502,7 @@ func main() {
}
if *flSSH {
if err := setupGitSSH(*flSSHKnownHosts, *flSSHKeyFile, *flSSHKnownHostsFile); err != nil {
if err := git.SetupGitSSH(*flSSHKnownHosts, *flSSHKeyFile, *flSSHKnownHostsFile); err != nil {
log.Error(err, "ERROR: can't set up git SSH")
os.Exit(1)
}
@ -635,17 +562,42 @@ func main() {
}
// Startup webhooks goroutine
var webhook *Webhook
var webhookRunner *hook.HookRunner
if *flWebhookURL != "" {
webhook = &Webhook{
URL: *flWebhookURL,
Method: *flWebhookMethod,
Success: *flWebhookStatusSuccess,
Timeout: *flWebhookTimeout,
Backoff: *flWebhookBackoff,
Data: NewWebhookData(),
}
go webhook.run()
webhook := hook.NewWebhook(
*flWebhookURL,
*flWebhookMethod,
*flWebhookStatusSuccess,
*flWebhookTimeout,
log,
)
webhookRunner = hook.NewHookRunner(
webhook,
*flWebhookBackoff,
hook.NewHookData(),
log,
)
go webhookRunner.Run(context.Background())
}
// Startup exechooks goroutine
var exechookRunner *hook.HookRunner
if *flExechookCommand != "" {
exechook := hook.NewExechook(
cmd.NewRunner(log),
*flExechookCommand,
*flRoot,
[]string{},
*flExechookTimeout,
log,
)
exechookRunner = hook.NewHookRunner(
exechook,
*flExechookBackoff,
hook.NewHookData(),
log,
)
go exechookRunner.Run(context.Background())
}
initialSync := true
@ -676,9 +628,13 @@ func main() {
time.Sleep(*flPeriod)
continue
} else if changed {
if webhook != nil {
webhook.Send(hash)
if webhookRunner != nil {
webhookRunner.Send(hash)
}
if exechookRunner != nil {
exechookRunner.Send(hash)
}
updateSyncMetrics(metricKeySuccess, start)
} else {
updateSyncMetrics(metricKeyNoOp, start)
@ -686,7 +642,7 @@ func main() {
if initialSync {
if *flOneTime {
log.deleteErrorFile()
log.DeleteErrorFile()
os.Exit(0)
}
if isHash, err := git.RevIsHash(ctx); err != nil {
@ -694,14 +650,14 @@ func main() {
os.Exit(1)
} else if isHash {
log.V(0).Info("rev appears to be a git hash, no further sync needed", "rev", git.rev)
log.deleteErrorFile()
log.DeleteErrorFile()
sleepForever()
}
initialSync = false
}
failCount = 0
log.deleteErrorFile()
log.DeleteErrorFile()
log.V(1).Info("next sync", "waitTime", flPeriod.String())
cancel()
time.Sleep(*flPeriod)
@ -730,17 +686,17 @@ func (git *repoSync) InitRepo(ctx context.Context) error {
// Make sure the directory we found is actually usable.
if git.SanityCheck(ctx) {
log.V(0).Info("root directory is valid", "path", git.root)
git.log.V(0).Info("root directory is valid", "path", git.root)
return nil
}
// Maybe a previous run crashed? Git won't use this dir.
log.V(0).Info("root directory exists but failed checks, cleaning up", "path", git.root)
git.log.V(0).Info("root directory exists but failed checks, cleaning up", "path", git.root)
// We remove the contents rather than the dir itself, because a common
// use-case is to have a volume mounted at git.root, which makes removing
// it impossible.
if err := removeDirContents(git.root); err != nil {
if err := removeDirContents(git.root, git.log); err != nil {
return fmt.Errorf("can't remove unusable git root: %w", err)
}
@ -749,32 +705,32 @@ func (git *repoSync) InitRepo(ctx context.Context) error {
// sanityCheck tries to make sure that the dir is a valid git repository.
func (git *repoSync) SanityCheck(ctx context.Context) bool {
log.V(0).Info("sanity-checking git repo", "repo", git.root)
git.log.V(0).Info("sanity-checking git repo", "repo", git.root)
// If it is empty, we are done.
if empty, err := dirIsEmpty(git.root); err != nil {
log.Error(err, "can't list repo directory", "repo", git.root)
git.log.Error(err, "can't list repo directory", "repo", git.root)
return false
} else if empty {
log.V(0).Info("git repo is empty", "repo", git.root)
git.log.V(0).Info("git repo is empty", "repo", git.root)
return true
}
// Check that this is actually the root of the repo.
if root, err := runCommand(ctx, git.root, git.cmd, "rev-parse", "--show-toplevel"); err != nil {
log.Error(err, "can't get repo toplevel", "repo", git.root)
if root, err := git.run.Run(ctx, git.root, git.cmd, "rev-parse", "--show-toplevel"); err != nil {
git.log.Error(err, "can't get repo toplevel", "repo", git.root)
return false
} else {
root = strings.TrimSpace(root)
if root != git.root {
log.V(0).Info("git repo is under another repo", "repo", git.root, "parent", root)
git.log.V(0).Info("git repo is under another repo", "repo", git.root, "parent", root)
return false
}
}
// Consistency-check the repo.
if _, err := runCommand(ctx, git.root, git.cmd, "fsck", "--no-progress", "--connectivity-only"); err != nil {
log.Error(err, "repo sanity check failed", "repo", git.root)
if _, err := git.run.Run(ctx, git.root, git.cmd, "fsck", "--no-progress", "--connectivity-only"); err != nil {
git.log.Error(err, "repo sanity check failed", "repo", git.root)
return false
}
@ -789,7 +745,7 @@ func dirIsEmpty(dir string) (bool, error) {
return len(dirents) == 0, nil
}
func removeDirContents(dir string) error {
func removeDirContents(dir string, log *logging.Logger) error {
dirents, err := ioutil.ReadDir(dir)
if err != nil {
return err
@ -797,7 +753,9 @@ func removeDirContents(dir string) error {
for _, fi := range dirents {
p := filepath.Join(dir, fi.Name())
log.V(2).Info("removing path recursively", "path", p, "isDir", fi.IsDir())
if log != nil {
log.V(2).Info("removing path recursively", "path", p, "isDir", fi.IsDir())
}
if err := os.RemoveAll(p); err != nil {
return err
}
@ -822,13 +780,13 @@ func sleepForever() {
// handleError prints the error to the standard error, prints the usage if the `printUsage` flag is true,
// exports the error to the error file and exits the process with the exit code.
func handleError(printUsage bool, format string, a ...interface{}) {
func handleError(log *logging.Logger, printUsage bool, format string, a ...interface{}) {
s := fmt.Sprintf(format, a...)
fmt.Fprintln(os.Stderr, s)
if printUsage {
pflag.Usage()
}
log.exportError(s)
log.ExportError(s)
os.Exit(1)
}
@ -874,13 +832,13 @@ func (git *repoSync) UpdateSymlink(ctx context.Context, newDir string) (string,
}
const tmplink = "tmp-link"
log.V(1).Info("creating tmp symlink", "root", git.root, "dst", newDirRelative, "src", tmplink)
if _, err := runCommand(ctx, git.root, "ln", "-snf", newDirRelative, tmplink); err != nil {
git.log.V(1).Info("creating tmp symlink", "root", git.root, "dst", newDirRelative, "src", tmplink)
if _, err := git.run.Run(ctx, git.root, "ln", "-snf", newDirRelative, tmplink); err != nil {
return "", fmt.Errorf("error creating symlink: %v", err)
}
log.V(1).Info("renaming symlink", "root", git.root, "oldName", tmplink, "newName", git.link)
if _, err := runCommand(ctx, git.root, "mv", "-T", tmplink, git.link); err != nil {
git.log.V(1).Info("renaming symlink", "root", git.root, "oldName", tmplink, "newName", git.link)
if _, err := git.run.Run(ctx, git.root, "mv", "-T", tmplink, git.link); err != nil {
return "", fmt.Errorf("error replacing symlink: %v", err)
}
@ -903,13 +861,13 @@ func setRepoReady() {
repoReady = true
}
// cleanupWorkTree() is used to remove a worktree and its folder
func cleanupWorkTree(ctx context.Context, gitRoot, worktree string) error {
// CleanupWorkTree() is used to remove a worktree and its folder
func (git *repoSync) CleanupWorkTree(ctx context.Context, gitRoot, worktree string) error {
// Clean up worktree(s)
log.V(1).Info("removing worktree", "path", worktree)
git.log.V(1).Info("removing worktree", "path", worktree)
if err := os.RemoveAll(worktree); err != nil {
return fmt.Errorf("error removing directory: %v", err)
} else if _, err := runCommand(ctx, gitRoot, *flGitCmd, "worktree", "prune"); err != nil {
} else if _, err := git.run.Run(ctx, gitRoot, git.cmd, "worktree", "prune"); err != nil {
return err
}
return nil
@ -917,7 +875,7 @@ func cleanupWorkTree(ctx context.Context, gitRoot, worktree string) error {
// AddWorktreeAndSwap creates a new worktree and calls UpdateSymlink to swap the symlink to point to the new worktree
func (git *repoSync) AddWorktreeAndSwap(ctx context.Context, hash string) error {
log.V(0).Info("syncing git", "rev", git.rev, "hash", hash)
git.log.V(0).Info("syncing git", "rev", git.rev, "hash", hash)
args := []string{"fetch", "-f", "--tags"}
if git.depth != 0 {
@ -926,7 +884,7 @@ func (git *repoSync) AddWorktreeAndSwap(ctx context.Context, hash string) error
args = append(args, "origin", git.branch)
// Update from the remote.
if _, err := runCommand(ctx, git.root, git.cmd, args...); err != nil {
if _, err := git.run.Run(ctx, git.root, git.cmd, args...); err != nil {
return err
}
@ -934,12 +892,12 @@ func (git *repoSync) AddWorktreeAndSwap(ctx context.Context, hash string) error
// end up NOT fetching the hash we wanted. If we can't resolve that hash
// to a commit we can just end early and leave it for the next sync period.
if _, err := git.ResolveRef(ctx, hash); err != nil {
log.Error(err, "can't resolve commit, will retry", "rev", git.rev, "hash", hash)
git.log.Error(err, "can't resolve commit, will retry", "rev", git.rev, "hash", hash)
return nil
}
// GC clone
if _, err := runCommand(ctx, git.root, git.cmd, "gc", "--prune=all"); err != nil {
if _, err := git.run.Run(ctx, git.root, git.cmd, "gc", "--prune=all"); err != nil {
return err
}
@ -950,12 +908,12 @@ func (git *repoSync) AddWorktreeAndSwap(ctx context.Context, hash string) error
// error'd without cleaning up. The next time thru the sync loop fails to
// create the worktree and bails out. This manifests as:
// "fatal: '/repo/root/rev-nnnn' already exists"
if err := cleanupWorkTree(ctx, git.root, worktreePath); err != nil {
if err := git.CleanupWorkTree(ctx, git.root, worktreePath); err != nil {
return err
}
_, err := runCommand(ctx, git.root, git.cmd, "worktree", "add", worktreePath, "origin/"+git.branch, "--no-checkout")
log.V(0).Info("adding worktree", "path", worktreePath, "branch", fmt.Sprintf("origin/%s", git.branch))
_, err := git.run.Run(ctx, git.root, git.cmd, "worktree", "add", worktreePath, "origin/"+git.branch, "--no-checkout")
git.log.V(0).Info("adding worktree", "path", worktreePath, "branch", fmt.Sprintf("origin/%s", git.branch))
if err != nil {
return err
}
@ -977,7 +935,7 @@ func (git *repoSync) AddWorktreeAndSwap(ctx context.Context, hash string) error
if git.sparseFile != "" {
// This is required due to the undocumented behavior outlined here:
// https://public-inbox.org/git/CAPig+cSP0UiEBXSCi7Ua099eOdpMk8R=JtAjPuUavRF4z0R0Vg@mail.gmail.com/t/
log.V(0).Info("configuring worktree sparse checkout")
git.log.V(0).Info("configuring worktree sparse checkout")
checkoutFile := git.sparseFile
gitInfoPath := filepath.Join(git.root, fmt.Sprintf(".git/worktrees/%s/info", hash))
@ -1009,23 +967,23 @@ func (git *repoSync) AddWorktreeAndSwap(ctx context.Context, hash string) error
}
args := []string{"sparse-checkout", "init"}
_, err = runCommand(ctx, worktreePath, git.cmd, args...)
_, err = git.run.Run(ctx, worktreePath, git.cmd, args...)
if err != nil {
return err
}
}
// Reset the worktree's working copy to the specific rev.
_, err = runCommand(ctx, worktreePath, git.cmd, "reset", "--hard", hash, "--")
_, err = git.run.Run(ctx, worktreePath, git.cmd, "reset", "--hard", hash, "--")
if err != nil {
return err
}
log.V(0).Info("reset worktree to hash", "path", worktreePath, "hash", hash)
git.log.V(0).Info("reset worktree to hash", "path", worktreePath, "hash", hash)
// Update submodules
// NOTE: this works for repo with or without submodules.
if git.submodules != submodulesOff {
log.V(0).Info("updating submodules")
git.log.V(0).Info("updating submodules")
submodulesArgs := []string{"submodule", "update", "--init"}
if git.submodules == submodulesRecursive {
submodulesArgs = append(submodulesArgs, "--recursive")
@ -1033,7 +991,7 @@ func (git *repoSync) AddWorktreeAndSwap(ctx context.Context, hash string) error
if git.depth != 0 {
submodulesArgs = append(submodulesArgs, "--depth", strconv.Itoa(git.depth))
}
_, err = runCommand(ctx, worktreePath, git.cmd, submodulesArgs...)
_, err = git.run.Run(ctx, worktreePath, git.cmd, submodulesArgs...)
if err != nil {
return err
}
@ -1042,19 +1000,19 @@ func (git *repoSync) AddWorktreeAndSwap(ctx context.Context, hash string) error
// Change the file permissions, if requested.
if git.chmod != 0 {
mode := fmt.Sprintf("%#o", git.chmod)
log.V(0).Info("changing file permissions", "mode", mode)
_, err = runCommand(ctx, "", "chmod", "-R", mode, worktreePath)
git.log.V(0).Info("changing file permissions", "mode", mode)
_, err = git.run.Run(ctx, "", "chmod", "-R", mode, worktreePath)
if err != nil {
return err
}
}
// Reset the root's rev (so we can prune and so we can rely on it later).
_, err = runCommand(ctx, git.root, git.cmd, "reset", "--hard", hash, "--")
_, err = git.run.Run(ctx, git.root, git.cmd, "reset", "--hard", hash, "--")
if err != nil {
return err
}
log.V(0).Info("reset root to hash", "path", git.root, "hash", hash)
git.log.V(0).Info("reset root to hash", "path", git.root, "hash", hash)
// Flip the symlink.
oldWorktree, err := git.UpdateSymlink(ctx, worktreePath)
@ -1065,29 +1023,15 @@ func (git *repoSync) AddWorktreeAndSwap(ctx context.Context, hash string) error
// From here on we have to save errors until the end.
// Execute the hook command, if requested.
var execErr error
if git.syncHookCmd != "" {
log.V(0).Info("executing command for git sync hooks", "command", git.syncHookCmd)
// TODO: move this to be async like webhook?
if _, err := runCommand(ctx, worktreePath, git.syncHookCmd); err != nil {
// Save it until after cleanup runs.
execErr = err
}
}
// Clean up previous worktrees.
var cleanupErr error
if oldWorktree != "" {
cleanupErr = cleanupWorkTree(ctx, git.root, oldWorktree)
cleanupErr = git.CleanupWorkTree(ctx, git.root, oldWorktree)
}
if cleanupErr != nil {
return cleanupErr
}
if execErr != nil {
return execErr
}
return nil
}
@ -1098,18 +1042,18 @@ func (git *repoSync) CloneRepo(ctx context.Context) error {
args = append(args, "--depth", strconv.Itoa(git.depth))
}
args = append(args, git.repo, git.root)
log.V(0).Info("cloning repo", "origin", git.repo, "path", git.root)
git.log.V(0).Info("cloning repo", "origin", git.repo, "path", git.root)
_, err := runCommand(ctx, "", git.cmd, args...)
_, err := git.run.Run(ctx, "", git.cmd, args...)
if err != nil {
if strings.Contains(err.Error(), "already exists and is not an empty directory") {
// Maybe a previous run crashed? Git won't use this dir.
log.V(0).Info("git root exists and is not empty (previous crash?), cleaning up", "path", git.root)
git.log.V(0).Info("git root exists and is not empty (previous crash?), cleaning up", "path", git.root)
err := os.RemoveAll(git.root)
if err != nil {
return err
}
_, err = runCommand(ctx, "", git.cmd, args...)
_, err = git.run.Run(ctx, "", git.cmd, args...)
if err != nil {
return err
}
@ -1120,7 +1064,7 @@ func (git *repoSync) CloneRepo(ctx context.Context) error {
// If sparse checkout is requested, configure git for it.
if git.sparseFile != "" {
log.V(0).Info("configuring sparse checkout")
git.log.V(0).Info("configuring sparse checkout")
checkoutFile := git.sparseFile
// TODO: capture this as a function (mostly duplicated above)
@ -1154,7 +1098,7 @@ func (git *repoSync) CloneRepo(ctx context.Context) error {
}
args := []string{"sparse-checkout", "init"}
_, err = runCommand(ctx, git.root, git.cmd, args...)
_, err = git.run.Run(ctx, git.root, git.cmd, args...)
if err != nil {
return err
}
@ -1165,7 +1109,7 @@ func (git *repoSync) CloneRepo(ctx context.Context) error {
// LocalHashForRev returns the locally known hash for a given rev.
func (git *repoSync) LocalHashForRev(ctx context.Context, rev string) (string, error) {
output, err := runCommand(ctx, git.root, git.cmd, "rev-parse", rev)
output, err := git.run.Run(ctx, git.root, git.cmd, "rev-parse", rev)
if err != nil {
return "", err
}
@ -1174,7 +1118,7 @@ func (git *repoSync) LocalHashForRev(ctx context.Context, rev string) (string, e
// RemoteHashForRef returns the upstream hash for a given ref.
func (git *repoSync) RemoteHashForRef(ctx context.Context, ref string) (string, error) {
output, err := runCommand(ctx, git.root, git.cmd, "ls-remote", "-q", "origin", ref)
output, err := git.run.Run(ctx, git.root, git.cmd, "ls-remote", "-q", "origin", ref)
if err != nil {
return "", err
}
@ -1192,7 +1136,7 @@ func (git *repoSync) RevIsHash(ctx context.Context) (bool, error) {
func (git *repoSync) ResolveRef(ctx context.Context, ref string) (string, error) {
// If git doesn't identify rev as a commit, we're done.
output, err := runCommand(ctx, git.root, git.cmd, "cat-file", "-t", ref)
output, err := git.run.Run(ctx, git.root, git.cmd, "cat-file", "-t", ref)
if err != nil {
return "", err
}
@ -1249,10 +1193,10 @@ func (git *repoSync) SyncRepo(ctx context.Context) (bool, string, error) {
return false, "", err
}
if local == remote {
log.V(1).Info("no update required", "rev", git.rev, "local", local, "remote", remote)
git.log.V(1).Info("no update required", "rev", git.rev, "local", local, "remote", remote)
return false, "", nil
}
log.V(0).Info("update required", "rev", git.rev, "local", local, "remote", remote)
git.log.V(0).Info("update required", "rev", git.rev, "local", local, "remote", remote)
hash = remote
}
@ -1284,65 +1228,18 @@ func (git *repoSync) GetRevs(ctx context.Context) (string, string, error) {
return local, remote, nil
}
func cmdForLog(command string, args ...string) string {
if strings.ContainsAny(command, " \t\n") {
command = fmt.Sprintf("%q", command)
}
// Don't modify the passed-in args.
argsCopy := make([]string, len(args))
copy(argsCopy, args)
for i := range args {
if strings.ContainsAny(args[i], " \t\n") {
argsCopy[i] = fmt.Sprintf("%q", args[i])
}
}
return command + " " + strings.Join(argsCopy, " ")
}
func runCommand(ctx context.Context, cwd, command string, args ...string) (string, error) {
return runCommandWithStdin(ctx, cwd, "", command, args...)
}
func runCommandWithStdin(ctx context.Context, cwd, stdin, command string, args ...string) (string, error) {
cmdStr := cmdForLog(command, args...)
log.V(5).Info("running command", "cwd", cwd, "cmd", cmdStr)
cmd := exec.CommandContext(ctx, command, args...)
if cwd != "" {
cmd.Dir = cwd
}
outbuf := bytes.NewBuffer(nil)
errbuf := bytes.NewBuffer(nil)
cmd.Stdout = outbuf
cmd.Stderr = errbuf
cmd.Stdin = bytes.NewBufferString(stdin)
err := cmd.Run()
stdout := outbuf.String()
stderr := errbuf.String()
if ctx.Err() == context.DeadlineExceeded {
return "", fmt.Errorf("Run(%s): %w: { stdout: %q, stderr: %q }", cmdStr, ctx.Err(), stdout, stderr)
}
if err != nil {
return "", fmt.Errorf("Run(%s): %w: { stdout: %q, stderr: %q }", cmdStr, err, stdout, stderr)
}
log.V(6).Info("command result", "stdout", stdout, "stderr", stderr)
return stdout, nil
}
// SetupAuth configures the local git repo to use a username and password when
// accessing the repo.
func (git *repoSync) SetupAuth(ctx context.Context, username, password string) error {
log.V(1).Info("setting up git credential store")
git.log.V(1).Info("setting up git credential store")
_, err := runCommand(ctx, "", git.cmd, "config", "--global", "credential.helper", "store")
_, err := git.run.Run(ctx, "", git.cmd, "config", "--global", "credential.helper", "store")
if err != nil {
return fmt.Errorf("can't configure git credential helper: %w", err)
}
creds := fmt.Sprintf("url=%v\nusername=%v\npassword=%v\n", git.repo, username, password)
_, err = runCommandWithStdin(ctx, "", creds, git.cmd, "credential", "approve")
_, err = git.run.RunWithStdin(ctx, "", creds, git.cmd, "credential", "approve")
if err != nil {
return fmt.Errorf("can't configure git credentials: %w", err)
}
@ -1350,8 +1247,8 @@ func (git *repoSync) SetupAuth(ctx context.Context, username, password string) e
return nil
}
func setupGitSSH(setupKnownHosts bool, pathToSSHSecret, pathToSSHKnownHosts string) error {
log.V(1).Info("setting up git SSH credentials")
func (git *repoSync) SetupGitSSH(setupKnownHosts bool, pathToSSHSecret, pathToSSHKnownHosts string) error {
git.log.V(1).Info("setting up git SSH credentials")
_, err := os.Stat(pathToSSHSecret)
if err != nil {
@ -1377,7 +1274,7 @@ func setupGitSSH(setupKnownHosts bool, pathToSSHSecret, pathToSSHKnownHosts stri
}
func (git *repoSync) SetupCookieFile(ctx context.Context) error {
log.V(1).Info("configuring git cookie file")
git.log.V(1).Info("configuring git cookie file")
var pathToCookieFile = "/etc/git-secret/cookie_file"
@ -1386,7 +1283,7 @@ func (git *repoSync) SetupCookieFile(ctx context.Context) error {
return fmt.Errorf("can't access git cookiefile: %w", err)
}
if _, err = runCommand(ctx, "", git.cmd, "config", "--global", "http.cookiefile", pathToCookieFile); err != nil {
if _, err = git.run.Run(ctx, "", git.cmd, "config", "--global", "http.cookiefile", pathToCookieFile); err != nil {
return fmt.Errorf("can't configure git cookiefile: %w", err)
}
@ -1401,7 +1298,7 @@ func (git *repoSync) SetupCookieFile(ctx context.Context) error {
// username=xxx@example.com
// password=xxxyyyzzz
func (git *repoSync) CallAskPassURL(ctx context.Context) error {
log.V(1).Info("calling GIT_ASKPASS URL to get credentials")
git.log.V(1).Info("calling GIT_ASKPASS URL to get credentials")
var netClient = &http.Client{
Timeout: time.Second * 1,
@ -1455,14 +1352,14 @@ func (git *repoSync) CallAskPassURL(ctx context.Context) error {
}
func (git *repoSync) setupExtraGitConfigs(ctx context.Context, configsFlag string) error {
log.V(1).Info("setting additional git configs")
git.log.V(1).Info("setting additional git configs")
configs, err := parseGitConfigs(configsFlag)
if err != nil {
return fmt.Errorf("can't parse --git-config flag: %v", err)
}
for _, kv := range configs {
if _, err := runCommand(ctx, "", git.cmd, "config", "--global", kv.key, kv.val); err != nil {
if _, err := git.run.Run(ctx, "", git.cmd, "config", "--global", kv.key, kv.val); err != nil {
return fmt.Errorf("error configuring additional git configs %q %q: %v", kv.key, kv.val, err)
}
}
@ -1683,6 +1580,22 @@ OPTIONS
with a period. (default: "", which means error reporting will be
disabled)
--exechook-backoff <duration>, $GIT_SYNC_EXECHOOK_BACKOFF
The time to wait before retrying a failed --exechook-command.
(default: 3s)
--exechook-command <string>, $GIT_SYNC_EXECHOOK_COMMAND
An optional command to be executed after syncing a new hash of the
remote repository. This command does not take any arguments and
executes with the synced repo as its working directory. The
execution is subject to the overall --sync-timeout flag and will
extend the effective period between sync attempts. This flag
obsoletes --sync-hook-command, but if sync-hook-command is
specified, it will take precedence.
--exechook-timeout <duration>, $GIT_SYNC_EXECHOOK_TIMEOUT
The timeout for the --exechook-command. (default: 30s)
--git <string>, $GIT_SYNC_GIT
The git command to run (subject to PATH search, mostly for testing).
(default: git)
@ -1779,13 +1692,6 @@ OPTIONS
The git submodule behavior: one of 'recursive', 'shallow', or 'off'.
(default: recursive)
--sync-hook-command <string>, $GIT_SYNC_HOOK_COMMAND
An optional command to be executed after syncing a new hash of the
remote repository. This command does not take any arguments and
executes with the synced repo as its working directory. The
execution is subject to the overall --sync-timeout flag and will
extend the effective period between sync attempts.
--sync-timeout <duration>, $GIT_SYNC_SYNC_TIMEOUT
The total time allowed for one complete sync. This must be at least
10ms. This flag obsoletes --timeout, but if --timeout is specified,
@ -1803,7 +1709,7 @@ OPTIONS
Print the version and exit.
--webhook-backoff <duration>, $GIT_SYNC_WEBHOOK_BACKOFF
The time to wait before retrying a failed --webhook-url).
The time to wait before retrying a failed --webhook-url.
(default: 3s)
--webhook-method <string>, $GIT_SYNC_WEBHOOK_METHOD

View File

@ -326,7 +326,7 @@ func TestRemoveDirContents(t *testing.T) {
}
// Test removal.
if err := removeDirContents(root); err != nil {
if err := removeDirContents(root, nil); err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -352,12 +352,12 @@ func TestRemoveDirContents(t *testing.T) {
}
// Test removal.
if err := removeDirContents(root); err != nil {
if err := removeDirContents(root, nil); err != nil {
t.Errorf("unexpected error: %v", err)
}
// Test error path.
if err := removeDirContents(filepath.Join(root, "does-not-exist")); err == nil {
if err := removeDirContents(filepath.Join(root, "does-not-exist"), nil); err == nil {
t.Errorf("unexpected success for non-existent dir")
}
}

View File

@ -1,140 +0,0 @@
/*
Copyright 2019 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"context"
"fmt"
"net/http"
"sync"
"time"
)
// WebHook structure
type Webhook struct {
// URL for the http/s request
URL string
// Method for the http/s request
Method string
// Code to look for when determining if the request was successful.
// If this is not specified, request is sent and forgotten about.
Success int
// Timeout for the http/s request
Timeout time.Duration
// Backoff for failed webhook calls
Backoff time.Duration
// Holds the data as it crosses from producer to consumer.
Data *webhookData
}
type webhookData struct {
ch chan struct{}
mutex sync.Mutex
hash string
}
func NewWebhookData() *webhookData {
return &webhookData{
ch: make(chan struct{}, 1),
}
}
func (d *webhookData) events() chan struct{} {
return d.ch
}
func (d *webhookData) get() string {
d.mutex.Lock()
defer d.mutex.Unlock()
return d.hash
}
func (d *webhookData) set(newHash string) {
d.mutex.Lock()
defer d.mutex.Unlock()
d.hash = newHash
}
func (d *webhookData) send(newHash string) {
d.set(newHash)
// Non-blocking write. If the channel is full, the consumer will see the
// newest value. If the channel was not full, the consumer will get another
// event.
select {
case d.ch <- struct{}{}:
default:
}
}
func (w *Webhook) Send(hash string) {
w.Data.send(hash)
}
func (w *Webhook) Do(hash string) error {
req, err := http.NewRequest(w.Method, w.URL, nil)
if err != nil {
return err
}
req.Header.Set("Gitsync-Hash", hash)
ctx, cancel := context.WithTimeout(context.Background(), w.Timeout)
defer cancel()
req = req.WithContext(ctx)
log.V(0).Info("sending webhook", "hash", hash, "url", w.URL, "method", w.Method, "timeout", w.Timeout.String())
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
resp.Body.Close()
// If the webhook has a success statusCode, check against it
if w.Success != -1 && resp.StatusCode != w.Success {
return fmt.Errorf("received response code %d expected %d", resp.StatusCode, w.Success)
}
return nil
}
// Wait for trigger events from the channel, and send webhooks when triggered
func (w *Webhook) run() {
var lastHash string
// Wait for trigger from webhookData.Send
for range w.Data.events() {
// Retry in case of error
for {
// Always get the latest value, in case we fail-and-retry and the
// value changed in the meantime. This means that we might not send
// every single hash.
hash := w.Data.get()
if hash == lastHash {
break
}
if err := w.Do(hash); err != nil {
log.Error(err, "webhook failed", "url", w.URL, "method", w.Method, "timeout", w.Timeout.String())
time.Sleep(w.Backoff)
} else {
lastHash = hash
break
}
}
}
}

86
pkg/cmd/cmd.go Normal file
View File

@ -0,0 +1,86 @@
/*
Copyright 2021 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cmd
import (
"bytes"
"context"
"fmt"
"os/exec"
"strings"
"k8s.io/git-sync/pkg/logging"
)
// Runner is an API to run commands and log them in a consistent way.
type Runner struct {
// Logger
logger *logging.Logger
}
// NewRunner returns a new CommandRunner
func NewRunner(logger *logging.Logger) *Runner {
return &Runner{logger: logger}
}
// Run runs given command
func (r *Runner) Run(ctx context.Context, cwd, command string, args ...string) (string, error) {
return r.RunWithStdin(ctx, cwd, "", command, args...)
}
// RunWithStdin runs given command with stardart input
func (r *Runner) RunWithStdin(ctx context.Context, cwd, stdin, command string, args ...string) (string, error) {
cmdStr := cmdForLog(command, args...)
r.logger.V(5).Info("running command", "cwd", cwd, "cmd", cmdStr)
cmd := exec.CommandContext(ctx, command, args...)
if cwd != "" {
cmd.Dir = cwd
}
outbuf := bytes.NewBuffer(nil)
errbuf := bytes.NewBuffer(nil)
cmd.Stdout = outbuf
cmd.Stderr = errbuf
cmd.Stdin = bytes.NewBufferString(stdin)
err := cmd.Run()
stdout := outbuf.String()
stderr := errbuf.String()
if ctx.Err() == context.DeadlineExceeded {
return "", fmt.Errorf("Run(%s): %w: { stdout: %q, stderr: %q }", cmdStr, ctx.Err(), stdout, stderr)
}
if err != nil {
return "", fmt.Errorf("Run(%s): %w: { stdout: %q, stderr: %q }", cmdStr, err, stdout, stderr)
}
r.logger.V(6).Info("command result", "stdout", stdout, "stderr", stderr)
return stdout, nil
}
func cmdForLog(command string, args ...string) string {
if strings.ContainsAny(command, " \t\n") {
command = fmt.Sprintf("%q", command)
}
argsCopy := make([]string, len(args))
copy(argsCopy, args)
for i := range args {
if strings.ContainsAny(args[i], " \t\n") {
argsCopy[i] = fmt.Sprintf("%q", args[i])
}
}
return command + " " + strings.Join(argsCopy, " ")
}

71
pkg/hook/exechook.go Normal file
View File

@ -0,0 +1,71 @@
/*
Copyright 2021 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package hook
import (
"context"
"path/filepath"
"time"
"k8s.io/git-sync/pkg/cmd"
"k8s.io/git-sync/pkg/logging"
)
// Exechook structure, implements Hook
type Exechook struct {
// Runner
cmdrunner *cmd.Runner
// Command to run
command string
// Command args
args []string
// Git root path
gitRoot string
// Timeout for the command
timeout time.Duration
// Logger
logger *logging.Logger
}
// NewExechook returns a new Exechook
func NewExechook(cmdrunner *cmd.Runner, command, gitroot string, args []string, timeout time.Duration, logger *logging.Logger) *Exechook {
return &Exechook{
cmdrunner: cmdrunner,
command: command,
gitRoot: gitroot,
args: args,
timeout: timeout,
logger: logger,
}
}
// Name describes hook, implements Hook.Name
func (h *Exechook) Name() string {
return "exechook"
}
// Do runs exechook.command, implements Hook.Do
func (h *Exechook) Do(ctx context.Context, hash string) error {
ctx, cancel := context.WithTimeout(ctx, h.timeout)
defer cancel()
worktreePath := filepath.Join(h.gitRoot, hash)
h.logger.V(0).Info("running exechook", "command", h.command, "timeout", h.timeout)
_, err := h.cmdrunner.Run(ctx, worktreePath, h.command, h.args...)
return err
}

80
pkg/hook/exechook_test.go Normal file
View File

@ -0,0 +1,80 @@
/*
Copyright 2021 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package hook
import (
"context"
"testing"
"time"
"k8s.io/git-sync/pkg/cmd"
"k8s.io/git-sync/pkg/logging"
)
func TestNotZeroReturnExechookDo(t *testing.T) {
t.Run("test not zero return code", func(t *testing.T) {
l := logging.New("", "")
ch := NewExechook(
cmd.NewRunner(l),
"false",
"/tmp",
[]string{},
time.Second,
l,
)
err := ch.Do(context.Background(), "")
if err == nil {
t.Fatalf("expected error but got none")
}
})
}
func TestZeroReturnExechookDo(t *testing.T) {
t.Run("test zero return code", func(t *testing.T) {
l := logging.New("", "")
ch := NewExechook(
cmd.NewRunner(l),
"true",
"/tmp",
[]string{},
time.Second,
l,
)
err := ch.Do(context.Background(), "")
if err != nil {
t.Fatalf("expected nil but got err")
}
})
}
func TestTimeoutExechookDo(t *testing.T) {
t.Run("test timeout", func(t *testing.T) {
l := logging.New("", "")
ch := NewExechook(
cmd.NewRunner(l),
"/bin/sh",
"/tmp",
[]string{"-c", "sleep 2"},
time.Second,
l,
)
err := ch.Do(context.Background(), "")
if err == nil {
t.Fatalf("expected err but got nil")
}
})
}

138
pkg/hook/hook.go Normal file
View File

@ -0,0 +1,138 @@
/*
Copyright 2021 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package hook
import (
"context"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/git-sync/pkg/logging"
)
var (
hookRunCount = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "git_sync_hook_run_count_total",
Help: "How many hook runs completed, partitioned by name and state (success, error)",
}, []string{"name", "status"})
)
// Describes what a Hook needs to implement, run by HookRunner
type Hook interface {
// Describes hook
Name() string
// Function that called by HookRunner
Do(ctx context.Context, hash string) error
}
type hookData struct {
ch chan struct{}
mutex sync.Mutex
hash string
}
// NewHookData returns a new HookData
func NewHookData() *hookData {
return &hookData{
ch: make(chan struct{}, 1),
}
}
func (d *hookData) events() chan struct{} {
return d.ch
}
func (d *hookData) get() string {
d.mutex.Lock()
defer d.mutex.Unlock()
return d.hash
}
func (d *hookData) set(newHash string) {
d.mutex.Lock()
defer d.mutex.Unlock()
d.hash = newHash
}
func (d *hookData) send(newHash string) {
d.set(newHash)
// Non-blocking write. If the channel is full, the consumer will see the
// newest value. If the channel was not full, the consumer will get another
// event.
select {
case d.ch <- struct{}{}:
default:
}
}
// NewHookRunner returns a new HookRunner
func NewHookRunner(hook Hook, backoff time.Duration, data *hookData, log *logging.Logger) *HookRunner {
return &HookRunner{hook: hook, backoff: backoff, data: data, logger: log}
}
// HookRunner struct
type HookRunner struct {
// Hook to run and check
hook Hook
// Backoff for failed hooks
backoff time.Duration
// Holds the data as it crosses from producer to consumer.
data *hookData
// Logger
logger *logging.Logger
}
// Send sends hash to hookdata
func (r *HookRunner) Send(hash string) {
r.data.send(hash)
}
// Run waits for trigger events from the channel, and run hook when triggered
func (r *HookRunner) Run(ctx context.Context) {
var lastHash string
prometheus.MustRegister(hookRunCount)
// Wait for trigger from hookData.Send
for range r.data.events() {
// Retry in case of error
for {
// Always get the latest value, in case we fail-and-retry and the
// value changed in the meantime. This means that we might not send
// every single hash.
hash := r.data.get()
if hash == lastHash {
break
}
if err := r.hook.Do(ctx, hash); err != nil {
r.logger.Error(err, "hook failed")
updateHookRunCountMetric(r.hook.Name(), "error")
time.Sleep(r.backoff)
} else {
updateHookRunCountMetric(r.hook.Name(), "success")
lastHash = hash
break
}
}
}
}
func updateHookRunCountMetric(name, status string) {
hookRunCount.WithLabelValues(name, status).Inc()
}

View File

@ -14,12 +14,11 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package main
package hook
import (
"fmt"
"testing"
"time"
)
const (
@ -27,72 +26,55 @@ const (
hash2 = "2222222222222222222222222222222222222222"
)
func TestWebhookData(t *testing.T) {
t.Run("webhook consumes first hash value", func(t *testing.T) {
whd := NewWebhookData()
func TestHookData(t *testing.T) {
t.Run("hook consumes first hash value", func(t *testing.T) {
hd := NewHookData()
whd.send(hash1)
hd.send(hash1)
<-whd.events()
<-hd.events()
hash := whd.get()
hash := hd.get()
if hash1 != hash {
t.Fatalf("expected hash %s but got %s", hash1, hash)
}
})
t.Run("last update wins when channel buffer is full", func(t *testing.T) {
whd := NewWebhookData()
hd := NewHookData()
for i := 0; i < 10; i++ {
h := fmt.Sprintf("111111111111111111111111111111111111111%d", i)
whd.send(h)
hd.send(h)
}
whd.send(hash2)
hd.send(hash2)
<-whd.events()
<-hd.events()
hash := whd.get()
hash := hd.get()
if hash2 != hash {
t.Fatalf("expected hash %s but got %s", hash2, hash)
}
})
t.Run("same hash value", func(t *testing.T) {
whd := NewWebhookData()
events := whd.events()
hd := NewHookData()
events := hd.events()
whd.send(hash1)
hd.send(hash1)
<-events
hash := whd.get()
hash := hd.get()
if hash1 != hash {
t.Fatalf("expected hash %s but got %s", hash1, hash)
}
whd.send(hash1)
hd.send(hash1)
<-events
hash = whd.get()
hash = hd.get()
if hash1 != hash {
t.Fatalf("expected hash %s but got %s", hash1, hash)
}
})
}
func TestDo(t *testing.T) {
t.Run("test invalid urls are handled", func(t *testing.T) {
wh := Webhook{
URL: ":http://localhost:601426/hooks/webhook",
Method: "POST",
Success: 200,
Timeout: time.Second,
Backoff: time.Second * 3,
Data: NewWebhookData(),
}
err := wh.Do("hash")
if err == nil {
t.Fatalf("expected error for invalid url but got none")
}
})
}

84
pkg/hook/webhook.go Normal file
View File

@ -0,0 +1,84 @@
/*
Copyright 2019 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package hook
import (
"context"
"fmt"
"net/http"
"time"
"k8s.io/git-sync/pkg/logging"
)
// WebHook structure, implements Hook
type Webhook struct {
// Url for the http/s request
url string
// Method for the http/s request
method string
// Code to look for when determining if the request was successful.
// If this is not specified, request is sent and forgotten about.
success int
// Timeout for the http/s request
timeout time.Duration
// Logger
logger *logging.Logger
}
// NewWebhook returns a new WebHook
func NewWebhook(url, method string, success int, timeout time.Duration, logger *logging.Logger) *Webhook {
return &Webhook{
url: url,
method: method,
success: success,
timeout: timeout,
logger: logger,
}
}
// Name describes hook, implements Hook.Name
func (w *Webhook) Name() string {
return "webhook"
}
// Do calls webhook.url, implements Hook.Do
func (w *Webhook) Do(ctx context.Context, hash string) error {
req, err := http.NewRequest(w.method, w.url, nil)
if err != nil {
return err
}
req.Header.Set("Gitsync-Hash", hash)
ctx, cancel := context.WithTimeout(ctx, w.timeout)
defer cancel()
req = req.WithContext(ctx)
w.logger.V(0).Info("sending webhook", "hash", hash, "url", w.url, "method", w.method, "timeout", w.timeout)
resp, err := http.DefaultClient.Do(req)
if err != nil {
return err
}
resp.Body.Close()
// If the webhook has a success statusCode, check against it
if w.success != -1 && resp.StatusCode != w.success {
return fmt.Errorf("received response code %d expected %d", resp.StatusCode, w.success)
}
return nil
}

41
pkg/hook/webhook_test.go Normal file
View File

@ -0,0 +1,41 @@
/*
Copyright 2019 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package hook
import (
"context"
"testing"
"time"
"k8s.io/git-sync/pkg/logging"
)
func TestWebhookDo(t *testing.T) {
t.Run("test invalid urls are handled", func(t *testing.T) {
wh := NewWebhook(
":http://localhost:601426/hooks/webhook",
"POST",
200,
time.Second,
logging.New("", ""),
)
err := wh.Do(context.Background(), "hash")
if err == nil {
t.Fatalf("expected error for invalid url but got none")
}
})
}

132
pkg/logging/logging.go Normal file
View File

@ -0,0 +1,132 @@
/*
Copyright 2021 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package logging
import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"path/filepath"
"github.com/go-logr/glogr"
"github.com/go-logr/logr"
)
// Logger provides a logging interface.
type Logger struct {
logr.Logger
root string
errorFile string
}
// New returns a Logger, with the same API as logr.Logger.
func New(root string, errorFile string) *Logger {
return &Logger{Logger: glogr.New(), root: root, errorFile: errorFile}
}
// Error implements logr.Logger.Error.
func (l *Logger) Error(err error, msg string, kvList ...interface{}) {
l.Logger.WithCallDepth(1).Error(err, msg, kvList...)
if l.errorFile == "" {
return
}
payload := struct {
Msg string
Err string
Args map[string]interface{}
}{
Msg: msg,
Err: err.Error(),
Args: map[string]interface{}{},
}
if len(kvList)%2 != 0 {
kvList = append(kvList, "<no-value>")
}
for i := 0; i < len(kvList); i += 2 {
k, ok := kvList[i].(string)
if !ok {
k = fmt.Sprintf("%v", kvList[i])
}
payload.Args[k] = kvList[i+1]
}
jb, err := json.Marshal(payload)
if err != nil {
l.Logger.Error(err, "can't encode error payload")
content := fmt.Sprintf("%v", err)
l.writeContent([]byte(content))
} else {
l.writeContent(jb)
}
}
// ExportError exports the error to the error file if --export-error is enabled.
func (l *Logger) ExportError(content string) {
if l.errorFile == "" {
return
}
l.writeContent([]byte(content))
}
// DeleteErrorFile deletes the error file.
func (l *Logger) DeleteErrorFile() {
if l.errorFile == "" {
return
}
errorFile := filepath.Join(l.root, l.errorFile)
if err := os.Remove(errorFile); err != nil {
if os.IsNotExist(err) {
return
}
l.Logger.Error(err, "can't delete the error-file", "filename", errorFile)
}
}
// writeContent writes the error content to the error file.
func (l *Logger) writeContent(content []byte) {
if _, err := os.Stat(l.root); os.IsNotExist(err) {
fileMode := os.FileMode(0755)
if err := os.Mkdir(l.root, fileMode); err != nil {
l.Logger.Error(err, "can't create the root directory", "root", l.root)
return
}
}
tmpFile, err := ioutil.TempFile(l.root, "tmp-err-")
if err != nil {
l.Logger.Error(err, "can't create temporary error-file", "directory", l.root, "prefix", "tmp-err-")
return
}
defer func() {
if err := tmpFile.Close(); err != nil {
l.Logger.Error(err, "can't close temporary error-file", "filename", tmpFile.Name())
}
}()
if _, err = tmpFile.Write(content); err != nil {
l.Logger.Error(err, "can't write to temporary error-file", "filename", tmpFile.Name())
return
}
errorFile := filepath.Join(l.root, l.errorFile)
if err := os.Rename(tmpFile.Name(), errorFile); err != nil {
l.Logger.Error(err, "can't rename to error-file", "temp-file", tmpFile.Name(), "error-file", errorFile)
return
}
if err := os.Chmod(errorFile, 0644); err != nil {
l.Logger.Error(err, "can't change permissions on the error-file", "error-file", errorFile)
}
}

View File

@ -164,7 +164,11 @@ trap finish INT EXIT
SLOW_GIT_CLONE=/slow_git_clone.sh
SLOW_GIT_FETCH=/slow_git_fetch.sh
ASKPASS_GIT=/askpass_git.sh
SYNC_HOOK_COMMAND=/test_sync_hook_command.sh
EXECHOOK_COMMAND=/test_exechook_command.sh
EXECHOOK_COMMAND_FAIL=/test_exechook_command_fail.sh
RUNLOG="$DIR/runlog.exechook-fail-retry"
rm -f $RUNLOG
touch $RUNLOG
function GIT_SYNC() {
#./bin/linux_amd64/git-sync "$@"
@ -178,7 +182,9 @@ function GIT_SYNC() {
-v "$(pwd)/slow_git_clone.sh":"$SLOW_GIT_CLONE":ro \
-v "$(pwd)/slow_git_fetch.sh":"$SLOW_GIT_FETCH":ro \
-v "$(pwd)/askpass_git.sh":"$ASKPASS_GIT":ro \
-v "$(pwd)/test_sync_hook_command.sh":"$SYNC_HOOK_COMMAND":ro \
-v "$(pwd)/test_exechook_command.sh":"$EXECHOOK_COMMAND":ro \
-v "$(pwd)/test_exechook_command_fail.sh":"$EXECHOOK_COMMAND_FAIL":ro \
-v "$RUNLOG":/var/log/runs \
-v "$DOT_SSH/id_test":"/etc/git-secret/ssh":ro \
--env XDG_CONFIG_HOME=$DIR \
e2e/git-sync:$(make -s version)__$(go env GOOS)_$(go env GOARCH) \
@ -949,9 +955,9 @@ assert_file_eq "$ROOT"/link/file "$TESTCASE 1"
pass
##############################################
# Test sync_hook_command
# Test exechook-success
##############################################
testcase "sync_hook_command"
testcase "exechook-success"
# First sync
echo "$TESTCASE 1" > "$REPO"/file
git -C "$REPO" commit -qam "$TESTCASE 1"
@ -961,30 +967,55 @@ GIT_SYNC \
--branch=e2e-branch \
--root="$ROOT" \
--link="link" \
--sync-hook-command="$SYNC_HOOK_COMMAND" \
--exechook-command="$EXECHOOK_COMMAND" \
> "$DIR"/log."$TESTCASE" 2>&1 &
sleep 3
assert_link_exists "$ROOT"/link
assert_file_exists "$ROOT"/link/file
assert_file_exists "$ROOT"/link/sync-hook
assert_file_exists "$ROOT"/link/link-sync-hook
assert_file_exists "$ROOT"/link/exechook
assert_file_exists "$ROOT"/link/link-exechook
assert_file_eq "$ROOT"/link/file "$TESTCASE 1"
assert_file_eq "$ROOT"/link/sync-hook "$TESTCASE 1"
assert_file_eq "$ROOT"/link/link-sync-hook "$TESTCASE 1"
assert_file_eq "$ROOT"/link/exechook "$TESTCASE 1"
assert_file_eq "$ROOT"/link/link-exechook "$TESTCASE 1"
# Move forward
echo "$TESTCASE 2" > "$REPO"/file
git -C "$REPO" commit -qam "$TESTCASE 2"
sleep 3
assert_link_exists "$ROOT"/link
assert_file_exists "$ROOT"/link/file
assert_file_exists "$ROOT"/link/sync-hook
assert_file_exists "$ROOT"/link/link-sync-hook
assert_file_exists "$ROOT"/link/exechook
assert_file_exists "$ROOT"/link/link-exechook
assert_file_eq "$ROOT"/link/file "$TESTCASE 2"
assert_file_eq "$ROOT"/link/sync-hook "$TESTCASE 2"
assert_file_eq "$ROOT"/link/link-sync-hook "$TESTCASE 2"
assert_file_eq "$ROOT"/link/exechook "$TESTCASE 2"
assert_file_eq "$ROOT"/link/link-exechook "$TESTCASE 2"
# Wrap up
pass
##############################################
# Test exechook-fail-retry
##############################################
testcase "exechook-fail-retry"
cat /dev/null > "$RUNLOG"
# First sync - return a failure to ensure that we try again
echo "$TESTCASE 1" > "$REPO"/file
git -C "$REPO" commit -qam "$TESTCASE 1"
GIT_SYNC \
--period=100ms \
--repo="file://$REPO" \
--branch=e2e-branch \
--root="$ROOT" \
--link="link" \
--exechook-command="$EXECHOOK_COMMAND_FAIL" \
--exechook-backoff=1s \
> "$DIR"/log."$TESTCASE" 2>&1 &
# Check that exechook was called
sleep 5
RUNS=$(cat "$RUNLOG" | wc -l)
if [ "$RUNS" -lt 2 ]; then
fail "exechook called $RUNS times, it should be at least 2"
fi
pass
##############################################
# Test webhook success
##############################################

View File

@ -14,8 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# Use for e2e test of --sync-hook-command.
# Use for e2e test of --exechook-command.
# This option takes no command arguments, so requires a wrapper script.
cat file > sync-hook
cat ../link/file > link-sync-hook
cat file > exechook
cat ../link/file > link-exechook

21
test_exechook_command_fail.sh Executable file
View File

@ -0,0 +1,21 @@
#!/bin/sh
#
# Copyright 2021 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Use for e2e test of --exechook-command.
# This option takes no command arguments, so requires a wrapper script.
date >> /var/log/runs
exit 1