mirror of https://github.com/linkerd/linkerd2.git
223 lines
6.8 KiB
Go
223 lines
6.8 KiB
Go
package identity
|
|
|
|
import (
|
|
"context"
|
|
"flag"
|
|
"fmt"
|
|
"net"
|
|
"os"
|
|
"os/signal"
|
|
"path/filepath"
|
|
"syscall"
|
|
"time"
|
|
|
|
idctl "github.com/linkerd/linkerd2/controller/identity"
|
|
"github.com/linkerd/linkerd2/pkg/admin"
|
|
"github.com/linkerd/linkerd2/pkg/flags"
|
|
"github.com/linkerd/linkerd2/pkg/identity"
|
|
"github.com/linkerd/linkerd2/pkg/k8s"
|
|
"github.com/linkerd/linkerd2/pkg/prometheus"
|
|
"github.com/linkerd/linkerd2/pkg/tls"
|
|
"github.com/linkerd/linkerd2/pkg/trace"
|
|
log "github.com/sirupsen/logrus"
|
|
"google.golang.org/grpc"
|
|
corev1 "k8s.io/api/core/v1"
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
"k8s.io/client-go/kubernetes/scheme"
|
|
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
|
|
"k8s.io/client-go/tools/record"
|
|
)
|
|
|
|
// Main executes the identity subcommand
|
|
func Main(args []string) {
|
|
cmd := flag.NewFlagSet("identity", flag.ExitOnError)
|
|
|
|
addr := cmd.String("addr", ":8080", "address to serve on")
|
|
adminAddr := cmd.String("admin-addr", ":9990", "address of HTTP admin server")
|
|
kubeConfigPath := cmd.String("kubeconfig", "", "path to kube config")
|
|
controllerNS := cmd.String("controller-namespace", "", "namespace in which Linkerd is installed")
|
|
identityScheme := cmd.String("identity-scheme", "", "scheme used for the identity issuer secret format")
|
|
trustDomain := cmd.String("identity-trust-domain", "", "configures the name suffix used for identities")
|
|
identityIssuanceLifeTime := cmd.String("identity-issuance-lifetime", "", "the amount of time for which the Identity issuer should certify identity")
|
|
identityClockSkewAllowance := cmd.String("identity-clock-skew-allowance", "", "the amount of time to allow for clock skew within a Linkerd cluster")
|
|
enablePprof := cmd.Bool("enable-pprof", false, "Enable pprof endpoints on the admin server")
|
|
qps := cmd.Float64("kube-apiclient-qps", 100, "Maximum QPS sent to the kube-apiserver before throttling")
|
|
burst := cmd.Int("kube-apiclient-burst", 200, "Burst value over kube-apiclient-qps")
|
|
|
|
issuerPath := cmd.String("issuer",
|
|
"/var/run/linkerd/identity/issuer",
|
|
"path to directory containing issuer credentials")
|
|
|
|
var issuerPathCrt string
|
|
var issuerPathKey string
|
|
traceCollector := flags.AddTraceFlags(cmd)
|
|
componentName := "linkerd-identity"
|
|
|
|
flags.ConfigureAndParse(cmd, args)
|
|
|
|
ready := false
|
|
adminServer := admin.NewServer(*adminAddr, *enablePprof, &ready)
|
|
|
|
go func() {
|
|
log.Infof("starting admin server on %s", *adminAddr)
|
|
if err := adminServer.ListenAndServe(); err != nil {
|
|
log.Errorf("failed to start identity admin server: %s", err)
|
|
}
|
|
}()
|
|
|
|
identityTrustAnchorPEM, err := os.ReadFile(k8s.MountPathTrustRootsPEM)
|
|
if err != nil {
|
|
log.Fatalf("could not read identity trust anchors PEM: %s", err.Error())
|
|
}
|
|
|
|
stop := make(chan os.Signal, 1)
|
|
signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
if *identityScheme == "" || *trustDomain == "" {
|
|
log.Infof("Identity disabled in control plane configuration.")
|
|
//nolint:gocritic
|
|
os.Exit(0)
|
|
}
|
|
|
|
if *identityScheme == k8s.IdentityIssuerSchemeLinkerd {
|
|
issuerPathCrt = filepath.Join(*issuerPath, k8s.IdentityIssuerCrtName)
|
|
issuerPathKey = filepath.Join(*issuerPath, k8s.IdentityIssuerKeyName)
|
|
} else {
|
|
issuerPathCrt = filepath.Join(*issuerPath, corev1.TLSCertKey)
|
|
issuerPathKey = filepath.Join(*issuerPath, corev1.TLSPrivateKeyKey)
|
|
}
|
|
|
|
dom, err := idctl.NewTrustDomain(*controllerNS, *trustDomain)
|
|
if err != nil {
|
|
//nolint:gocritic
|
|
log.Fatalf("Invalid trust domain: %s", err.Error())
|
|
}
|
|
|
|
trustAnchors, err := tls.DecodePEMCertPool(string(identityTrustAnchorPEM))
|
|
if err != nil {
|
|
//nolint:gocritic
|
|
log.Fatalf("Failed to read trust anchors: %s", err)
|
|
}
|
|
|
|
validity := tls.Validity{
|
|
ClockSkewAllowance: tls.DefaultClockSkewAllowance,
|
|
Lifetime: identity.DefaultIssuanceLifetime,
|
|
}
|
|
if pbd := *identityClockSkewAllowance; pbd != "" {
|
|
csa, err := time.ParseDuration(pbd)
|
|
if err != nil {
|
|
log.Warnf("Invalid clock skew allowance: %s", err)
|
|
} else {
|
|
validity.ClockSkewAllowance = csa
|
|
}
|
|
}
|
|
if pbd := *identityIssuanceLifeTime; pbd != "" {
|
|
il, err := time.ParseDuration(pbd)
|
|
if err != nil {
|
|
log.Warnf("Invalid issuance lifetime: %s", err)
|
|
} else {
|
|
validity.Lifetime = il
|
|
}
|
|
}
|
|
|
|
expectedName := fmt.Sprintf("identity.%s.%s", *controllerNS, *trustDomain)
|
|
issuerEvent := make(chan struct{})
|
|
issuerError := make(chan error)
|
|
|
|
//
|
|
// Create and start FS creds watcher
|
|
//
|
|
watcher := tls.NewFsCredsWatcher(*issuerPath, issuerEvent, issuerError)
|
|
go func() {
|
|
if err := watcher.StartWatching(ctx); err != nil {
|
|
//nolint:gocritic
|
|
log.Fatalf("Failed to start creds watcher: %s", err)
|
|
}
|
|
}()
|
|
|
|
//
|
|
// Create k8s API
|
|
//
|
|
config, err := k8s.GetConfig(*kubeConfigPath, "")
|
|
if err != nil {
|
|
log.Fatalf("Error configuring Kubernetes API client: %s", err)
|
|
}
|
|
k8sAPI, err := k8s.NewAPIForConfig(config, "", []string{}, 0, float32(*qps), *burst)
|
|
if err != nil {
|
|
log.Fatalf("Failed to load kubeconfig: %s: %s", *kubeConfigPath, err)
|
|
}
|
|
log.Infof("Using k8s client with QPS=%.2f Burst=%d", config.QPS, config.Burst)
|
|
|
|
v, err := idctl.NewK8sTokenValidator(ctx, k8sAPI, dom)
|
|
if err != nil {
|
|
log.Fatalf("Failed to initialize identity service: %s", err)
|
|
}
|
|
|
|
// Create K8s event recorder
|
|
eventBroadcaster := record.NewBroadcaster()
|
|
eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{
|
|
Interface: k8sAPI.CoreV1().Events(""),
|
|
})
|
|
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: componentName})
|
|
|
|
if err != nil {
|
|
log.Fatalf("Failed to construct k8s event recorder: %s", err)
|
|
}
|
|
|
|
recordEventFunc := func(parent runtime.Object, eventType, reason, message string) {
|
|
if parent == nil {
|
|
parent = &corev1.ObjectReference{
|
|
APIVersion: "apps/v1",
|
|
Kind: "Deployment",
|
|
Namespace: *controllerNS,
|
|
Name: componentName,
|
|
}
|
|
}
|
|
recorder.Event(parent, eventType, reason, message)
|
|
}
|
|
|
|
//
|
|
// Create, initialize and run service
|
|
//
|
|
svc := identity.NewService(v, trustAnchors, &validity, recordEventFunc, expectedName, issuerPathCrt, issuerPathKey)
|
|
if err = svc.Initialize(); err != nil {
|
|
//nolint:gocritic
|
|
log.Fatalf("Failed to initialize identity service: %s", err)
|
|
}
|
|
go func() {
|
|
svc.Run(issuerEvent, issuerError)
|
|
}()
|
|
|
|
//
|
|
// Bind and serve
|
|
//
|
|
lis, err := net.Listen("tcp", *addr)
|
|
if err != nil {
|
|
//nolint:gocritic
|
|
log.Fatalf("Failed to listen on %s: %s", *addr, err)
|
|
}
|
|
|
|
if *traceCollector != "" {
|
|
if err := trace.InitializeTracing(componentName, *traceCollector); err != nil {
|
|
log.Warnf("failed to initialize tracing: %s", err)
|
|
}
|
|
}
|
|
srv := prometheus.NewGrpcServer(grpc.MaxConcurrentStreams(0))
|
|
identity.Register(srv, svc)
|
|
go func() {
|
|
log.Infof("starting gRPC server on %s", *addr)
|
|
if err := srv.Serve(lis); err != nil {
|
|
log.Errorf("failed to start identity gRPC server: %s", err)
|
|
}
|
|
}()
|
|
|
|
ready = true
|
|
|
|
<-stop
|
|
log.Infof("shutting down gRPC server on %s", *addr)
|
|
srv.GracefulStop()
|
|
adminServer.Shutdown(ctx)
|
|
}
|