caching/vendor/knative.dev/pkg/injection/sharedmain/main.go

194 lines
6.5 KiB
Go

/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package sharedmain
import (
"context"
"flag"
"fmt"
"log"
"net/http"
"os"
"os/user"
"path/filepath"
"time"
"go.opencensus.io/stats/view"
"golang.org/x/sync/errgroup"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"go.uber.org/zap"
apierrors "k8s.io/apimachinery/pkg/api/errors"
kubeclient "knative.dev/pkg/client/injection/kube/client"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
"knative.dev/pkg/injection"
"knative.dev/pkg/logging"
"knative.dev/pkg/metrics"
"knative.dev/pkg/profiling"
"knative.dev/pkg/signals"
"knative.dev/pkg/system"
)
// GetConfig returns a rest.Config to be used for kubernetes client creation.
// It does so in the following order:
// 1. Use the passed kubeconfig/masterURL.
// 2. Fallback to the KUBECONFIG environment variable.
// 3. Fallback to in-cluster config.
// 4. Fallback to the ~/.kube/config.
func GetConfig(masterURL, kubeconfig string) (*rest.Config, error) {
if kubeconfig == "" {
kubeconfig = os.Getenv("KUBECONFIG")
}
// If we have an explicit indication of where the kubernetes config lives, read that.
if kubeconfig != "" {
return clientcmd.BuildConfigFromFlags(masterURL, kubeconfig)
}
// If not, try the in-cluster config.
if c, err := rest.InClusterConfig(); err == nil {
return c, nil
}
// If no in-cluster config, try the default location in the user's home directory.
if usr, err := user.Current(); err == nil {
if c, err := clientcmd.BuildConfigFromFlags("", filepath.Join(usr.HomeDir, ".kube", "config")); err == nil {
return c, nil
}
}
return nil, fmt.Errorf("could not create a valid kubeconfig")
}
// GetLoggingConfig gets the logging config from either the file system if present
// or via reading a configMap from the API.
// The context is expected to be initialized with injection.
func GetLoggingConfig(ctx context.Context) (*logging.Config, error) {
loggingConfigMap, err := kubeclient.Get(ctx).CoreV1().ConfigMaps(system.Namespace()).Get(logging.ConfigMapName(), metav1.GetOptions{})
if err != nil {
if apierrors.IsNotFound(err) {
return logging.NewConfigFromMap(nil)
} else {
return nil, err
}
}
return logging.NewConfigFromConfigMap(loggingConfigMap)
}
func Main(component string, ctors ...injection.ControllerConstructor) {
// Set up signals so we handle the first shutdown signal gracefully.
MainWithContext(signals.NewContext(), component, ctors...)
}
func MainWithContext(ctx context.Context, component string, ctors ...injection.ControllerConstructor) {
var (
masterURL = flag.String("master", "", "The address of the Kubernetes API server. Overrides any value in kubeconfig. Only required if out-of-cluster.")
kubeconfig = flag.String("kubeconfig", "", "Path to a kubeconfig. Only required if out-of-cluster.")
)
flag.Parse()
cfg, err := GetConfig(*masterURL, *kubeconfig)
if err != nil {
log.Fatal("Error building kubeconfig", err)
}
MainWithConfig(ctx, component, cfg, ctors...)
}
func MainWithConfig(ctx context.Context, component string, cfg *rest.Config, ctors ...injection.ControllerConstructor) {
log.Printf("Registering %d clients", len(injection.Default.GetClients()))
log.Printf("Registering %d informer factories", len(injection.Default.GetInformerFactories()))
log.Printf("Registering %d informers", len(injection.Default.GetInformers()))
log.Printf("Registering %d controllers", len(ctors))
// Report stats on Go memory usage every 30 seconds.
msp := metrics.NewMemStatsAll()
msp.Start(ctx, 30*time.Second)
if err := view.Register(msp.DefaultViews()...); err != nil {
log.Fatalf("Error exporting go memstats view: %v", err)
}
// Adjust our client's rate limits based on the number of controller's we are running.
cfg.QPS = float32(len(ctors)) * rest.DefaultQPS
cfg.Burst = len(ctors) * rest.DefaultBurst
ctx, informers := injection.Default.SetupInformers(ctx, cfg)
// Set up our logger.
loggingConfig, err := GetLoggingConfig(ctx)
if err != nil {
log.Fatal("Error reading/parsing logging configuration:", err)
}
logger, atomicLevel := logging.NewLoggerFromConfig(loggingConfig, component)
defer flush(logger)
ctx = logging.WithLogger(ctx, logger)
// TODO(mattmoor): This should itself take a context and be injection-based.
cmw := configmap.NewInformedWatcher(kubeclient.Get(ctx), system.Namespace())
// Based on the reconcilers we have linked, build up the set of controllers to run.
controllers := make([]*controller.Impl, 0, len(ctors))
for _, cf := range ctors {
controllers = append(controllers, cf(ctx, cmw))
}
profilingHandler := profiling.NewHandler(logger, false)
// Watch the logging config map and dynamically update logging levels.
cmw.Watch(logging.ConfigMapName(), logging.UpdateLevelFromConfigMap(logger, atomicLevel, component))
// Watch the observability config map
cmw.Watch(metrics.ConfigMapName(),
metrics.UpdateExporterFromConfigMap(component, logger),
profilingHandler.UpdateFromConfigMap)
if err := cmw.Start(ctx.Done()); err != nil {
logger.Fatalw("failed to start configuration manager", zap.Error(err))
}
// Start all of the informers and wait for them to sync.
logger.Info("Starting informers.")
if err := controller.StartInformers(ctx.Done(), informers...); err != nil {
logger.Fatalw("Failed to start informers", err)
}
// Start all of the controllers.
logger.Info("Starting controllers...")
go controller.StartAll(ctx.Done(), controllers...)
profilingServer := profiling.NewServer(profilingHandler)
eg, egCtx := errgroup.WithContext(ctx)
eg.Go(profilingServer.ListenAndServe)
// This will block until either a signal arrives or one of the grouped functions
// returns an error.
<-egCtx.Done()
profilingServer.Shutdown(context.Background())
// Don't forward ErrServerClosed as that indicates we're already shutting down.
if err := eg.Wait(); err != nil && err != http.ErrServerClosed {
logger.Errorw("Error while running server", zap.Error(err))
}
}
func flush(logger *zap.SugaredLogger) {
logger.Sync()
metrics.FlushExporter()
}