Make updating metrics config and metrics exporter atomic (#1271)

* Make updating metricsConfig and updating metricsExporter atomic

This requires that any work involving a kubeclient be done before
obtaining the metricsMux lock.

* Address PR
This commit is contained in:
Annie Fu 2020-05-01 15:35:42 -07:00 committed by GitHub
parent 2e4e82aa49
commit 099343d49e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 168 additions and 82 deletions

View File

@ -29,6 +29,7 @@ import (
"go.opencensus.io/stats"
"go.uber.org/zap"
corev1 "k8s.io/api/core/v1"
"knative.dev/pkg/metrics/metricskey"
)
@ -88,9 +89,8 @@ type metricsConfig struct {
// writing the metrics to the stats.RecordWithOptions interface.
recorder func(context.Context, []stats.Measurement, ...stats.Options) error
// secretFetcher provides access for fetching Kubernetes Secrets from an
// informer cache.
secretFetcher SecretFetcher
// secret contains credentials for an exporter to use for authentication.
secret *corev1.Secret
// ---- OpenCensus specific below ----
// collectorAddress is the address of the collector, if not `localhost:55678`
@ -162,10 +162,6 @@ func (mc *metricsConfig) record(ctx context.Context, mss []stats.Measurement, ro
func createMetricsConfig(ops ExporterOptions, logger *zap.SugaredLogger) (*metricsConfig, error) {
var mc metricsConfig
// We don't check if this is `nil` right now, because this is a transition step.
// Eventually, this should be a startup check.
mc.secretFetcher = ops.Secrets
if ops.Domain == "" {
return nil, errors.New("metrics domain cannot be empty")
}
@ -205,6 +201,13 @@ func createMetricsConfig(ops ExporterOptions, logger *zap.SugaredLogger) (*metri
if mc.requireSecure, err = strconv.ParseBool(isSecure); err != nil {
return nil, fmt.Errorf("invalid %s value %q", CollectorSecureKey, isSecure)
}
if mc.requireSecure {
mc.secret, err = getOpenCensusSecret(ops.Component, ops.Secrets)
if err != nil {
return nil, err
}
}
}
}
@ -265,6 +268,15 @@ func createMetricsConfig(ops ExporterOptions, logger *zap.SugaredLogger) (*metri
return stats.RecordWithOptions(ctx, append(ros, stats.WithMeasurements(mss...))...)
}
}
if scc.UseSecret {
secret, err := getStackdriverSecret(ops.Secrets)
if err != nil {
return nil, err
}
mc.secret = secret
}
}
// If reporting period is specified, use the value from the configuration.

View File

@ -32,6 +32,7 @@ import (
"knative.dev/pkg/metrics/metricstest"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// TODO UTs should move to eventing and serving, as appropriate.
@ -201,6 +202,12 @@ var (
},
Domain: servingDomain,
Component: testComponent,
Secrets: fakeSecretList(corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: StackdriverSecretNameDefault,
Namespace: StackdriverSecretNamespaceDefault,
},
}).Get,
},
expectedConfig: metricsConfig{
domain: servingDomain,
@ -216,6 +223,12 @@ var (
ClusterName: "cluster",
UseSecret: true,
},
secret: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: StackdriverSecretNameDefault,
Namespace: StackdriverSecretNamespaceDefault,
},
},
},
expectedNewExporter: true,
}, {
@ -253,6 +266,15 @@ var (
},
Domain: servingDomain,
Component: testComponent,
Secrets: fakeSecretList(corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "opencensus",
},
Data: map[string][]byte{
"client-cert.pem": {},
"client-key.pem": {},
},
}).Get,
},
expectedConfig: metricsConfig{
domain: servingDomain,
@ -260,6 +282,15 @@ var (
backendDestination: OpenCensus,
collectorAddress: "external-svc:55678",
requireSecure: true,
secret: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "opencensus",
},
Data: map[string][]byte{
"client-cert.pem": {},
"client-key.pem": {},
},
},
},
expectedNewExporter: true,
}, {
@ -514,6 +545,10 @@ var (
}}
)
// successTestsInit calls SetStackdriverSecretLocation with the default
// Stackdriver secret name and namespace. The tests in this file invoke it
// before iterating over successTests.
func successTestsInit() {
SetStackdriverSecretLocation(StackdriverSecretNameDefault, StackdriverSecretNamespaceDefault)
}
func TestGetMetricsConfig(t *testing.T) {
for _, test := range errorTests {
t.Run(test.name, func(t *testing.T) {
@ -525,6 +560,7 @@ func TestGetMetricsConfig(t *testing.T) {
})
}
successTestsInit()
for _, test := range successTests {
t.Run(test.name, func(t *testing.T) {
defer ClearAll()
@ -558,6 +594,7 @@ func TestGetMetricsConfig_fromEnv(t *testing.T) {
func TestIsNewExporterRequiredFromNilConfig(t *testing.T) {
setCurMetricsConfig(nil)
successTestsInit()
for _, test := range successTests {
t.Run(test.name, func(t *testing.T) {
defer ClearAll()
@ -685,6 +722,7 @@ func TestIsNewExporterRequired(t *testing.T) {
func TestUpdateExporter(t *testing.T) {
setCurMetricsConfig(nil)
oldConfig := getCurMetricsConfig()
successTestsInit()
for _, test := range successTests[1:] {
t.Run(test.name, func(t *testing.T) {
defer ClearAll()
@ -715,6 +753,7 @@ func TestUpdateExporter(t *testing.T) {
func TestUpdateExporterFromConfigMapWithOpts(t *testing.T) {
setCurMetricsConfig(nil)
oldConfig := getCurMetricsConfig()
successTestsInit()
for _, test := range successTests[1:] {
t.Run(test.name, func(t *testing.T) {
defer ClearAll()
@ -722,6 +761,7 @@ func TestUpdateExporterFromConfigMapWithOpts(t *testing.T) {
Component: test.ops.Component,
Domain: test.ops.Domain,
PrometheusPort: test.ops.PrometheusPort,
Secrets: test.ops.Secrets,
}
updateFunc, err := UpdateExporterFromConfigMapWithOpts(opts, TestLogger(t))
if err != nil {

View File

@ -121,6 +121,7 @@ func UpdateExporterFromConfigMapWithOpts(opts ExporterOptions, logger *zap.Sugar
Component: opts.Component,
ConfigMap: configMap.Data,
PrometheusPort: opts.PrometheusPort,
Secrets: opts.Secrets,
}, logger)
}, nil
}
@ -130,6 +131,7 @@ func UpdateExporterFromConfigMapWithOpts(opts ExporterOptions, logger *zap.Sugar
// to prevent a race condition between reading the current configuration
// and updating the current exporter.
func UpdateExporter(ops ExporterOptions, logger *zap.SugaredLogger) error {
// TODO(https://github.com/knative/pkg/issues/1273): check if ops.secrets is `nil` after new metrics plan lands
newConfig, err := createMetricsConfig(ops, logger)
if err != nil {
if getCurMetricsConfig() == nil {
@ -141,28 +143,33 @@ func UpdateExporter(ops ExporterOptions, logger *zap.SugaredLogger) error {
return err
}
// Updating the metrics config and the metrics exporters needs to be atomic to
// avoid using an outdated metrics config with new exporters.
metricsMux.Lock()
defer metricsMux.Unlock()
if isNewExporterRequired(newConfig) {
logger.Info("Flushing the existing exporter before setting up the new exporter.")
FlushExporter()
flushGivenExporter(curMetricsExporter)
e, err := newMetricsExporter(newConfig, logger)
if err != nil {
logger.Errorf("Failed to update a new metrics exporter based on metric config %v. error: %v", newConfig, err)
return err
}
existingConfig := getCurMetricsConfig()
setCurMetricsExporter(e)
existingConfig := curMetricsConfig
curMetricsExporter = e
logger.Infof("Successfully updated the metrics exporter; old config: %v; new config %v", existingConfig, newConfig)
}
setCurMetricsConfig(newConfig)
setCurMetricsConfigUnlocked(newConfig)
return nil
}
// isNewExporterRequired compares the non-nil newConfig against curMetricsConfig. When backend changes,
// or stackdriver project ID changes for stackdriver backend, we need to update the metrics exporter.
// This function is not implicitly thread-safe.
// This function must be called with the metricsMux reader (or writer) locked.
func isNewExporterRequired(newConfig *metricsConfig) bool {
cc := getCurMetricsConfig()
cc := curMetricsConfig
if cc == nil || newConfig.backendDestination != cc.backendDestination {
return true
}
@ -177,15 +184,14 @@ func isNewExporterRequired(newConfig *metricsConfig) bool {
}
// newMetricsExporter gets a metrics exporter based on the config.
// This function is not implicitly thread-safe.
// This function must be called with the metricsMux reader (or writer) locked.
func newMetricsExporter(config *metricsConfig, logger *zap.SugaredLogger) (view.Exporter, error) {
ce := getCurMetricsExporter()
// If there is a Prometheus Exporter server running, stop it.
resetCurPromSrv()
// TODO(https://github.com/knative/pkg/issues/866): Move Stackdriver and Prometheus
// operations before stopping to an interface.
if se, ok := ce.(stoppable); ok {
if se, ok := curMetricsExporter.(stoppable); ok {
se.StopMetricsExporter()
}
@ -230,6 +236,10 @@ func getCurMetricsConfig() *metricsConfig {
// setCurMetricsConfig acquires the metricsMux write lock and then delegates to
// setCurMetricsConfigUnlocked. Code that already holds metricsMux (as
// UpdateExporter does) calls setCurMetricsConfigUnlocked directly instead.
func setCurMetricsConfig(c *metricsConfig) {
metricsMux.Lock()
defer metricsMux.Unlock()
setCurMetricsConfigUnlocked(c)
}
func setCurMetricsConfigUnlocked(c *metricsConfig) {
if c != nil {
view.SetReportingPeriod(c.reportingPeriod)
} else {
@ -244,6 +254,10 @@ func setCurMetricsConfig(c *metricsConfig) {
// Return value indicates whether the exporter is flushable or not.
func FlushExporter() bool {
// Read the current exporter through its getter and hand it to
// flushGivenExporter, which also accepts an explicitly supplied exporter.
e := getCurMetricsExporter()
return flushGivenExporter(e)
}
func flushGivenExporter(e view.Exporter) bool {
if e == nil {
return false
}

View File

@ -24,6 +24,7 @@ import (
"go.opencensus.io/stats/view"
"go.uber.org/zap"
"google.golang.org/grpc/credentials"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
)
@ -33,49 +34,53 @@ func newOpenCensusExporter(config *metricsConfig, logger *zap.SugaredLogger) (vi
opts = append(opts, ocagent.WithAddress(config.collectorAddress))
}
if config.requireSecure {
opts = append(opts, ocagent.WithTLSCredentials(credentialFetcher(config.component, config.secretFetcher, logger)))
opts = append(opts, ocagent.WithTLSCredentials(getCredentials(config.component, config.secret, logger)))
} else {
opts = append(opts, ocagent.WithInsecure())
}
e, err := ocagent.NewExporter(opts...)
if err != nil {
logger.Errorw("Failed to create the OpenCensus exporter.", zap.Error(err))
logger.Errorw("failed to create the OpenCensus exporter.", zap.Error(err))
return nil, err
}
logger.Infof("Created OpenCensus exporter with config: %+v.", *config)
logger.Infof("created OpenCensus exporter with config: %+v.", *config)
view.RegisterExporter(e)
return e, nil
}
// credentialFetcher attempts to locate a secret containing TLS credentials
// getOpenCensusSecret attempts to locate a secret containing TLS credentials
// for communicating with the OpenCensus Agent. To do this, it first looks
// for a secret named "<component>-opencensus", then for a generic
// "opencensus" secret.
func credentialFetcher(component string, lister SecretFetcher, logger *zap.SugaredLogger) credentials.TransportCredentials {
func getOpenCensusSecret(component string, lister SecretFetcher) (*corev1.Secret, error) {
if lister == nil {
logger.Errorf("No secret lister provided for component %q; cannot use requireSecure=true", component)
return nil, fmt.Errorf("no secret lister provided for component %q; cannot use requireSecure=true", component)
}
secret, err := lister(component + "-opencensus")
if errors.IsNotFound(err) {
secret, err = lister("opencensus")
}
if err != nil {
return nil, fmt.Errorf("unable to fetch opencensus secret for %q, cannot use requireSecure=true: %+v", component, err)
}
return secret, nil
}
// getCredentials attempts to create a certificate containing TLS credentials
// for communicating with the OpenCensus Agent.
func getCredentials(component string, secret *corev1.Secret, logger *zap.SugaredLogger) credentials.TransportCredentials {
if secret == nil {
logger.Errorf("no secret provided for component %q; cannot use requireSecure=true", component)
return nil
}
return credentials.NewTLS(&tls.Config{
GetClientCertificate: func(*tls.CertificateRequestInfo) (*tls.Certificate, error) {
// We ignore the CertificateRequestInfo for now, and hand back a single fixed certificate.
// TODO(evankanderson): maybe do something SPIFFE-ier?
cert, err := certificateFetcher(component+"-opencensus", lister)
if errors.IsNotFound(err) {
cert, err = certificateFetcher("opencensus", lister)
}
cert, err := tls.X509KeyPair(secret.Data["client-cert.pem"], secret.Data["client-key.pem"])
if err != nil {
return nil, fmt.Errorf("Unable to fetch opencensus secret for %q, cannot use requireSecure=true: %+v", component, err)
return nil, err
}
return &cert, err
return &cert, nil
},
})
}
func certificateFetcher(secretName string, lister SecretFetcher) (tls.Certificate, error) {
secret, err := lister(secretName)
if err != nil {
return tls.Certificate{}, err
}
return tls.X509KeyPair(secret.Data["client-cert.pem"], secret.Data["client-key.pem"])
}

View File

@ -65,7 +65,7 @@ func TestOpenCensusConfig(t *testing.T) {
domain: "secure",
component: "test",
backendDestination: OpenCensus,
secretFetcher: fakeSecretList(corev1.Secret{
secret: &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: "test-opencensus",
},
@ -73,7 +73,7 @@ func TestOpenCensusConfig(t *testing.T) {
"client-cert.pem": cert,
"client-key.pem": key,
},
}).Get,
},
requireSecure: true,
},
tls: &tls.Config{},
@ -140,6 +140,10 @@ func fakeSecretList(s ...corev1.Secret) *fakeSecrets {
func (f *fakeSecrets) Get(name string) (*corev1.Secret, error) {
for _, s := range f.secrets {
if fmt.Sprintf("%s/%s", s.Namespace, s.Name) == name {
return &s, nil
}
if s.Name == name {
return &s, nil
}

View File

@ -115,7 +115,7 @@ func newOpencensusSDExporter(o stackdriver.Options) (view.Exporter, error) {
func newStackdriverExporter(config *metricsConfig, logger *zap.SugaredLogger) (view.Exporter, error) {
gm := getMergedGCPMetadata(config)
mpf := getMetricPrefixFunc(config.stackdriverMetricTypePrefix, config.stackdriverCustomMetricTypePrefix)
co, err := getStackdriverExporterClientOptions(&config.stackdriverClientConfig)
co, err := getStackdriverExporterClientOptions(config)
if err != nil {
logger.Warnw("Issue configuring Stackdriver exporter client options, no additional client options will be used: ", zap.Error(err))
}
@ -140,21 +140,21 @@ func newStackdriverExporter(config *metricsConfig, logger *zap.SugaredLogger) (v
// getStackdriverExporterClientOptions creates client options for the opencensus Stackdriver exporter from the given stackdriverClientConfig.
// On error, an empty array of client options is returned.
func getStackdriverExporterClientOptions(sdconfig *StackdriverClientConfig) ([]option.ClientOption, error) {
func getStackdriverExporterClientOptions(config *metricsConfig) ([]option.ClientOption, error) {
var co []option.ClientOption
if sdconfig.UseSecret && useStackdriverSecretEnabled {
secret, err := getStackdriverSecret(sdconfig)
if err != nil {
return co, err
// SetStackdriverSecretLocation must have been called by calling package for this to work.
if config.stackdriverClientConfig.UseSecret {
if config.secret == nil {
return co, fmt.Errorf("No secret provided for component %q; cannot use stackdriver-use-secret=true", config.component)
}
if opt, err := convertSecretToExporterOption(secret); err == nil {
if opt, err := convertSecretToExporterOption(config.secret); err == nil {
co = append(co, opt)
} else {
return co, err
}
}
return co, nil
}
@ -215,19 +215,31 @@ func getMetricPrefixFunc(metricTypePrefix, customMetricTypePrefix string) func(n
}
// getStackdriverSecret returns the Kubernetes Secret specified in the given config.
// SetStackdriverSecretLocation must have been called by calling package for this to work.
// TODO(anniefu): Update exporter if Secret changes (https://github.com/knative/pkg/issues/842)
func getStackdriverSecret(sdconfig *StackdriverClientConfig) (*corev1.Secret, error) {
if err := ensureKubeclient(); err != nil {
return nil, err
}
func getStackdriverSecret(secretFetcher SecretFetcher) (*corev1.Secret, error) {
stackdriverMtx.RLock()
defer stackdriverMtx.RUnlock()
sec, secErr := kubeclient.CoreV1().Secrets(secretNamespace).Get(secretName, metav1.GetOptions{})
if !useStackdriverSecretEnabled {
return nil, nil
}
var secErr error
var sec *corev1.Secret
if secretFetcher != nil {
sec, secErr = secretFetcher(fmt.Sprintf("%s/%s", secretNamespace, secretName))
} else {
// This else-block can be removed once UpdateExporterFromConfigMap is fully deprecated in favor of ConfigMapWatcher
if err := ensureKubeclient(); err != nil {
return nil, err
}
sec, secErr = kubeclient.CoreV1().Secrets(secretNamespace).Get(secretName, metav1.GetOptions{})
}
if secErr != nil {
return nil, fmt.Errorf("Error getting Secret [%s] in namespace [%s]: %w", secretName, secretNamespace, secErr)
return nil, fmt.Errorf("error getting Secret [%v] in namespace [%v]: %v", secretName, secretNamespace, secErr)
}
return sec, nil

View File

@ -24,6 +24,8 @@ import (
"contrib.go.opencensus.io/exporter/stackdriver"
"go.opencensus.io/metric/metricdata"
"go.opencensus.io/stats/view"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
. "knative.dev/pkg/logging/testing"
"knative.dev/pkg/metrics/metricskey"
)
@ -471,48 +473,45 @@ func assertStringsEqual(t *testing.T, description string, expected string, actua
}
func TestSetStackdriverSecretLocation(t *testing.T) {
// Prevent pollution from other tests
useStackdriverSecretEnabled = false
// Reset global state after test
defer func() {
secretName = StackdriverSecretNameDefault
secretNamespace = StackdriverSecretNamespaceDefault
useStackdriverSecretEnabled = false
}()
sdConfig := &StackdriverClientConfig{
ProjectID: "project",
GCPLocation: "us-west2",
ClusterName: "cluster",
UseSecret: false,
const testName, testNamespace = "test-name", "test-namespace"
secretFetcher := func(name string) (*corev1.Secret, error) {
return &corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: testName,
Namespace: testNamespace,
},
}, nil
}
// Sanity checks
assertStringsEqual(t, "DefaultSecretName", secretName, StackdriverSecretNameDefault)
assertStringsEqual(t, "DefaultSecretNamespace", secretNamespace, StackdriverSecretNamespaceDefault)
if _, err := getStackdriverExporterClientOptions(sdConfig); err != nil {
t.Errorf("Got unexpected error when creating exporter client options: [%v]", err)
sec, err := getStackdriverSecret(secretFetcher)
if err != nil {
t.Errorf("Got unexpected error when getting secret: %v", err)
}
if sec != nil {
t.Errorf("Stackdriver secret should not be fetched unless SetStackdriverSecretLocation has been called")
}
// Check configuration's UseSecret value is ignored until the consuming package calls SetStackdriverSecretLocation
// If an attempt to read a Secret was made, there should be an error because there's no valid in-cluster kubeclient.
sdConfig.UseSecret = true
if _, e1 := getStackdriverExporterClientOptions(sdConfig); e1 != nil {
t.Errorf("Got unexpected error when creating exporter client options: [%v]", e1)
// Once SetStackdriverSecretLocation has been called, attempts to get the secret should complete.
SetStackdriverSecretLocation(testName, testNamespace)
sec, err = getStackdriverSecret(secretFetcher)
if err != nil {
t.Errorf("Got unexpected error when getting secret: %v", err)
}
testName, testNamespace := "test-name", "test-namespace"
// SetStackdriverSecretLocation has been called & config's UseSecret value is set
// There should be an attempt to read the Secret, and an error because there's no valid in-cluster kubeclient.
SetStackdriverSecretLocation("test-name", "test-namespace")
if _, e1 := getStackdriverExporterClientOptions(sdConfig); e1 == nil {
t.Errorf("Expected a known error when getting exporter options with Secrets enabled (cannot create valid kubeclient in tests). Did the function run as expected?")
if sec == nil {
t.Error("expected secret to be non-nil if there is no error and SetStackdriverSecretLocation has been called")
}
assertStringsEqual(t, "secretName", secretName, testName)
assertStringsEqual(t, "secretNamespace", secretNamespace, testNamespace)
randomName, randomNamespace := "random-name", "random-namespace"
SetStackdriverSecretLocation(randomName, randomNamespace)
if _, e1 := getStackdriverExporterClientOptions(sdConfig); e1 == nil {
t.Errorf("Expected a known error when getting exporter options with Secrets enabled (cannot create valid kubeclient in tests). Did the function run as expected?")
}
assertStringsEqual(t, "secretName", secretName, randomName)
assertStringsEqual(t, "secretNamespace", secretNamespace, randomNamespace)
}