linkerd2/controller/cmd/service-mirror/main.go

93 lines
2.5 KiB
Go

package servicemirror
import (
"context"
"errors"
"flag"
"os"
"os/signal"
"syscall"
"time"
"k8s.io/client-go/informers"
"k8s.io/client-go/tools/cache"
"github.com/linkerd/linkerd2/controller/k8s"
"github.com/linkerd/linkerd2/pkg/admin"
"github.com/linkerd/linkerd2/pkg/flags"
log "github.com/sirupsen/logrus"
"k8s.io/client-go/kubernetes"
)
type chanProbeEventSink struct{ sender func(event interface{}) }
func (s *chanProbeEventSink) send(event interface{}) {
s.sender(event)
}
func initLocalSecretsInformer(api kubernetes.Interface, namespace string) (cache.SharedIndexInformer, error) {
sharedInformers := informers.NewSharedInformerFactoryWithOptions(api, 10*time.Minute, informers.WithNamespace(namespace))
informer := sharedInformers.Core().V1().Secrets().Informer()
sharedInformers.Start(nil)
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
log.Infof("waiting for local namespaced secrets informer caches to sync")
if !cache.WaitForCacheSync(ctx.Done(), informer.HasSynced) {
return nil, errors.New("failed to sync local namespaced secrets informer caches")
}
log.Infof("local namespaced secrets informer caches synced")
return informer, nil
}
// Main executes the tap service-mirror
func Main(args []string) {
cmd := flag.NewFlagSet("service-mirror", flag.ExitOnError)
kubeConfigPath := cmd.String("kubeconfig", "", "path to the local kube config")
requeueLimit := cmd.Int("event-requeue-limit", 3, "requeue limit for events")
metricsAddr := cmd.String("metrics-addr", ":9999", "address to serve scrapable metrics on")
namespace := cmd.String("namespace", "", "address to serve scrapable metrics on")
flags.ConfigureAndParse(cmd, args)
stop := make(chan os.Signal, 1)
signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
k8sAPI, err := k8s.InitializeAPI(
*kubeConfigPath,
false,
k8s.Svc,
k8s.NS,
k8s.Endpoint,
)
//TODO: Use can-i to check for required permissions
if err != nil {
log.Fatalf("Failed to initialize K8s API: %s", err)
}
secretsInformer, err := initLocalSecretsInformer(k8sAPI.Client, *namespace)
if err != nil {
log.Fatalf("Failed to initialize secrets informer: %s", err)
}
probeManager := NewProbeManager(k8sAPI)
probeManager.Start()
k8sAPI.Sync(nil)
watcher := NewRemoteClusterConfigWatcher(*namespace, secretsInformer, k8sAPI, *requeueLimit, &chanProbeEventSink{probeManager.enqueueEvent})
log.Info("Started cluster config watcher")
go admin.StartServer(*metricsAddr)
<-stop
log.Info("Stopping cluster config watcher")
watcher.Stop()
probeManager.Stop()
}