240 lines
7.3 KiB
Go
240 lines
7.3 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"crypto/sha256"
|
|
"encoding/base64"
|
|
"fmt"
|
|
"net/http"
|
|
"os"
|
|
"os/exec"
|
|
"path/filepath"
|
|
"runtime"
|
|
"strings"
|
|
"text/tabwriter"
|
|
"time"
|
|
|
|
"github.com/argoproj/gitops-engine/pkg/utils/text"
|
|
|
|
"github.com/go-logr/logr"
|
|
"github.com/spf13/cobra"
|
|
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
|
"k8s.io/client-go/tools/clientcmd"
|
|
"k8s.io/klog/v2/textlogger"
|
|
|
|
"github.com/argoproj/gitops-engine/pkg/cache"
|
|
"github.com/argoproj/gitops-engine/pkg/engine"
|
|
"github.com/argoproj/gitops-engine/pkg/sync"
|
|
"github.com/argoproj/gitops-engine/pkg/utils/kube"
|
|
|
|
_ "net/http/pprof"
|
|
)
|
|
|
|
const (
|
|
annotationGCMark = "gitops-agent.argoproj.io/gc-mark"
|
|
envProfile = "GITOPS_ENGINE_PROFILE"
|
|
envProfileHost = "GITOPS_ENGINE_PROFILE_HOST"
|
|
envProfilePort = "GITOPS_ENGINE_PROFILE_PORT"
|
|
)
|
|
|
|
func main() {
|
|
log := textlogger.NewLogger(textlogger.NewConfig())
|
|
err := newCmd(log).Execute()
|
|
checkError(err, log)
|
|
}
|
|
|
|
type resourceInfo struct {
|
|
gcMark string
|
|
}
|
|
|
|
type settings struct {
|
|
repoPath string
|
|
paths []string
|
|
}
|
|
|
|
func (s *settings) getGCMark(key kube.ResourceKey) string {
|
|
h := sha256.New()
|
|
_, _ = fmt.Fprintf(h, "%s/%s", s.repoPath, strings.Join(s.paths, ","))
|
|
_, _ = h.Write([]byte(strings.Join([]string{key.Group, key.Kind, key.Name}, "/")))
|
|
return "sha256." + base64.RawURLEncoding.EncodeToString(h.Sum(nil))
|
|
}
|
|
|
|
func (s *settings) parseManifests() ([]*unstructured.Unstructured, string, error) {
|
|
cmd := exec.Command("git", "rev-parse", "HEAD")
|
|
cmd.Dir = s.repoPath
|
|
revision, err := cmd.CombinedOutput()
|
|
if err != nil {
|
|
return nil, "", fmt.Errorf("failed to determine git revision: %w", err)
|
|
}
|
|
var res []*unstructured.Unstructured
|
|
for i := range s.paths {
|
|
if err := filepath.Walk(filepath.Join(s.repoPath, s.paths[i]), func(path string, info os.FileInfo, err error) error {
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if info.IsDir() {
|
|
return nil
|
|
}
|
|
if ext := strings.ToLower(filepath.Ext(info.Name())); ext != ".json" && ext != ".yml" && ext != ".yaml" {
|
|
return nil
|
|
}
|
|
data, err := os.ReadFile(path)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to read file %s: %w", path, err)
|
|
}
|
|
items, err := kube.SplitYAML(data)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to parse %s: %w", path, err)
|
|
}
|
|
res = append(res, items...)
|
|
return nil
|
|
}); err != nil {
|
|
return nil, "", fmt.Errorf("failed to parse %s: %w", s.paths[i], err)
|
|
}
|
|
}
|
|
for i := range res {
|
|
annotations := res[i].GetAnnotations()
|
|
if annotations == nil {
|
|
annotations = make(map[string]string)
|
|
}
|
|
annotations[annotationGCMark] = s.getGCMark(kube.GetResourceKey(res[i]))
|
|
res[i].SetAnnotations(annotations)
|
|
}
|
|
return res, string(revision), nil
|
|
}
|
|
|
|
func StartProfiler(log logr.Logger) {
|
|
if os.Getenv(envProfile) == "web" {
|
|
go func() {
|
|
runtime.SetBlockProfileRate(1)
|
|
runtime.SetMutexProfileFraction(1)
|
|
profilePort := text.WithDefault(os.Getenv(envProfilePort), "6060")
|
|
profileHost := text.WithDefault(os.Getenv(envProfileHost), "127.0.0.1")
|
|
|
|
log.Info("pprof", "err", http.ListenAndServe(fmt.Sprintf("%s:%s", profileHost, profilePort), nil))
|
|
}()
|
|
}
|
|
}
|
|
|
|
func newCmd(log logr.Logger) *cobra.Command {
|
|
var (
|
|
clientConfig clientcmd.ClientConfig
|
|
paths []string
|
|
resyncSeconds int
|
|
port int
|
|
prune bool
|
|
namespace string
|
|
namespaced bool
|
|
)
|
|
cmd := cobra.Command{
|
|
Use: "gitops REPO_PATH",
|
|
Run: func(cmd *cobra.Command, args []string) {
|
|
if len(args) < 1 {
|
|
cmd.HelpFunc()(cmd, args)
|
|
os.Exit(1)
|
|
}
|
|
s := settings{args[0], paths}
|
|
config, err := clientConfig.ClientConfig()
|
|
checkError(err, log)
|
|
if namespace == "" {
|
|
namespace, _, err = clientConfig.Namespace()
|
|
checkError(err, log)
|
|
}
|
|
|
|
var namespaces []string
|
|
if namespaced {
|
|
namespaces = []string{namespace}
|
|
}
|
|
|
|
StartProfiler(log)
|
|
clusterCache := cache.NewClusterCache(config,
|
|
cache.SetNamespaces(namespaces),
|
|
cache.SetLogr(log),
|
|
cache.SetPopulateResourceInfoHandler(func(un *unstructured.Unstructured, _ bool) (info any, cacheManifest bool) {
|
|
// store gc mark of every resource
|
|
gcMark := un.GetAnnotations()[annotationGCMark]
|
|
info = &resourceInfo{gcMark: un.GetAnnotations()[annotationGCMark]}
|
|
// cache resources that has that mark to improve performance
|
|
cacheManifest = gcMark != ""
|
|
return
|
|
}),
|
|
)
|
|
gitOpsEngine := engine.NewEngine(config, clusterCache, engine.WithLogr(log))
|
|
checkError(err, log)
|
|
|
|
cleanup, err := gitOpsEngine.Run()
|
|
checkError(err, log)
|
|
defer cleanup()
|
|
|
|
resync := make(chan bool)
|
|
go func() {
|
|
ticker := time.NewTicker(time.Second * time.Duration(resyncSeconds))
|
|
for {
|
|
<-ticker.C
|
|
log.Info("Synchronization triggered by timer")
|
|
resync <- true
|
|
}
|
|
}()
|
|
http.HandleFunc("/api/v1/sync", func(_ http.ResponseWriter, _ *http.Request) {
|
|
log.Info("Synchronization triggered by API call")
|
|
resync <- true
|
|
})
|
|
go func() {
|
|
checkError(http.ListenAndServe(fmt.Sprintf("0.0.0.0:%d", port), nil), log)
|
|
}()
|
|
|
|
for ; true; <-resync {
|
|
target, revision, err := s.parseManifests()
|
|
if err != nil {
|
|
log.Error(err, "Failed to parse target state")
|
|
continue
|
|
}
|
|
|
|
result, err := gitOpsEngine.Sync(context.Background(), target, func(r *cache.Resource) bool {
|
|
return r.Info.(*resourceInfo).gcMark == s.getGCMark(r.ResourceKey())
|
|
}, revision, namespace, sync.WithPrune(prune), sync.WithLogr(log))
|
|
if err != nil {
|
|
log.Error(err, "Failed to synchronize cluster state")
|
|
continue
|
|
}
|
|
w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
|
|
_, _ = fmt.Fprintf(w, "RESOURCE\tRESULT\n")
|
|
for _, res := range result {
|
|
_, _ = fmt.Fprintf(w, "%s\t%s\n", res.ResourceKey.String(), res.Message)
|
|
}
|
|
_ = w.Flush()
|
|
}
|
|
},
|
|
}
|
|
clientConfig = addKubectlFlagsToCmd(&cmd)
|
|
cmd.Flags().StringArrayVar(&paths, "path", []string{"."}, "Directory path with-in repository")
|
|
cmd.Flags().IntVar(&resyncSeconds, "resync-seconds", 300, "Resync duration in seconds.")
|
|
cmd.Flags().IntVar(&port, "port", 9001, "Port number.")
|
|
cmd.Flags().BoolVar(&prune, "prune", true, "Enables resource pruning.")
|
|
cmd.Flags().BoolVar(&namespaced, "namespaced", false, "Switches agent into namespaced mode.")
|
|
cmd.Flags().StringVar(&namespace, "default-namespace", "",
|
|
"The namespace that should be used if resource namespace is not specified. "+
|
|
"By default resources are installed into the same namespace where gitops-agent is installed.")
|
|
return &cmd
|
|
}
|
|
|
|
// addKubectlFlagsToCmd adds kubectl like flags to a command and returns the ClientConfig interface
|
|
// for retrieving the values.
|
|
func addKubectlFlagsToCmd(cmd *cobra.Command) clientcmd.ClientConfig {
|
|
loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
|
|
loadingRules.DefaultClientConfig = &clientcmd.DefaultClientConfig
|
|
overrides := clientcmd.ConfigOverrides{}
|
|
kflags := clientcmd.RecommendedConfigOverrideFlags("")
|
|
cmd.PersistentFlags().StringVar(&loadingRules.ExplicitPath, "kubeconfig", "", "Path to a kube config. Only required if out-of-cluster")
|
|
clientcmd.BindOverrideFlags(&overrides, cmd.PersistentFlags(), kflags)
|
|
return clientcmd.NewInteractiveDeferredLoadingClientConfig(loadingRules, &overrides, os.Stdin)
|
|
}
|
|
|
|
// checkError is a convenience function to check if an error is non-nil and exit if it was
|
|
func checkError(err error, log logr.Logger) {
|
|
if err != nil {
|
|
log.Error(err, "Fatal error")
|
|
os.Exit(1)
|
|
}
|
|
}
|