From 366ab85660d4e1c260795bd23c2aa09a4721c282 Mon Sep 17 00:00:00 2001 From: Martin Gencur Date: Thu, 22 Aug 2019 17:17:33 +0200 Subject: [PATCH] Profiling support (#562) * Profiling support * Move ProfilingPort to profiling package * Fix golint errors * Refactor watcher to accept variable length observers * Cleaner happy path * Remove profiling handlers argument * use :8008 string as const * Make the profiling port package private * Make UpdateFromConfigMap member of profiling handler * use mutex when accessing the enabled flag * test the server as well * Fixes * Initialize profiling from configMap at startup * Use httptest.ResponseRecorder to make the test more lightweight * Fixes * Do not initialize from configmap at startup --- configmap/informed_watcher.go | 4 +- configmap/manual_watcher.go | 6 +- configmap/static_watcher.go | 6 +- configmap/store_test.go | 2 +- configmap/watcher.go | 8 +-- injection/sharedmain/main.go | 31 ++++++++- profiling/server.go | 114 ++++++++++++++++++++++++++++++++++ profiling/server_test.go | 103 ++++++++++++++++++++++++++++++ 8 files changed, 259 insertions(+), 15 deletions(-) create mode 100644 profiling/server.go create mode 100644 profiling/server_test.go diff --git a/configmap/informed_watcher.go b/configmap/informed_watcher.go index 5903d59d7..ed247ae13 100644 --- a/configmap/informed_watcher.go +++ b/configmap/informed_watcher.go @@ -79,7 +79,7 @@ var _ Watcher = (*InformedWatcher)(nil) var _ DefaultingWatcher = (*InformedWatcher)(nil) // WatchWithDefault implements DefaultingWatcher. -func (i *InformedWatcher) WatchWithDefault(cm corev1.ConfigMap, o Observer) { +func (i *InformedWatcher) WatchWithDefault(cm corev1.ConfigMap, o ...Observer) { i.defaults[cm.Name] = &cm i.m.Lock() @@ -94,7 +94,7 @@ func (i *InformedWatcher) WatchWithDefault(cm corev1.ConfigMap, o Observer) { panic("cannot WatchWithDefault after the InformedWatcher has started") } - i.Watch(cm.Name, o) + i.Watch(cm.Name, o...) } // Start implements Watcher. diff --git a/configmap/manual_watcher.go b/configmap/manual_watcher.go index ad243de55..dc801e777 100644 --- a/configmap/manual_watcher.go +++ b/configmap/manual_watcher.go @@ -35,14 +35,14 @@ type ManualWatcher struct { var _ Watcher = (*ManualWatcher)(nil) // Watch implements Watcher -func (w *ManualWatcher) Watch(name string, o Observer) { +func (w *ManualWatcher) Watch(name string, o ...Observer) { w.m.Lock() defer w.m.Unlock() if w.observers == nil { - w.observers = make(map[string][]Observer, 1) + w.observers = make(map[string][]Observer, len(o)) } - w.observers[name] = append(w.observers[name], o) + w.observers[name] = append(w.observers[name], o...) } func (w *ManualWatcher) Start(<-chan struct{}) error { diff --git a/configmap/static_watcher.go b/configmap/static_watcher.go index 96a01140d..43ff9de3c 100644 --- a/configmap/static_watcher.go +++ b/configmap/static_watcher.go @@ -48,10 +48,12 @@ type StaticWatcher struct { var _ Watcher = (*StaticWatcher)(nil) // Watch implements Watcher -func (di *StaticWatcher) Watch(name string, o Observer) { +func (di *StaticWatcher) Watch(name string, o ...Observer) { cm, ok := di.cfgs[name] if ok { - o(cm) + for _, observer := range o { + observer(cm) + } } else { panic(fmt.Sprintf("Tried to watch unknown config with name %q", name)) } diff --git a/configmap/store_test.go b/configmap/store_test.go index 766cf79f0..f5ba2c002 100644 --- a/configmap/store_test.go +++ b/configmap/store_test.go @@ -290,7 +290,7 @@ type mockWatcher struct { watches []string } -func (w *mockWatcher) Watch(config string, o Observer) { +func (w *mockWatcher) Watch(config string, o ...Observer) { w.watches = append(w.watches, config) } diff --git a/configmap/watcher.go b/configmap/watcher.go index 71a18f495..ff703dbc7 100644 --- a/configmap/watcher.go +++ b/configmap/watcher.go @@ -28,8 +28,8 @@ type Observer func(*corev1.ConfigMap) // Watcher defines the interface that a configmap implementation must implement. type Watcher interface { - // Watch is called to register a callback to be notified when a named ConfigMap changes. - Watch(string, Observer) + // Watch is called to register callbacks to be notified when a named ConfigMap changes. + Watch(string, ...Observer) // Start is called to initiate the watches and provide a channel to signal when we should // stop watching. When Start returns, all registered Observers will be called with the @@ -42,8 +42,8 @@ type Watcher interface { type DefaultingWatcher interface { Watcher - // WatchWithDefault is called to register a callback to be notified when a named ConfigMap + // WatchWithDefault is called to register callbacks to be notified when a named ConfigMap // changes. The provided default value is always observed before any real ConfigMap with that // name is. If the real ConfigMap with that name is deleted, then the default value is observed. - WatchWithDefault(cm corev1.ConfigMap, o Observer) + WatchWithDefault(cm corev1.ConfigMap, o ...Observer) } diff --git a/injection/sharedmain/main.go b/injection/sharedmain/main.go index 47f32d442..e8e560486 100644 --- a/injection/sharedmain/main.go +++ b/injection/sharedmain/main.go @@ -21,10 +21,12 @@ import ( "flag" "fmt" "log" + "net/http" "os" "os/user" "path/filepath" + "golang.org/x/sync/errgroup" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" @@ -36,6 +38,7 @@ import ( "knative.dev/pkg/injection/clients/kubeclient" "knative.dev/pkg/logging" "knative.dev/pkg/metrics" + "knative.dev/pkg/profiling" "knative.dev/pkg/signals" "knative.dev/pkg/system" ) @@ -76,6 +79,7 @@ func GetLoggingConfig(ctx context.Context) (*logging.Config, error) { if err != nil { return nil, err } + return logging.NewConfigFromConfigMap(loggingConfigMap) } @@ -128,10 +132,16 @@ func MainWithConfig(ctx context.Context, component string, cfg *rest.Config, cto controllers = append(controllers, cf(ctx, cmw)) } + profilingHandler := profiling.NewHandler(logger, false) + // Watch the logging config map and dynamically update logging levels. cmw.Watch(logging.ConfigMapName(), logging.UpdateLevelFromConfigMap(logger, atomicLevel, component)) - // Watch the observability config map and dynamically update metrics exporter. - cmw.Watch(metrics.ConfigMapName(), metrics.UpdateExporterFromConfigMap(component, logger)) + + // Watch the observability config map + cmw.Watch(metrics.ConfigMapName(), + metrics.UpdateExporterFromConfigMap(component, logger), + profilingHandler.UpdateFromConfigMap) + if err := cmw.Start(ctx.Done()); err != nil { logger.Fatalw("failed to start configuration manager", zap.Error(err)) } @@ -144,7 +154,22 @@ func MainWithConfig(ctx context.Context, component string, cfg *rest.Config, cto // Start all of the controllers. logger.Info("Starting controllers...") - controller.StartAll(ctx.Done(), controllers...) + go controller.StartAll(ctx.Done(), controllers...) + + profilingServer := profiling.NewServer(profilingHandler) + + eg, egCtx := errgroup.WithContext(ctx) + eg.Go(profilingServer.ListenAndServe) + + // This will block until either a signal arrives or one of the grouped functions + // returns an error. + <-egCtx.Done() + + profilingServer.Shutdown(context.Background()) + // Don't forward ErrServerClosed as that indicates we're already shutting down. + if err := eg.Wait(); err != nil && err != http.ErrServerClosed { + logger.Errorw("Error while running server", zap.Error(err)) + } } func flush(logger *zap.SugaredLogger) { diff --git a/profiling/server.go b/profiling/server.go new file mode 100644 index 000000000..1fa3cb46f --- /dev/null +++ b/profiling/server.go @@ -0,0 +1,114 @@ +/* +Copyright 2019 The Knative Authors + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package profiling + +import ( + "net/http" + "net/http/pprof" + "strconv" + "sync" + + perrors "github.com/pkg/errors" + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" +) + +const ( + // profilingPort is the port where we expose profiling information if profiling is enabled + profilingPort = ":8008" + + // profilingKey is the name of the key in config-observability config map that indicates whether profiling + // is enabled of disabled + profilingKey = "profiling.enable" +) + +// Handler holds the main HTTP handler and a flag indicating +// whether the handler is active +type Handler struct { + enabled bool + enabledMux sync.Mutex + handler http.Handler + log *zap.SugaredLogger +} + +// NewHandler create a new ProfilingHandler which serves runtime profiling data +// according to the given context path +func NewHandler(logger *zap.SugaredLogger, enableProfiling bool) *Handler { + const pprofPrefix = "/debug/pprof/" + + mux := http.NewServeMux() + mux.HandleFunc(pprofPrefix, pprof.Index) + mux.HandleFunc(pprofPrefix+"cmdline", pprof.Cmdline) + mux.HandleFunc(pprofPrefix+"profile", pprof.Profile) + mux.HandleFunc(pprofPrefix+"symbol", pprof.Symbol) + mux.HandleFunc(pprofPrefix+"trace", pprof.Trace) + + logger.Infof("Profiling enabled: %t", enableProfiling) + + return &Handler{ + enabled: enableProfiling, + handler: mux, + log: logger, + } +} + +func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) { + h.enabledMux.Lock() + defer h.enabledMux.Unlock() + if h.enabled { + h.handler.ServeHTTP(w, r) + } else { + http.NotFoundHandler().ServeHTTP(w, r) + } +} + +func readProfilingFlag(configMap *corev1.ConfigMap) (bool, error) { + profiling, ok := configMap.Data[profilingKey] + if !ok { + return false, nil + } + enabled, err := strconv.ParseBool(profiling) + if err != nil { + return false, perrors.Wrapf(err, "failed to parse the profiling flag") + } + return enabled, nil +} + +// UpdateFromConfigMap modifies the Enabled flag in the Handler +// according to the value in the given ConfigMap +func (h *Handler) UpdateFromConfigMap(configMap *corev1.ConfigMap) { + enabled, err := readProfilingFlag(configMap) + if err != nil { + h.log.Errorw("Failed to update the profiling flag", zap.Error(err)) + return + } + h.enabledMux.Lock() + defer h.enabledMux.Unlock() + if h.enabled != enabled { + h.enabled = enabled + h.log.Infof("Profiling enabled: %t", h.enabled) + } +} + +// NewServer creates a new http server that exposes profiling data using the +// HTTP handler that is passed as an argument +func NewServer(handler http.Handler) *http.Server { + return &http.Server{ + Addr: profilingPort, + Handler: handler, + } +} diff --git a/profiling/server_test.go b/profiling/server_test.go new file mode 100644 index 000000000..314d61c3a --- /dev/null +++ b/profiling/server_test.go @@ -0,0 +1,103 @@ +/* +Copyright 2019 The Knative Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package profiling + +import ( + "net/http" + "net/http/httptest" + "testing" + + "go.uber.org/zap" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "knative.dev/pkg/metrics" + "knative.dev/pkg/system" + _ "knative.dev/pkg/system/testing" +) + +func TestUpdateFromConfigMap(t *testing.T) { + observabilityConfigTests := []struct { + name string + wantEnabled bool + wantStatusCode int + config *corev1.ConfigMap + }{{ + name: "observability with profiling disabled", + wantEnabled: false, + wantStatusCode: http.StatusNotFound, + config: &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: system.Namespace(), + Name: metrics.ConfigMapName(), + }, + Data: map[string]string{ + "profiling.enable": "false", + }, + }, + }, { + name: "observability config with profiling enabled", + wantEnabled: true, + wantStatusCode: http.StatusOK, + config: &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: system.Namespace(), + Name: metrics.ConfigMapName(), + }, + Data: map[string]string{ + "profiling.enable": "true", + }, + }, + }, { + name: "observability config with unparseable value", + wantEnabled: false, + wantStatusCode: http.StatusNotFound, + config: &corev1.ConfigMap{ + ObjectMeta: metav1.ObjectMeta{ + Namespace: system.Namespace(), + Name: metrics.ConfigMapName(), + }, + Data: map[string]string{ + "profiling.enable": "get me some profiles", + }, + }, + }} + + for _, tt := range observabilityConfigTests { + t.Run(tt.name, func(t *testing.T) { + handler := NewHandler(zap.NewNop().Sugar(), false) + + handler.UpdateFromConfigMap(tt.config) + + req, err := http.NewRequest(http.MethodGet, "/debug/pprof/", nil) + if err != nil { + t.Fatal("Error creating request:", err) + } + + rr := httptest.NewRecorder() + + handler.ServeHTTP(rr, req) + + if rr.Code != tt.wantStatusCode { + t.Errorf("StatusCode: %v, want: %v", rr.Code, tt.wantStatusCode) + } + + if handler.enabled != tt.wantEnabled { + t.Fatalf("Test: %q; want %v, but got %v", tt.name, tt.wantEnabled, handler.enabled) + } + }) + } +}