linkerd2/controller/cmd/identity/main.go

223 lines
6.8 KiB
Go

package identity
import (
"context"
"flag"
"fmt"
"net"
"os"
"os/signal"
"path/filepath"
"syscall"
"time"
idctl "github.com/linkerd/linkerd2/controller/identity"
"github.com/linkerd/linkerd2/pkg/admin"
"github.com/linkerd/linkerd2/pkg/flags"
"github.com/linkerd/linkerd2/pkg/identity"
"github.com/linkerd/linkerd2/pkg/k8s"
"github.com/linkerd/linkerd2/pkg/prometheus"
"github.com/linkerd/linkerd2/pkg/tls"
"github.com/linkerd/linkerd2/pkg/trace"
log "github.com/sirupsen/logrus"
"google.golang.org/grpc"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes/scheme"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/record"
)
// Main executes the identity subcommand.
//
// It parses the identity flags from args, starts the HTTP admin server, loads
// the trust anchors from k8s.MountPathTrustRootsPEM, builds the token
// validator and the identity service, and serves the identity gRPC API on the
// configured address. It then blocks until SIGINT or SIGTERM is received, at
// which point the gRPC server is stopped gracefully and the admin server is
// shut down.
//
// If either the identity scheme or the trust domain is empty, identity is
// considered disabled and the process exits with status 0. Unrecoverable
// start-up failures terminate the process through log.Fatalf.
func Main(args []string) {
	cmd := flag.NewFlagSet("identity", flag.ExitOnError)

	addr := cmd.String("addr", ":8080", "address to serve on")
	adminAddr := cmd.String("admin-addr", ":9990", "address of HTTP admin server")
	kubeConfigPath := cmd.String("kubeconfig", "", "path to kube config")
	controllerNS := cmd.String("controller-namespace", "", "namespace in which Linkerd is installed")
	identityScheme := cmd.String("identity-scheme", "", "scheme used for the identity issuer secret format")
	trustDomain := cmd.String("identity-trust-domain", "", "configures the name suffix used for identities")
	identityIssuanceLifeTime := cmd.String("identity-issuance-lifetime", "", "the amount of time for which the Identity issuer should certify identity")
	identityClockSkewAllowance := cmd.String("identity-clock-skew-allowance", "", "the amount of time to allow for clock skew within a Linkerd cluster")
	enablePprof := cmd.Bool("enable-pprof", false, "Enable pprof endpoints on the admin server")
	qps := cmd.Float64("kube-apiclient-qps", 100, "Maximum QPS sent to the kube-apiserver before throttling")
	burst := cmd.Int("kube-apiclient-burst", 200, "Burst value over kube-apiclient-qps")
	issuerPath := cmd.String("issuer",
		"/var/run/linkerd/identity/issuer",
		"path to directory containing issuer credentials")

	// Resolved below once the identity scheme is known.
	var issuerPathCrt string
	var issuerPathKey string

	traceCollector := flags.AddTraceFlags(cmd)
	componentName := "linkerd-identity"

	flags.ConfigureAndParse(cmd, args)

	// The admin server receives a pointer to ready; the flag is flipped to
	// true further down, once the gRPC serving goroutine has been launched.
	ready := false
	adminServer := admin.NewServer(*adminAddr, *enablePprof, &ready)

	go func() {
		log.Infof("starting admin server on %s", *adminAddr)
		if err := adminServer.ListenAndServe(); err != nil {
			log.Errorf("failed to start identity admin server: %s", err)
		}
	}()

	identityTrustAnchorPEM, err := os.ReadFile(k8s.MountPathTrustRootsPEM)
	if err != nil {
		log.Fatalf("could not read identity trust anchors PEM: %s", err.Error())
	}

	// Buffered so that a signal delivered before the final receive is not lost.
	stop := make(chan os.Signal, 1)
	signal.Notify(stop, os.Interrupt, syscall.SIGTERM)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	if *identityScheme == "" || *trustDomain == "" {
		log.Infof("Identity disabled in control plane configuration.")
		//nolint:gocritic
		os.Exit(0)
	}

	// The issuer credential file names depend on the secret format: the
	// Linkerd scheme uses its own names, anything else uses the standard
	// Kubernetes TLS secret keys.
	if *identityScheme == k8s.IdentityIssuerSchemeLinkerd {
		issuerPathCrt = filepath.Join(*issuerPath, k8s.IdentityIssuerCrtName)
		issuerPathKey = filepath.Join(*issuerPath, k8s.IdentityIssuerKeyName)
	} else {
		issuerPathCrt = filepath.Join(*issuerPath, corev1.TLSCertKey)
		issuerPathKey = filepath.Join(*issuerPath, corev1.TLSPrivateKeyKey)
	}

	dom, err := idctl.NewTrustDomain(*controllerNS, *trustDomain)
	if err != nil {
		//nolint:gocritic
		log.Fatalf("Invalid trust domain: %s", err.Error())
	}

	trustAnchors, err := tls.DecodePEMCertPool(string(identityTrustAnchorPEM))
	if err != nil {
		//nolint:gocritic
		log.Fatalf("Failed to read trust anchors: %s", err)
	}

	// Start from the defaults; each flag overrides its field only when it is
	// set and parses as a duration. An unparsable value is logged and the
	// default is kept rather than aborting start-up.
	validity := tls.Validity{
		ClockSkewAllowance: tls.DefaultClockSkewAllowance,
		Lifetime:           identity.DefaultIssuanceLifetime,
	}
	if pbd := *identityClockSkewAllowance; pbd != "" {
		csa, err := time.ParseDuration(pbd)
		if err != nil {
			log.Warnf("Invalid clock skew allowance: %s", err)
		} else {
			validity.ClockSkewAllowance = csa
		}
	}
	if pbd := *identityIssuanceLifeTime; pbd != "" {
		il, err := time.ParseDuration(pbd)
		if err != nil {
			log.Warnf("Invalid issuance lifetime: %s", err)
		} else {
			validity.Lifetime = il
		}
	}

	expectedName := fmt.Sprintf("identity.%s.%s", *controllerNS, *trustDomain)

	// Both channels are handed to the creds watcher and to svc.Run below.
	issuerEvent := make(chan struct{})
	issuerError := make(chan error)

	//
	// Create and start FS creds watcher
	//
	watcher := tls.NewFsCredsWatcher(*issuerPath, issuerEvent, issuerError)
	go func() {
		if err := watcher.StartWatching(ctx); err != nil {
			//nolint:gocritic
			log.Fatalf("Failed to start creds watcher: %s", err)
		}
	}()

	//
	// Create k8s API
	//
	config, err := k8s.GetConfig(*kubeConfigPath, "")
	if err != nil {
		log.Fatalf("Error configuring Kubernetes API client: %s", err)
	}

	k8sAPI, err := k8s.NewAPIForConfig(config, "", []string{}, 0, float32(*qps), *burst)
	if err != nil {
		log.Fatalf("Failed to load kubeconfig: %s: %s", *kubeConfigPath, err)
	}
	log.Infof("Using k8s client with QPS=%.2f Burst=%d", config.QPS, config.Burst)

	v, err := idctl.NewK8sTokenValidator(ctx, k8sAPI, dom)
	if err != nil {
		log.Fatalf("Failed to initialize identity service: %s", err)
	}

	// Create K8s event recorder. NewRecorder yields only the recorder, so
	// there is no error to check at this point.
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{
		Interface: k8sAPI.CoreV1().Events(""),
	})
	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: componentName})

	// recordEventFunc emits an event against parent; when no parent is given
	// the event is attached to a reference to this component's Deployment in
	// the controller namespace.
	recordEventFunc := func(parent runtime.Object, eventType, reason, message string) {
		if parent == nil {
			parent = &corev1.ObjectReference{
				APIVersion: "apps/v1",
				Kind:       "Deployment",
				Namespace:  *controllerNS,
				Name:       componentName,
			}
		}
		recorder.Event(parent, eventType, reason, message)
	}

	//
	// Create, initialize and run service
	//
	svc := identity.NewService(v, trustAnchors, &validity, recordEventFunc, expectedName, issuerPathCrt, issuerPathKey)
	if err = svc.Initialize(); err != nil {
		//nolint:gocritic
		log.Fatalf("Failed to initialize identity service: %s", err)
	}
	go func() {
		svc.Run(issuerEvent, issuerError)
	}()

	//
	// Bind and serve
	//
	lis, err := net.Listen("tcp", *addr)
	if err != nil {
		//nolint:gocritic
		log.Fatalf("Failed to listen on %s: %s", *addr, err)
	}

	// Tracing is optional: a failure to set it up is logged and serving
	// continues without it.
	if *traceCollector != "" {
		if err := trace.InitializeTracing(componentName, *traceCollector); err != nil {
			log.Warnf("failed to initialize tracing: %s", err)
		}
	}

	srv := prometheus.NewGrpcServer(grpc.MaxConcurrentStreams(0))
	identity.Register(srv, svc)
	go func() {
		log.Infof("starting gRPC server on %s", *addr)
		if err := srv.Serve(lis); err != nil {
			log.Errorf("failed to start identity gRPC server: %s", err)
		}
	}()

	ready = true

	// Block until a termination signal arrives, then tear the servers down.
	<-stop
	log.Infof("shutting down gRPC server on %s", *addr)
	srv.GracefulStop()
	if err := adminServer.Shutdown(ctx); err != nil {
		// Previously discarded; surfaced so a failed admin shutdown is visible.
		log.Warnf("failed to shut down admin server: %s", err)
	}
}