Profiling support (#562)

* Profiling support

* Move ProfilingPort to profiling package

* Fix golint errors

* Refactor watcher to accept variable length observers

* Cleaner happy path

* Remove profiling handlers argument

* use :8008 string as const

* Make the profiling port package private

* Make UpdateFromConfigMap member of profiling handler

* use mutex when accessing the enabled flag
* test the server as well

* Fixes

* Initialize profiling from configMap at startup

* Use httptest.ResponseRecorder to make the test more lightweight

* Fixes

* Do not initialize from configmap at startup
This commit is contained in:
Martin Gencur 2019-08-22 17:17:33 +02:00 committed by Knative Prow Robot
parent 85d7d7ec71
commit 366ab85660
8 changed files with 259 additions and 15 deletions

View File

@ -79,7 +79,7 @@ var _ Watcher = (*InformedWatcher)(nil)
var _ DefaultingWatcher = (*InformedWatcher)(nil)
// WatchWithDefault implements DefaultingWatcher.
func (i *InformedWatcher) WatchWithDefault(cm corev1.ConfigMap, o Observer) {
func (i *InformedWatcher) WatchWithDefault(cm corev1.ConfigMap, o ...Observer) {
i.defaults[cm.Name] = &cm
i.m.Lock()
@ -94,7 +94,7 @@ func (i *InformedWatcher) WatchWithDefault(cm corev1.ConfigMap, o Observer) {
panic("cannot WatchWithDefault after the InformedWatcher has started")
}
i.Watch(cm.Name, o)
i.Watch(cm.Name, o...)
}
// Start implements Watcher.

View File

@ -35,14 +35,14 @@ type ManualWatcher struct {
var _ Watcher = (*ManualWatcher)(nil)
// Watch implements Watcher
func (w *ManualWatcher) Watch(name string, o Observer) {
func (w *ManualWatcher) Watch(name string, o ...Observer) {
w.m.Lock()
defer w.m.Unlock()
if w.observers == nil {
w.observers = make(map[string][]Observer, 1)
w.observers = make(map[string][]Observer, len(o))
}
w.observers[name] = append(w.observers[name], o)
w.observers[name] = append(w.observers[name], o...)
}
func (w *ManualWatcher) Start(<-chan struct{}) error {

View File

@ -48,10 +48,12 @@ type StaticWatcher struct {
var _ Watcher = (*StaticWatcher)(nil)
// Watch implements Watcher
func (di *StaticWatcher) Watch(name string, o Observer) {
func (di *StaticWatcher) Watch(name string, o ...Observer) {
cm, ok := di.cfgs[name]
if ok {
o(cm)
for _, observer := range o {
observer(cm)
}
} else {
panic(fmt.Sprintf("Tried to watch unknown config with name %q", name))
}

View File

@ -290,7 +290,7 @@ type mockWatcher struct {
watches []string
}
func (w *mockWatcher) Watch(config string, o Observer) {
func (w *mockWatcher) Watch(config string, o ...Observer) {
w.watches = append(w.watches, config)
}

View File

@ -28,8 +28,8 @@ type Observer func(*corev1.ConfigMap)
// Watcher defines the interface that a configmap implementation must implement.
type Watcher interface {
// Watch is called to register a callback to be notified when a named ConfigMap changes.
Watch(string, Observer)
// Watch is called to register callbacks to be notified when a named ConfigMap changes.
Watch(string, ...Observer)
// Start is called to initiate the watches and provide a channel to signal when we should
// stop watching. When Start returns, all registered Observers will be called with the
@ -42,8 +42,8 @@ type Watcher interface {
type DefaultingWatcher interface {
Watcher
// WatchWithDefault is called to register a callback to be notified when a named ConfigMap
// WatchWithDefault is called to register callbacks to be notified when a named ConfigMap
// changes. The provided default value is always observed before any real ConfigMap with that
// name is. If the real ConfigMap with that name is deleted, then the default value is observed.
WatchWithDefault(cm corev1.ConfigMap, o Observer)
WatchWithDefault(cm corev1.ConfigMap, o ...Observer)
}

View File

@ -21,10 +21,12 @@ import (
"flag"
"fmt"
"log"
"net/http"
"os"
"os/user"
"path/filepath"
"golang.org/x/sync/errgroup"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
@ -36,6 +38,7 @@ import (
"knative.dev/pkg/injection/clients/kubeclient"
"knative.dev/pkg/logging"
"knative.dev/pkg/metrics"
"knative.dev/pkg/profiling"
"knative.dev/pkg/signals"
"knative.dev/pkg/system"
)
@ -76,6 +79,7 @@ func GetLoggingConfig(ctx context.Context) (*logging.Config, error) {
if err != nil {
return nil, err
}
return logging.NewConfigFromConfigMap(loggingConfigMap)
}
@ -128,10 +132,16 @@ func MainWithConfig(ctx context.Context, component string, cfg *rest.Config, cto
controllers = append(controllers, cf(ctx, cmw))
}
profilingHandler := profiling.NewHandler(logger, false)
// Watch the logging config map and dynamically update logging levels.
cmw.Watch(logging.ConfigMapName(), logging.UpdateLevelFromConfigMap(logger, atomicLevel, component))
// Watch the observability config map and dynamically update metrics exporter.
cmw.Watch(metrics.ConfigMapName(), metrics.UpdateExporterFromConfigMap(component, logger))
// Watch the observability config map
cmw.Watch(metrics.ConfigMapName(),
metrics.UpdateExporterFromConfigMap(component, logger),
profilingHandler.UpdateFromConfigMap)
if err := cmw.Start(ctx.Done()); err != nil {
logger.Fatalw("failed to start configuration manager", zap.Error(err))
}
@ -144,7 +154,22 @@ func MainWithConfig(ctx context.Context, component string, cfg *rest.Config, cto
// Start all of the controllers.
logger.Info("Starting controllers...")
controller.StartAll(ctx.Done(), controllers...)
go controller.StartAll(ctx.Done(), controllers...)
profilingServer := profiling.NewServer(profilingHandler)
eg, egCtx := errgroup.WithContext(ctx)
eg.Go(profilingServer.ListenAndServe)
// This will block until either a signal arrives or one of the grouped functions
// returns an error.
<-egCtx.Done()
profilingServer.Shutdown(context.Background())
// Don't forward ErrServerClosed as that indicates we're already shutting down.
if err := eg.Wait(); err != nil && err != http.ErrServerClosed {
logger.Errorw("Error while running server", zap.Error(err))
}
}
func flush(logger *zap.SugaredLogger) {

114
profiling/server.go Normal file
View File

@ -0,0 +1,114 @@
/*
Copyright 2019 The Knative Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package profiling
import (
"net/http"
"net/http/pprof"
"strconv"
"sync"
perrors "github.com/pkg/errors"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
)
const (
// profilingPort is the port where we expose profiling information if profiling is enabled
profilingPort = ":8008"
// profilingKey is the name of the key in config-observability config map that indicates whether profiling
// is enabled of disabled
profilingKey = "profiling.enable"
)
// Handler holds the main HTTP handler and a flag indicating
// whether the handler is active
type Handler struct {
enabled bool
enabledMux sync.Mutex
handler http.Handler
log *zap.SugaredLogger
}
// NewHandler create a new ProfilingHandler which serves runtime profiling data
// according to the given context path
func NewHandler(logger *zap.SugaredLogger, enableProfiling bool) *Handler {
const pprofPrefix = "/debug/pprof/"
mux := http.NewServeMux()
mux.HandleFunc(pprofPrefix, pprof.Index)
mux.HandleFunc(pprofPrefix+"cmdline", pprof.Cmdline)
mux.HandleFunc(pprofPrefix+"profile", pprof.Profile)
mux.HandleFunc(pprofPrefix+"symbol", pprof.Symbol)
mux.HandleFunc(pprofPrefix+"trace", pprof.Trace)
logger.Infof("Profiling enabled: %t", enableProfiling)
return &Handler{
enabled: enableProfiling,
handler: mux,
log: logger,
}
}
func (h *Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
h.enabledMux.Lock()
defer h.enabledMux.Unlock()
if h.enabled {
h.handler.ServeHTTP(w, r)
} else {
http.NotFoundHandler().ServeHTTP(w, r)
}
}
func readProfilingFlag(configMap *corev1.ConfigMap) (bool, error) {
profiling, ok := configMap.Data[profilingKey]
if !ok {
return false, nil
}
enabled, err := strconv.ParseBool(profiling)
if err != nil {
return false, perrors.Wrapf(err, "failed to parse the profiling flag")
}
return enabled, nil
}
// UpdateFromConfigMap modifies the Enabled flag in the Handler
// according to the value in the given ConfigMap
func (h *Handler) UpdateFromConfigMap(configMap *corev1.ConfigMap) {
enabled, err := readProfilingFlag(configMap)
if err != nil {
h.log.Errorw("Failed to update the profiling flag", zap.Error(err))
return
}
h.enabledMux.Lock()
defer h.enabledMux.Unlock()
if h.enabled != enabled {
h.enabled = enabled
h.log.Infof("Profiling enabled: %t", h.enabled)
}
}
// NewServer creates a new http server that exposes profiling data using the
// HTTP handler that is passed as an argument
func NewServer(handler http.Handler) *http.Server {
return &http.Server{
Addr: profilingPort,
Handler: handler,
}
}

103
profiling/server_test.go Normal file
View File

@ -0,0 +1,103 @@
/*
Copyright 2019 The Knative Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package profiling
import (
"net/http"
"net/http/httptest"
"testing"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"knative.dev/pkg/metrics"
"knative.dev/pkg/system"
_ "knative.dev/pkg/system/testing"
)
func TestUpdateFromConfigMap(t *testing.T) {
observabilityConfigTests := []struct {
name string
wantEnabled bool
wantStatusCode int
config *corev1.ConfigMap
}{{
name: "observability with profiling disabled",
wantEnabled: false,
wantStatusCode: http.StatusNotFound,
config: &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Namespace: system.Namespace(),
Name: metrics.ConfigMapName(),
},
Data: map[string]string{
"profiling.enable": "false",
},
},
}, {
name: "observability config with profiling enabled",
wantEnabled: true,
wantStatusCode: http.StatusOK,
config: &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Namespace: system.Namespace(),
Name: metrics.ConfigMapName(),
},
Data: map[string]string{
"profiling.enable": "true",
},
},
}, {
name: "observability config with unparseable value",
wantEnabled: false,
wantStatusCode: http.StatusNotFound,
config: &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Namespace: system.Namespace(),
Name: metrics.ConfigMapName(),
},
Data: map[string]string{
"profiling.enable": "get me some profiles",
},
},
}}
for _, tt := range observabilityConfigTests {
t.Run(tt.name, func(t *testing.T) {
handler := NewHandler(zap.NewNop().Sugar(), false)
handler.UpdateFromConfigMap(tt.config)
req, err := http.NewRequest(http.MethodGet, "/debug/pprof/", nil)
if err != nil {
t.Fatal("Error creating request:", err)
}
rr := httptest.NewRecorder()
handler.ServeHTTP(rr, req)
if rr.Code != tt.wantStatusCode {
t.Errorf("StatusCode: %v, want: %v", rr.Code, tt.wantStatusCode)
}
if handler.enabled != tt.wantEnabled {
t.Fatalf("Test: %q; want %v, but got %v", tt.name, tt.wantEnabled, handler.enabled)
}
})
}
}