kubelet tracing
Signed-off-by: Sally O'Malley <somalley@redhat.com> Co-authored-by: David Ashpole <dashpole@google.com> Kubernetes-commit: 47e7d8034ff3be8e198dde6a671d05a11c30e333
This commit is contained in:
parent
cbc2a9e12f
commit
4f9e133507
|
|
@ -22,19 +22,17 @@ import (
|
|||
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
|
||||
"k8s.io/component-base/traces"
|
||||
tracing "k8s.io/component-base/tracing"
|
||||
)
|
||||
|
||||
// WithTracing adds tracing to requests if the incoming request is sampled
|
||||
func WithTracing(handler http.Handler, tp *trace.TracerProvider) http.Handler {
|
||||
func WithTracing(handler http.Handler, tp trace.TracerProvider) http.Handler {
|
||||
opts := []otelhttp.Option{
|
||||
otelhttp.WithPropagators(traces.Propagators()),
|
||||
otelhttp.WithPropagators(tracing.Propagators()),
|
||||
otelhttp.WithPublicEndpoint(),
|
||||
otelhttp.WithTracerProvider(tp),
|
||||
}
|
||||
if tp != nil {
|
||||
opts = append(opts, otelhttp.WithTracerProvider(*tp))
|
||||
}
|
||||
// Even if there is no TracerProvider, the otelhttp still handles context propagation.
|
||||
// With Noop TracerProvider, the otelhttp still handles context propagation.
|
||||
// See https://github.com/open-telemetry/opentelemetry-go/tree/main/example/passthrough
|
||||
return otelhttp.NewHandler(handler, "KubernetesAPI", opts...)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -30,7 +30,7 @@ import (
|
|||
|
||||
jsonpatch "github.com/evanphx/json-patch"
|
||||
"github.com/google/uuid"
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
oteltrace "go.opentelemetry.io/otel/trace"
|
||||
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
|
|
@ -139,7 +139,7 @@ type Config struct {
|
|||
ExternalAddress string
|
||||
|
||||
// TracerProvider can provide a tracer, which records spans for distributed tracing.
|
||||
TracerProvider *trace.TracerProvider
|
||||
TracerProvider oteltrace.TracerProvider
|
||||
|
||||
//===========================================================================
|
||||
// Fields you probably don't care about changing
|
||||
|
|
@ -375,6 +375,7 @@ func NewConfig(codecs serializer.CodecFactory) *Config {
|
|||
|
||||
APIServerID: id,
|
||||
StorageVersionManager: storageversion.NewDefaultManager(),
|
||||
TracerProvider: oteltrace.NewNoopTracerProvider(),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -28,7 +28,7 @@ import (
|
|||
clientgoclientset "k8s.io/client-go/kubernetes"
|
||||
"k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/clientcmd"
|
||||
"k8s.io/component-base/traces"
|
||||
tracing "k8s.io/component-base/tracing"
|
||||
)
|
||||
|
||||
// CoreAPIOptions contains options to configure the connection to a core API Kubernetes apiserver.
|
||||
|
|
@ -73,7 +73,7 @@ func (o *CoreAPIOptions) ApplyTo(config *server.RecommendedConfig) error {
|
|||
}
|
||||
}
|
||||
if feature.DefaultFeatureGate.Enabled(features.APIServerTracing) {
|
||||
kubeconfig.Wrap(traces.WrapperFor(config.TracerProvider))
|
||||
kubeconfig.Wrap(tracing.WrapperFor(config.TracerProvider))
|
||||
}
|
||||
clientgoExternalClient, err := clientgoclientset.NewForConfig(kubeconfig)
|
||||
if err != nil {
|
||||
|
|
|
|||
|
|
@ -107,10 +107,8 @@ func (o *RecommendedOptions) ApplyTo(config *server.RecommendedConfig) error {
|
|||
if err := o.EgressSelector.ApplyTo(&config.Config); err != nil {
|
||||
return err
|
||||
}
|
||||
if feature.DefaultFeatureGate.Enabled(features.APIServerTracing) {
|
||||
if err := o.Traces.ApplyTo(config.Config.EgressSelector, &config.Config); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := o.Traces.ApplyTo(config.Config.EgressSelector, &config.Config); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := o.SecureServing.ApplyTo(&config.Config.SecureServing, &config.Config.LoopbackClientConfig); err != nil {
|
||||
return err
|
||||
|
|
|
|||
|
|
@ -19,24 +19,39 @@ package options
|
|||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
|
||||
"github.com/spf13/pflag"
|
||||
"go.opentelemetry.io/otel/exporters/otlp/otlpgrpc"
|
||||
"go.opentelemetry.io/otel/sdk/resource"
|
||||
sdktrace "go.opentelemetry.io/otel/sdk/trace"
|
||||
"go.opentelemetry.io/otel/semconv"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/serializer"
|
||||
"k8s.io/apiserver/pkg/apis/apiserver"
|
||||
"k8s.io/apiserver/pkg/apis/apiserver/install"
|
||||
"k8s.io/apiserver/pkg/features"
|
||||
"k8s.io/apiserver/pkg/server"
|
||||
"k8s.io/apiserver/pkg/server/egressselector"
|
||||
"k8s.io/apiserver/pkg/tracing"
|
||||
"k8s.io/component-base/traces"
|
||||
"k8s.io/apiserver/pkg/util/feature"
|
||||
tracing "k8s.io/component-base/tracing"
|
||||
tracingapi "k8s.io/component-base/tracing/api/v1"
|
||||
"k8s.io/utils/path"
|
||||
)
|
||||
|
||||
const apiserverService = "apiserver"
|
||||
|
||||
var (
|
||||
cfgScheme = runtime.NewScheme()
|
||||
codecs = serializer.NewCodecFactory(cfgScheme)
|
||||
)
|
||||
|
||||
func init() {
|
||||
install.Install(cfgScheme)
|
||||
}
|
||||
|
||||
// TracingOptions contain configuration options for tracing
|
||||
// exporters
|
||||
type TracingOptions struct {
|
||||
|
|
@ -64,21 +79,21 @@ func (o *TracingOptions) ApplyTo(es *egressselector.EgressSelector, c *server.Co
|
|||
if o == nil || o.ConfigFile == "" {
|
||||
return nil
|
||||
}
|
||||
if !feature.DefaultFeatureGate.Enabled(features.APIServerTracing) {
|
||||
return fmt.Errorf("APIServerTracing feature is not enabled, but tracing config file was provided")
|
||||
}
|
||||
|
||||
npConfig, err := tracing.ReadTracingConfiguration(o.ConfigFile)
|
||||
traceConfig, err := ReadTracingConfiguration(o.ConfigFile)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to read tracing config: %v", err)
|
||||
}
|
||||
|
||||
errs := tracing.ValidateTracingConfiguration(npConfig)
|
||||
errs := tracingapi.ValidateTracingConfiguration(traceConfig, feature.DefaultFeatureGate, nil)
|
||||
if len(errs) > 0 {
|
||||
return fmt.Errorf("failed to validate tracing configuration: %v", errs.ToAggregate())
|
||||
}
|
||||
|
||||
opts := []otlpgrpc.Option{}
|
||||
if npConfig.Endpoint != nil {
|
||||
opts = append(opts, otlpgrpc.WithEndpoint(*npConfig.Endpoint))
|
||||
}
|
||||
if es != nil {
|
||||
// Only use the egressselector dialer if egressselector is enabled.
|
||||
// Endpoint is on the "ControlPlane" network
|
||||
|
|
@ -93,21 +108,19 @@ func (o *TracingOptions) ApplyTo(es *egressselector.EgressSelector, c *server.Co
|
|||
opts = append(opts, otlpgrpc.WithDialOption(grpc.WithContextDialer(otelDialer)))
|
||||
}
|
||||
|
||||
sampler := sdktrace.NeverSample()
|
||||
if npConfig.SamplingRatePerMillion != nil && *npConfig.SamplingRatePerMillion > 0 {
|
||||
sampler = sdktrace.TraceIDRatioBased(float64(*npConfig.SamplingRatePerMillion) / float64(1000000))
|
||||
}
|
||||
|
||||
resourceOpts := []resource.Option{
|
||||
resource.WithAttributes(
|
||||
semconv.ServiceNameKey.String(apiserverService),
|
||||
semconv.ServiceInstanceIDKey.String(c.APIServerID),
|
||||
),
|
||||
}
|
||||
tp := traces.NewProvider(context.Background(), sampler, resourceOpts, opts...)
|
||||
c.TracerProvider = &tp
|
||||
tp, err := tracing.NewProvider(context.Background(), traceConfig, opts, resourceOpts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.TracerProvider = tp
|
||||
if c.LoopbackClientConfig != nil {
|
||||
c.LoopbackClientConfig.Wrap(traces.WrapperFor(c.TracerProvider))
|
||||
c.LoopbackClientConfig.Wrap(tracing.WrapperFor(c.TracerProvider))
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
@ -125,3 +138,24 @@ func (o *TracingOptions) Validate() (errs []error) {
|
|||
}
|
||||
return
|
||||
}
|
||||
|
||||
// ReadTracingConfiguration reads the tracing configuration from a file
|
||||
func ReadTracingConfiguration(configFilePath string) (*tracingapi.TracingConfiguration, error) {
|
||||
if configFilePath == "" {
|
||||
return nil, fmt.Errorf("tracing config file was empty")
|
||||
}
|
||||
data, err := ioutil.ReadFile(configFilePath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to read tracing configuration from %q: %v", configFilePath, err)
|
||||
}
|
||||
internalConfig := &apiserver.TracingConfiguration{}
|
||||
// this handles json/yaml/whatever, and decodes all registered version to the internal version
|
||||
if err := runtime.DecodeInto(codecs.UniversalDecoder(), data, internalConfig); err != nil {
|
||||
return nil, fmt.Errorf("unable to decode tracing configuration data: %v", err)
|
||||
}
|
||||
tc := &tracingapi.TracingConfiguration{
|
||||
Endpoint: internalConfig.Endpoint,
|
||||
SamplingRatePerMillion: internalConfig.SamplingRatePerMillion,
|
||||
}
|
||||
return tc, nil
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,7 +16,26 @@ limitations under the License.
|
|||
|
||||
package options
|
||||
|
||||
import "testing"
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
tracingapi "k8s.io/component-base/tracing/api/v1"
|
||||
)
|
||||
|
||||
var (
|
||||
localhost = "localhost:4317"
|
||||
ipAddress = "127.0.0.1:4317"
|
||||
samplingRate = int32(12345)
|
||||
)
|
||||
|
||||
func strptr(s string) *string {
|
||||
return &s
|
||||
}
|
||||
|
||||
func TestValidateTracingOptions(t *testing.T) {
|
||||
testcases := []struct {
|
||||
|
|
@ -56,3 +75,110 @@ func TestValidateTracingOptions(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestReadTracingConfiguration(t *testing.T) {
|
||||
testcases := []struct {
|
||||
name string
|
||||
contents string
|
||||
createFile bool
|
||||
expectedResult *tracingapi.TracingConfiguration
|
||||
expectedError *string
|
||||
}{
|
||||
{
|
||||
name: "empty",
|
||||
createFile: true,
|
||||
contents: ``,
|
||||
expectedResult: &tracingapi.TracingConfiguration{},
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "absent",
|
||||
createFile: false,
|
||||
contents: ``,
|
||||
expectedResult: nil,
|
||||
expectedError: strptr("unable to read tracing configuration from \"test-tracing-config-absent\": open test-tracing-config-absent: no such file or directory"),
|
||||
},
|
||||
{
|
||||
name: "v1alpha1",
|
||||
createFile: true,
|
||||
contents: `
|
||||
apiVersion: apiserver.config.k8s.io/v1alpha1
|
||||
kind: TracingConfiguration
|
||||
endpoint: localhost:4317
|
||||
samplingRatePerMillion: 12345
|
||||
`,
|
||||
expectedResult: &tracingapi.TracingConfiguration{
|
||||
Endpoint: &localhost,
|
||||
SamplingRatePerMillion: &samplingRate,
|
||||
},
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "ip address",
|
||||
createFile: true,
|
||||
contents: `
|
||||
apiVersion: apiserver.config.k8s.io/v1alpha1
|
||||
kind: TracingConfiguration
|
||||
endpoint: 127.0.0.1:4317
|
||||
`,
|
||||
expectedResult: &tracingapi.TracingConfiguration{
|
||||
Endpoint: &ipAddress,
|
||||
},
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "wrong_type",
|
||||
createFile: true,
|
||||
contents: `
|
||||
apiVersion: apps/v1
|
||||
kind: DaemonSet
|
||||
metadata:
|
||||
name: agent
|
||||
spec:
|
||||
selector:
|
||||
matchLabels:
|
||||
k8s-app: agent
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
k8s-app: agent
|
||||
spec:
|
||||
containers:
|
||||
- image: k8s.gcr.io/busybox
|
||||
name: agent
|
||||
`,
|
||||
expectedResult: nil,
|
||||
expectedError: strptr("unable to decode tracing configuration data: no kind \"DaemonSet\" is registered for version \"apps/v1\" in scheme"),
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testcases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
proxyConfig := fmt.Sprintf("test-tracing-config-%s", tc.name)
|
||||
if tc.createFile {
|
||||
f, err := ioutil.TempFile("", proxyConfig)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer os.Remove(f.Name())
|
||||
if err := ioutil.WriteFile(f.Name(), []byte(tc.contents), os.FileMode(0755)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
proxyConfig = f.Name()
|
||||
}
|
||||
config, err := ReadTracingConfiguration(proxyConfig)
|
||||
if err == nil && tc.expectedError != nil {
|
||||
t.Errorf("calling ReadTracingConfiguration expected error: %s, did not get it", *tc.expectedError)
|
||||
}
|
||||
if err != nil && tc.expectedError == nil {
|
||||
t.Errorf("unexpected error calling ReadTracingConfiguration got: %#v", err)
|
||||
}
|
||||
if err != nil && tc.expectedError != nil && !strings.HasPrefix(err.Error(), *tc.expectedError) {
|
||||
t.Errorf("calling ReadTracingConfiguration expected error: %s, got %#v", *tc.expectedError, err)
|
||||
}
|
||||
if !reflect.DeepEqual(config, tc.expectedResult) {
|
||||
t.Errorf("problem with configuration returned from ReadTracingConfiguration expected: %#v, got: %#v", tc.expectedResult, config)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,7 +19,8 @@ package storagebackend
|
|||
import (
|
||||
"time"
|
||||
|
||||
"go.opentelemetry.io/otel/trace"
|
||||
oteltrace "go.opentelemetry.io/otel/trace"
|
||||
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apiserver/pkg/server/egressselector"
|
||||
|
|
@ -50,7 +51,7 @@ type TransportConfig struct {
|
|||
// function to determine the egress dialer. (i.e. konnectivity server dialer)
|
||||
EgressLookup egressselector.Lookup
|
||||
// The TracerProvider can add tracing the connection
|
||||
TracerProvider *trace.TracerProvider
|
||||
TracerProvider oteltrace.TracerProvider
|
||||
}
|
||||
|
||||
// Config is configuration for creating a storage backend.
|
||||
|
|
@ -122,5 +123,6 @@ func NewDefaultConfig(prefix string, codec runtime.Codec) *Config {
|
|||
HealthcheckTimeout: DefaultHealthcheckTimeout,
|
||||
ReadycheckTimeout: DefaultReadinessTimeout,
|
||||
LeaseManagerConfig: etcd3.NewDefaultLeaseManagerConfig(),
|
||||
Transport: TransportConfig{TracerProvider: oteltrace.NewNoopTracerProvider()},
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -50,7 +50,7 @@ import (
|
|||
"k8s.io/apiserver/pkg/storage/value"
|
||||
utilfeature "k8s.io/apiserver/pkg/util/feature"
|
||||
"k8s.io/component-base/metrics/legacyregistry"
|
||||
"k8s.io/component-base/traces"
|
||||
tracing "k8s.io/component-base/tracing"
|
||||
"k8s.io/klog/v2"
|
||||
)
|
||||
|
||||
|
|
@ -226,12 +226,10 @@ var newETCD3Client = func(c storagebackend.TransportConfig) (*clientv3.Client, e
|
|||
}
|
||||
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {
|
||||
tracingOpts := []otelgrpc.Option{
|
||||
otelgrpc.WithPropagators(traces.Propagators()),
|
||||
otelgrpc.WithPropagators(tracing.Propagators()),
|
||||
otelgrpc.WithTracerProvider(c.TracerProvider),
|
||||
}
|
||||
if c.TracerProvider != nil {
|
||||
tracingOpts = append(tracingOpts, otelgrpc.WithTracerProvider(*c.TracerProvider))
|
||||
}
|
||||
// Even if there is no TracerProvider, the otelgrpc still handles context propagation.
|
||||
// Even with Noop TracerProvider, the otelgrpc still handles context propagation.
|
||||
// See https://github.com/open-telemetry/opentelemetry-go/tree/main/example/passthrough
|
||||
dialOptions = append(dialOptions,
|
||||
grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor(tracingOpts...)),
|
||||
|
|
|
|||
|
|
@ -25,6 +25,7 @@ import (
|
|||
"testing"
|
||||
|
||||
"go.etcd.io/etcd/client/pkg/v3/transport"
|
||||
oteltrace "go.opentelemetry.io/otel/trace"
|
||||
|
||||
apitesting "k8s.io/apimachinery/pkg/api/apitesting"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
|
|
@ -72,10 +73,11 @@ func TestTLSConnection(t *testing.T) {
|
|||
cfg := storagebackend.Config{
|
||||
Type: storagebackend.StorageTypeETCD3,
|
||||
Transport: storagebackend.TransportConfig{
|
||||
ServerList: client.Endpoints(),
|
||||
CertFile: certFile,
|
||||
KeyFile: keyFile,
|
||||
TrustedCAFile: caFile,
|
||||
ServerList: client.Endpoints(),
|
||||
CertFile: certFile,
|
||||
KeyFile: keyFile,
|
||||
TrustedCAFile: caFile,
|
||||
TracerProvider: oteltrace.NewNoopTracerProvider(),
|
||||
},
|
||||
Codec: codec,
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,119 +0,0 @@
|
|||
/*
|
||||
Copyright 2021 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package tracing
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net/url"
|
||||
"strings"
|
||||
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/serializer"
|
||||
"k8s.io/apimachinery/pkg/util/validation/field"
|
||||
"k8s.io/apiserver/pkg/apis/apiserver"
|
||||
"k8s.io/apiserver/pkg/apis/apiserver/install"
|
||||
)
|
||||
|
||||
const (
|
||||
maxSamplingRatePerMillion = 1000000
|
||||
)
|
||||
|
||||
var (
|
||||
cfgScheme = runtime.NewScheme()
|
||||
codecs = serializer.NewCodecFactory(cfgScheme)
|
||||
)
|
||||
|
||||
func init() {
|
||||
install.Install(cfgScheme)
|
||||
}
|
||||
|
||||
// ReadTracingConfiguration reads the tracing configuration from a file
|
||||
func ReadTracingConfiguration(configFilePath string) (*apiserver.TracingConfiguration, error) {
|
||||
if configFilePath == "" {
|
||||
return nil, fmt.Errorf("tracing config file was empty")
|
||||
}
|
||||
data, err := ioutil.ReadFile(configFilePath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("unable to read tracing configuration from %q: %v", configFilePath, err)
|
||||
}
|
||||
internalConfig := &apiserver.TracingConfiguration{}
|
||||
// this handles json/yaml/whatever, and decodes all registered version to the internal version
|
||||
if err := runtime.DecodeInto(codecs.UniversalDecoder(), data, internalConfig); err != nil {
|
||||
return nil, fmt.Errorf("unable to decode tracing configuration data: %v", err)
|
||||
}
|
||||
return internalConfig, nil
|
||||
}
|
||||
|
||||
// ValidateTracingConfiguration validates the tracing configuration
|
||||
func ValidateTracingConfiguration(config *apiserver.TracingConfiguration) field.ErrorList {
|
||||
allErrs := field.ErrorList{}
|
||||
if config == nil {
|
||||
// Tracing is disabled
|
||||
return allErrs
|
||||
}
|
||||
if config.SamplingRatePerMillion != nil {
|
||||
allErrs = append(allErrs, validateSamplingRate(*config.SamplingRatePerMillion, field.NewPath("samplingRatePerMillion"))...)
|
||||
}
|
||||
if config.Endpoint != nil {
|
||||
allErrs = append(allErrs, validateEndpoint(*config.Endpoint, field.NewPath("endpoint"))...)
|
||||
}
|
||||
return allErrs
|
||||
}
|
||||
|
||||
func validateSamplingRate(rate int32, fldPath *field.Path) field.ErrorList {
|
||||
errs := field.ErrorList{}
|
||||
if rate < 0 {
|
||||
errs = append(errs, field.Invalid(
|
||||
fldPath, rate,
|
||||
"sampling rate must be positive",
|
||||
))
|
||||
}
|
||||
if rate > maxSamplingRatePerMillion {
|
||||
errs = append(errs, field.Invalid(
|
||||
fldPath, rate,
|
||||
"sampling rate per million must be less than or equal to one million",
|
||||
))
|
||||
}
|
||||
return errs
|
||||
}
|
||||
|
||||
func validateEndpoint(endpoint string, fldPath *field.Path) field.ErrorList {
|
||||
errs := field.ErrorList{}
|
||||
if !strings.Contains(endpoint, "//") {
|
||||
endpoint = "dns://" + endpoint
|
||||
}
|
||||
url, err := url.Parse(endpoint)
|
||||
if err != nil {
|
||||
errs = append(errs, field.Invalid(
|
||||
fldPath, endpoint,
|
||||
err.Error(),
|
||||
))
|
||||
return errs
|
||||
}
|
||||
switch url.Scheme {
|
||||
case "dns":
|
||||
case "unix":
|
||||
case "unix-abstract":
|
||||
default:
|
||||
errs = append(errs, field.Invalid(
|
||||
fldPath, endpoint,
|
||||
fmt.Sprintf("unsupported scheme: %v. Options are none, dns, unix, or unix-abstract. See https://github.com/grpc/grpc/blob/master/doc/naming.md", url.Scheme),
|
||||
))
|
||||
}
|
||||
return errs
|
||||
}
|
||||
|
|
@ -1,270 +0,0 @@
|
|||
/*
|
||||
Copyright 2021 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package tracing
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apiserver/pkg/apis/apiserver"
|
||||
)
|
||||
|
||||
var (
|
||||
localhost = "localhost:4317"
|
||||
ipAddress = "127.0.0.1:4317"
|
||||
samplingRate = int32(12345)
|
||||
)
|
||||
|
||||
func strptr(s string) *string {
|
||||
return &s
|
||||
}
|
||||
|
||||
func TestReadTracingConfiguration(t *testing.T) {
|
||||
testcases := []struct {
|
||||
name string
|
||||
contents string
|
||||
createFile bool
|
||||
expectedResult *apiserver.TracingConfiguration
|
||||
expectedError *string
|
||||
}{
|
||||
{
|
||||
name: "empty",
|
||||
createFile: true,
|
||||
contents: ``,
|
||||
expectedResult: &apiserver.TracingConfiguration{},
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "absent",
|
||||
createFile: false,
|
||||
contents: ``,
|
||||
expectedResult: nil,
|
||||
expectedError: strptr("unable to read tracing configuration from \"test-tracing-config-absent\": open test-tracing-config-absent: no such file or directory"),
|
||||
},
|
||||
{
|
||||
name: "v1alpha1",
|
||||
createFile: true,
|
||||
contents: `
|
||||
apiVersion: apiserver.config.k8s.io/v1alpha1
|
||||
kind: TracingConfiguration
|
||||
endpoint: localhost:4317
|
||||
samplingRatePerMillion: 12345
|
||||
`,
|
||||
expectedResult: &apiserver.TracingConfiguration{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
Kind: "",
|
||||
APIVersion: "",
|
||||
},
|
||||
Endpoint: &localhost,
|
||||
SamplingRatePerMillion: &samplingRate,
|
||||
},
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "ip address",
|
||||
createFile: true,
|
||||
contents: `
|
||||
apiVersion: apiserver.config.k8s.io/v1alpha1
|
||||
kind: TracingConfiguration
|
||||
endpoint: 127.0.0.1:4317
|
||||
`,
|
||||
expectedResult: &apiserver.TracingConfiguration{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
Kind: "",
|
||||
APIVersion: "",
|
||||
},
|
||||
Endpoint: &ipAddress,
|
||||
},
|
||||
expectedError: nil,
|
||||
},
|
||||
{
|
||||
name: "wrong_type",
|
||||
createFile: true,
|
||||
contents: `
|
||||
apiVersion: apps/v1
|
||||
kind: DaemonSet
|
||||
metadata:
|
||||
name: agent
|
||||
spec:
|
||||
selector:
|
||||
matchLabels:
|
||||
k8s-app: agent
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
k8s-app: agent
|
||||
spec:
|
||||
containers:
|
||||
- image: registry.k8s.io/busybox
|
||||
name: agent
|
||||
`,
|
||||
expectedResult: nil,
|
||||
expectedError: strptr("unable to decode tracing configuration data: no kind \"DaemonSet\" is registered for version \"apps/v1\" in scheme"),
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testcases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
proxyConfig := fmt.Sprintf("test-tracing-config-%s", tc.name)
|
||||
if tc.createFile {
|
||||
f, err := ioutil.TempFile("", proxyConfig)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer os.Remove(f.Name())
|
||||
if err := ioutil.WriteFile(f.Name(), []byte(tc.contents), os.FileMode(0755)); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
proxyConfig = f.Name()
|
||||
}
|
||||
config, err := ReadTracingConfiguration(proxyConfig)
|
||||
if err == nil && tc.expectedError != nil {
|
||||
t.Errorf("calling ReadTracingConfiguration expected error: %s, did not get it", *tc.expectedError)
|
||||
}
|
||||
if err != nil && tc.expectedError == nil {
|
||||
t.Errorf("unexpected error calling ReadTracingConfiguration got: %#v", err)
|
||||
}
|
||||
if err != nil && tc.expectedError != nil && !strings.HasPrefix(err.Error(), *tc.expectedError) {
|
||||
t.Errorf("calling ReadTracingConfiguration expected error: %s, got %#v", *tc.expectedError, err)
|
||||
}
|
||||
if !reflect.DeepEqual(config, tc.expectedResult) {
|
||||
t.Errorf("problem with configuration returned from ReadTracingConfiguration expected: %#v, got: %#v", tc.expectedResult, config)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestValidateTracingConfiguration(t *testing.T) {
|
||||
samplingRate := int32(12378)
|
||||
negativeRate := int32(-1)
|
||||
tooHighRate := int32(1000001)
|
||||
validEndpoint := "localhost:4317"
|
||||
dnsEndpoint := "dns://google.com:4317"
|
||||
unixEndpoint := "unix://path/to/socket"
|
||||
invalidURL := "dn%2s://localhost:4317"
|
||||
httpEndpoint := "http://localhost:4317"
|
||||
testcases := []struct {
|
||||
name string
|
||||
expectError bool
|
||||
contents *apiserver.TracingConfiguration
|
||||
}{
|
||||
{
|
||||
name: "sampling-rate-valid",
|
||||
expectError: false,
|
||||
contents: &apiserver.TracingConfiguration{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
Kind: "",
|
||||
APIVersion: "",
|
||||
},
|
||||
SamplingRatePerMillion: &samplingRate,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "sampling-rate-negative",
|
||||
expectError: true,
|
||||
contents: &apiserver.TracingConfiguration{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
Kind: "",
|
||||
APIVersion: "",
|
||||
},
|
||||
SamplingRatePerMillion: &negativeRate,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "sampling-rate-negative",
|
||||
expectError: true,
|
||||
contents: &apiserver.TracingConfiguration{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
Kind: "",
|
||||
APIVersion: "",
|
||||
},
|
||||
SamplingRatePerMillion: &tooHighRate,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "default Endpoint",
|
||||
expectError: false,
|
||||
contents: &apiserver.TracingConfiguration{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
Kind: "",
|
||||
APIVersion: "",
|
||||
},
|
||||
Endpoint: &validEndpoint,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "dns Endpoint",
|
||||
expectError: false,
|
||||
contents: &apiserver.TracingConfiguration{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
Kind: "",
|
||||
APIVersion: "",
|
||||
},
|
||||
Endpoint: &dnsEndpoint,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "unix Endpoint",
|
||||
expectError: false,
|
||||
contents: &apiserver.TracingConfiguration{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
Kind: "",
|
||||
APIVersion: "",
|
||||
},
|
||||
Endpoint: &unixEndpoint,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "invalid Endpoint",
|
||||
expectError: true,
|
||||
contents: &apiserver.TracingConfiguration{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
Kind: "",
|
||||
APIVersion: "",
|
||||
},
|
||||
Endpoint: &httpEndpoint,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "invalid url",
|
||||
expectError: true,
|
||||
contents: &apiserver.TracingConfiguration{
|
||||
TypeMeta: metav1.TypeMeta{
|
||||
Kind: "",
|
||||
APIVersion: "",
|
||||
},
|
||||
Endpoint: &invalidURL,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testcases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
errs := ValidateTracingConfiguration(tc.contents)
|
||||
if tc.expectError == false && len(errs) != 0 {
|
||||
t.Errorf("Calling ValidateTracingConfiguration expected no error, got %v", errs)
|
||||
} else if tc.expectError == true && len(errs) == 0 {
|
||||
t.Errorf("Calling ValidateTracingConfiguration expected error, got no error")
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
@ -35,7 +35,7 @@ import (
|
|||
"k8s.io/client-go/rest"
|
||||
"k8s.io/client-go/tools/clientcmd"
|
||||
clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
|
||||
"k8s.io/component-base/traces"
|
||||
tracing "k8s.io/component-base/tracing"
|
||||
)
|
||||
|
||||
// AuthenticationInfoResolverWrapper can be used to inject Dial function to the
|
||||
|
|
@ -47,7 +47,7 @@ func NewDefaultAuthenticationInfoResolverWrapper(
|
|||
proxyTransport *http.Transport,
|
||||
egressSelector *egressselector.EgressSelector,
|
||||
kubeapiserverClientConfig *rest.Config,
|
||||
tp *trace.TracerProvider) AuthenticationInfoResolverWrapper {
|
||||
tp trace.TracerProvider) AuthenticationInfoResolverWrapper {
|
||||
|
||||
webhookAuthResolverWrapper := func(delegate AuthenticationInfoResolver) AuthenticationInfoResolver {
|
||||
return &AuthenticationInfoResolverDelegator{
|
||||
|
|
@ -60,7 +60,7 @@ func NewDefaultAuthenticationInfoResolverWrapper(
|
|||
return nil, err
|
||||
}
|
||||
if feature.DefaultFeatureGate.Enabled(features.APIServerTracing) {
|
||||
ret.Wrap(traces.WrapperFor(tp))
|
||||
ret.Wrap(tracing.WrapperFor(tp))
|
||||
}
|
||||
|
||||
if egressSelector != nil {
|
||||
|
|
@ -85,7 +85,7 @@ func NewDefaultAuthenticationInfoResolverWrapper(
|
|||
return nil, err
|
||||
}
|
||||
if feature.DefaultFeatureGate.Enabled(features.APIServerTracing) {
|
||||
ret.Wrap(traces.WrapperFor(tp))
|
||||
ret.Wrap(tracing.WrapperFor(tp))
|
||||
}
|
||||
|
||||
if egressSelector != nil {
|
||||
|
|
|
|||
Loading…
Reference in New Issue