Add distributed tracing to the apiserver using OpenTelemetry

Kubernetes-commit: 79550ed40c67a70534c1cb697e1fb7e7dbf96335
This commit is contained in:
David Ashpole 2021-06-25 05:20:16 -07:00 committed by Kubernetes Publisher
parent b038e932b2
commit 5927da0040
18 changed files with 785 additions and 16 deletions

22
go.mod
View File

@ -29,6 +29,11 @@ require (
go.etcd.io/etcd/client/pkg/v3 v3.5.0
go.etcd.io/etcd/client/v3 v3.5.0
go.etcd.io/etcd/server/v3 v3.5.0
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.20.0
go.opentelemetry.io/otel v0.20.0
go.opentelemetry.io/otel/exporters/otlp v0.20.0
go.opentelemetry.io/otel/sdk v0.20.0
go.opentelemetry.io/otel/trace v0.20.0
go.uber.org/zap v1.17.0
golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4
@ -37,10 +42,10 @@ require (
google.golang.org/grpc v1.38.0
gopkg.in/natefinch/lumberjack.v2 v2.0.0
gopkg.in/square/go-jose.v2 v2.2.2
k8s.io/api v0.0.0-20210625074319-07d5b5c5af44
k8s.io/apimachinery v0.0.0-20210624234137-a4b8a5f324ca
k8s.io/client-go v0.0.0-20210624234554-dae61bee3a0a
k8s.io/component-base v0.0.0-20210624195110-ded203472448
k8s.io/api v0.0.0
k8s.io/apimachinery v0.0.0
k8s.io/client-go v0.0.0
k8s.io/component-base v0.0.0
k8s.io/klog/v2 v2.9.0
k8s.io/kube-openapi v0.0.0-20210421082810-95288971da7e
k8s.io/utils v0.0.0-20210521133846-da695404a2bc
@ -50,8 +55,9 @@ require (
)
replace (
k8s.io/api => k8s.io/api v0.0.0-20210625074319-07d5b5c5af44
k8s.io/apimachinery => k8s.io/apimachinery v0.0.0-20210624234137-a4b8a5f324ca
k8s.io/client-go => k8s.io/client-go v0.0.0-20210624234554-dae61bee3a0a
k8s.io/component-base => k8s.io/component-base v0.0.0-20210624195110-ded203472448
k8s.io/api => ../api
k8s.io/apimachinery => ../apimachinery
k8s.io/apiserver => ../apiserver
k8s.io/client-go => ../client-go
k8s.io/component-base => ../component-base
)

12
go.sum
View File

@ -118,6 +118,8 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7
github.com/evanphx/json-patch v4.11.0+incompatible h1:glyUF9yIYtMHzn8xaKw5rMhdWcwsYV8dZHIq5567/xs=
github.com/evanphx/json-patch v4.11.0+incompatible/go.mod h1:50XU6AFN0ol/bzJsmQLiYLvXMP4fmwYFNcr97nuDLSk=
github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4=
github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8SPQ=
github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
github.com/form3tech-oss/jwt-go v3.2.3+incompatible h1:7ZaBxOI7TMoYBfyA3cQHErNNyAWIKUMIwqxEtgHOs5c=
github.com/form3tech-oss/jwt-go v3.2.3+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
@ -439,6 +441,8 @@ go.opentelemetry.io/contrib v0.20.0 h1:ubFQUn0VCZ0gPwIoJfBJVpeBlyRMxu8Mm/huKWYd9
go.opentelemetry.io/contrib v0.20.0/go.mod h1:G/EtFaa6qaN7+LxqfIAT3GiZa7Wv5DTBUzl5H4LY0Kc=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.20.0 h1:sO4WKdPAudZGKPcpZT4MJn6JaDmpyLrMPDGGyA1SttE=
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.20.0/go.mod h1:oVGt1LRbBOBq1A5BQLlUg9UaU/54aiHw8cgjV3aWZ/E=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.20.0 h1:Q3C9yzW6I9jqEc8sawxzxZmY48fs9u220KXq6d5s3XU=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.20.0/go.mod h1:2AboqHi0CiIZU0qwhtUfCYD1GeUzvvIXWNkhDt7ZMG4=
go.opentelemetry.io/otel v0.20.0 h1:eaP0Fqu7SXHwvjiqDq83zImeehOHX8doTvU9AwXON8g=
go.opentelemetry.io/otel v0.20.0/go.mod h1:Y3ugLH2oa81t5QO+Lty+zXf8zC9L26ax4Nzoxm/dooo=
go.opentelemetry.io/otel/exporters/otlp v0.20.0 h1:PTNgq9MRmQqqJY0REVbZFvwkYOA85vbdQU/nVfxDyqg=
@ -773,14 +777,6 @@ honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
k8s.io/api v0.0.0-20210625074319-07d5b5c5af44 h1:jhN3dlIeA5dkEyCobKpA45wLdCnVxIl5o6F/2yIvl6A=
k8s.io/api v0.0.0-20210625074319-07d5b5c5af44/go.mod h1:iTIQV8axBOG5ekbRIjyIuBa1YF0kPgqd/8e1rXhfOl4=
k8s.io/apimachinery v0.0.0-20210624234137-a4b8a5f324ca h1:Rhwv9/llB72sYX254T1Be2ak+CMVvZellTv/HZxAXto=
k8s.io/apimachinery v0.0.0-20210624234137-a4b8a5f324ca/go.mod h1:KrFIbDnxb/YcngqIe3ro6X24QG9xTL5O3a3cX7+XaWg=
k8s.io/client-go v0.0.0-20210624234554-dae61bee3a0a h1:9WmyKknYdSwWBZT8zU0xB6RnqOGhyls17q8XpzGFUUM=
k8s.io/client-go v0.0.0-20210624234554-dae61bee3a0a/go.mod h1:0djEsx2KIckVFJ3wYx1TKAP9lnIwUqRWAAMkWQO2a5U=
k8s.io/component-base v0.0.0-20210624195110-ded203472448 h1:XaojkqZ2w/oWt9CUr/Pef/y9Onrvx1nporWrXiQQ/ls=
k8s.io/component-base v0.0.0-20210624195110-ded203472448/go.mod h1:1WLJGH9zBhgRf3g079FRHsnyAfZ6Y/Y/D7sopwI2yEQ=
k8s.io/gengo v0.0.0-20200413195148-3a45101e95ac/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0=
k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=
k8s.io/klog/v2 v2.9.0 h1:D7HV+n1V57XeZ0m6tdRkfknthUaM06VFbWldOFh8kzM=

View File

@ -44,6 +44,7 @@ func addKnownTypes(scheme *runtime.Scheme) error {
scheme.AddKnownTypes(SchemeGroupVersion,
&AdmissionConfiguration{},
&EgressSelectorConfiguration{},
&TracingConfiguration{},
)
return nil
}

View File

@ -147,3 +147,23 @@ type TLSConfig struct {
// +optional
ClientCert string
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// TracingConfiguration provides versioned configuration for tracing clients.
type TracingConfiguration struct {
metav1.TypeMeta
// +optional
// Endpoint of the collector that's running on the control-plane node.
// The APIServer uses the egressType ControlPlane when sending data to the collector.
// The syntax is defined in https://github.com/grpc/grpc/blob/master/doc/naming.md.
// Defaults to the otlp grpc default, localhost:4317
// The connection is insecure, and does not currently support TLS.
Endpoint *string
// +optional
// SamplingRatePerMillion is the number of samples to collect per million spans.
// Defaults to 0.
SamplingRatePerMillion *int32
}

View File

@ -18,6 +18,7 @@ limitations under the License.
// +k8s:conversion-gen=k8s.io/apiserver/pkg/apis/apiserver
// +k8s:defaulter-gen=TypeMeta
// +groupName=apiserver.k8s.io
// +groupName=apiserver.config.k8s.io
// Package v1alpha1 is the v1alpha1 version of the API.
package v1alpha1 // import "k8s.io/apiserver/pkg/apis/apiserver/v1alpha1"

View File

@ -23,10 +23,14 @@ import (
)
const GroupName = "apiserver.k8s.io"
const ConfigGroupName = "apiserver.config.k8s.io"
// SchemeGroupVersion is group version used to register these objects
var SchemeGroupVersion = schema.GroupVersion{Group: GroupName, Version: "v1alpha1"}
// ConfigSchemeGroupVersion is group version used to register these objects
var ConfigSchemeGroupVersion = schema.GroupVersion{Group: ConfigGroupName, Version: "v1alpha1"}
var (
// TODO: move SchemeBuilder with zz_generated.deepcopy.go to k8s.io/api.
// localSchemeBuilder and AddToScheme will stay in k8s.io/kubernetes.
@ -48,6 +52,10 @@ func addKnownTypes(scheme *runtime.Scheme) error {
&AdmissionConfiguration{},
&EgressSelectorConfiguration{},
)
scheme.AddKnownTypes(ConfigSchemeGroupVersion,
&TracingConfiguration{},
)
metav1.AddToGroupVersion(scheme, SchemeGroupVersion)
metav1.AddToGroupVersion(scheme, ConfigSchemeGroupVersion)
return nil
}

View File

@ -147,3 +147,23 @@ type TLSConfig struct {
// +optional
ClientCert string `json:"clientCert,omitempty"`
}
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
// TracingConfiguration provides versioned configuration for tracing clients.
type TracingConfiguration struct {
metav1.TypeMeta `json:",inline"`
// +optional
// Endpoint of the collector that's running on the control-plane node.
// The APIServer uses the egressType ControlPlane when sending data to the collector.
// The syntax is defined in https://github.com/grpc/grpc/blob/master/doc/naming.md.
// Defaults to the otlpgrpc default, localhost:4317
// The connection is insecure, and does not support TLS.
Endpoint *string `json:"endpoint,omitempty" protobuf:"bytes,1,opt,name=endpoint"`
// +optional
// SamplingRatePerMillion is the number of samples to collect per million spans.
// Defaults to 0.
SamplingRatePerMillion *int32 `json:"samplingRatePerMillion,omitempty" protobuf:"varint,2,opt,name=samplingRatePerMillion"`
}

View File

@ -105,6 +105,16 @@ func RegisterConversions(s *runtime.Scheme) error {
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*TracingConfiguration)(nil), (*apiserver.TracingConfiguration)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_v1alpha1_TracingConfiguration_To_apiserver_TracingConfiguration(a.(*TracingConfiguration), b.(*apiserver.TracingConfiguration), scope)
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*apiserver.TracingConfiguration)(nil), (*TracingConfiguration)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_apiserver_TracingConfiguration_To_v1alpha1_TracingConfiguration(a.(*apiserver.TracingConfiguration), b.(*TracingConfiguration), scope)
}); err != nil {
return err
}
if err := s.AddGeneratedConversionFunc((*Transport)(nil), (*apiserver.Transport)(nil), func(a, b interface{}, scope conversion.Scope) error {
return Convert_v1alpha1_Transport_To_apiserver_Transport(a.(*Transport), b.(*apiserver.Transport), scope)
}); err != nil {
@ -286,6 +296,28 @@ func Convert_apiserver_TLSConfig_To_v1alpha1_TLSConfig(in *apiserver.TLSConfig,
return autoConvert_apiserver_TLSConfig_To_v1alpha1_TLSConfig(in, out, s)
}
func autoConvert_v1alpha1_TracingConfiguration_To_apiserver_TracingConfiguration(in *TracingConfiguration, out *apiserver.TracingConfiguration, s conversion.Scope) error {
out.Endpoint = (*string)(unsafe.Pointer(in.Endpoint))
out.SamplingRatePerMillion = (*int32)(unsafe.Pointer(in.SamplingRatePerMillion))
return nil
}
// Convert_v1alpha1_TracingConfiguration_To_apiserver_TracingConfiguration is an autogenerated conversion function.
func Convert_v1alpha1_TracingConfiguration_To_apiserver_TracingConfiguration(in *TracingConfiguration, out *apiserver.TracingConfiguration, s conversion.Scope) error {
return autoConvert_v1alpha1_TracingConfiguration_To_apiserver_TracingConfiguration(in, out, s)
}
func autoConvert_apiserver_TracingConfiguration_To_v1alpha1_TracingConfiguration(in *apiserver.TracingConfiguration, out *TracingConfiguration, s conversion.Scope) error {
out.Endpoint = (*string)(unsafe.Pointer(in.Endpoint))
out.SamplingRatePerMillion = (*int32)(unsafe.Pointer(in.SamplingRatePerMillion))
return nil
}
// Convert_apiserver_TracingConfiguration_To_v1alpha1_TracingConfiguration is an autogenerated conversion function.
func Convert_apiserver_TracingConfiguration_To_v1alpha1_TracingConfiguration(in *apiserver.TracingConfiguration, out *TracingConfiguration, s conversion.Scope) error {
return autoConvert_apiserver_TracingConfiguration_To_v1alpha1_TracingConfiguration(in, out, s)
}
func autoConvert_v1alpha1_Transport_To_apiserver_Transport(in *Transport, out *apiserver.Transport, s conversion.Scope) error {
out.TCP = (*apiserver.TCPTransport)(unsafe.Pointer(in.TCP))
out.UDS = (*apiserver.UDSTransport)(unsafe.Pointer(in.UDS))

View File

@ -184,6 +184,41 @@ func (in *TLSConfig) DeepCopy() *TLSConfig {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *TracingConfiguration) DeepCopyInto(out *TracingConfiguration) {
*out = *in
out.TypeMeta = in.TypeMeta
if in.Endpoint != nil {
in, out := &in.Endpoint, &out.Endpoint
*out = new(string)
**out = **in
}
if in.SamplingRatePerMillion != nil {
in, out := &in.SamplingRatePerMillion, &out.SamplingRatePerMillion
*out = new(int32)
**out = **in
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TracingConfiguration.
func (in *TracingConfiguration) DeepCopy() *TracingConfiguration {
if in == nil {
return nil
}
out := new(TracingConfiguration)
in.DeepCopyInto(out)
return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *TracingConfiguration) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
}
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Transport) DeepCopyInto(out *Transport) {
*out = *in

View File

@ -184,6 +184,41 @@ func (in *TLSConfig) DeepCopy() *TLSConfig {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *TracingConfiguration) DeepCopyInto(out *TracingConfiguration) {
*out = *in
out.TypeMeta = in.TypeMeta
if in.Endpoint != nil {
in, out := &in.Endpoint, &out.Endpoint
*out = new(string)
**out = **in
}
if in.SamplingRatePerMillion != nil {
in, out := &in.SamplingRatePerMillion, &out.SamplingRatePerMillion
*out = new(int32)
**out = **in
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new TracingConfiguration.
func (in *TracingConfiguration) DeepCopy() *TracingConfiguration {
if in == nil {
return nil
}
out := new(TracingConfiguration)
in.DeepCopyInto(out)
return out
}
// DeepCopyObject is an autogenerated deepcopy function, copying the receiver, creating a new runtime.Object.
func (in *TracingConfiguration) DeepCopyObject() runtime.Object {
if c := in.DeepCopy(); c != nil {
return c
}
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *Transport) DeepCopyInto(out *Transport) {
*out = *in

View File

@ -0,0 +1,40 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package filters
import (
"net/http"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel/trace"
"k8s.io/component-base/traces"
)
// WithTracing adds tracing to requests if the incoming request is sampled
func WithTracing(handler http.Handler, tp *trace.TracerProvider) http.Handler {
opts := []otelhttp.Option{
otelhttp.WithPropagators(traces.Propagators()),
otelhttp.WithPublicEndpoint(),
}
if tp != nil {
opts = append(opts, otelhttp.WithTracerProvider(*tp))
}
// Even if there is no TracerProvider, the otelhttp still handles context propagation.
// See https://github.com/open-telemetry/opentelemetry-go/tree/main/example/passthrough
return otelhttp.NewHandler(handler, "KubernetesAPI", opts...)
}

View File

@ -164,6 +164,12 @@ const (
//
// Assigns each kube-apiserver an ID in a cluster.
APIServerIdentity featuregate.Feature = "APIServerIdentity"
// owner: @dashpole
// alpha: v1.22
//
// Add support for distributed tracing in the API Server
APIServerTracing featuregate.Feature = "APIServerTracing"
)
func init() {
@ -191,4 +197,5 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS
WarningHeaders: {Default: true, PreRelease: featuregate.GA, LockToDefault: true},
EfficientWatchResumption: {Default: true, PreRelease: featuregate.Beta},
APIServerIdentity: {Default: false, PreRelease: featuregate.Alpha},
APIServerTracing: {Default: false, PreRelease: featuregate.Alpha},
}

View File

@ -30,6 +30,7 @@ import (
jsonpatch "github.com/evanphx/json-patch"
"github.com/google/uuid"
"go.opentelemetry.io/otel/trace"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
@ -54,6 +55,7 @@ import (
apiopenapi "k8s.io/apiserver/pkg/endpoints/openapi"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/features"
genericfeatures "k8s.io/apiserver/pkg/features"
genericregistry "k8s.io/apiserver/pkg/registry/generic"
"k8s.io/apiserver/pkg/server/dynamiccertificates"
"k8s.io/apiserver/pkg/server/egressselector"
@ -63,6 +65,7 @@ import (
serverstore "k8s.io/apiserver/pkg/server/storage"
"k8s.io/apiserver/pkg/storageversion"
"k8s.io/apiserver/pkg/util/feature"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol"
flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request"
"k8s.io/client-go/informers"
@ -138,6 +141,9 @@ type Config struct {
// Will default to a value based on secure serving info and available ipv4 IPs.
ExternalAddress string
// TracerProvider can provide a tracer, which records spans for distributed tracing.
TracerProvider *trace.TracerProvider
//===========================================================================
// Fields you probably don't care about changing
//===========================================================================
@ -773,6 +779,9 @@ func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler {
handler = genericapifilters.WithCacheControl(handler)
handler = genericfilters.WithHSTS(handler, c.HSTSDirectives)
handler = genericfilters.WithHTTPLogging(handler)
if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerTracing) {
handler = genericapifilters.WithTracing(handler, c.TracerProvider)
}
handler = genericapifilters.WithRequestInfo(handler, c.RequestInfoResolver)
handler = genericapifilters.WithRequestReceivedTimestamp(handler)
handler = genericfilters.WithPanicRecovery(handler, c.RequestInfoResolver)

View File

@ -53,6 +53,8 @@ type RecommendedOptions struct {
Admission *AdmissionOptions
// API Server Egress Selector is used to control outbound traffic from the API Server
EgressSelector *EgressSelectorOptions
// Traces contains options to control distributed request tracing.
Traces *TracingOptions
}
func NewRecommendedOptions(prefix string, codec runtime.Codec) *RecommendedOptions {
@ -79,6 +81,7 @@ func NewRecommendedOptions(prefix string, codec runtime.Codec) *RecommendedOptio
ExtraAdmissionInitializers: func(c *server.RecommendedConfig) ([]admission.PluginInitializer, error) { return nil, nil },
Admission: NewAdmissionOptions(),
EgressSelector: NewEgressSelectorOptions(),
Traces: NewTracingOptions(),
}
}
@ -92,6 +95,7 @@ func (o *RecommendedOptions) AddFlags(fs *pflag.FlagSet) {
o.CoreAPI.AddFlags(fs)
o.Admission.AddFlags(fs)
o.EgressSelector.AddFlags(fs)
o.Traces.AddFlags(fs)
}
// ApplyTo adds RecommendedOptions to the server configuration.
@ -142,6 +146,11 @@ func (o *RecommendedOptions) ApplyTo(config *server.RecommendedConfig) error {
klog.Warningf("Neither kubeconfig is provided nor service-account is mounted, so APIPriorityAndFairness will be disabled")
}
}
if feature.DefaultFeatureGate.Enabled(features.APIServerTracing) {
if err := o.Traces.ApplyTo(config.Config.EgressSelector, &config.Config); err != nil {
return err
}
}
return nil
}
@ -156,6 +165,7 @@ func (o *RecommendedOptions) Validate() []error {
errors = append(errors, o.CoreAPI.Validate()...)
errors = append(errors, o.Admission.Validate()...)
errors = append(errors, o.EgressSelector.Validate()...)
errors = append(errors, o.Traces.Validate()...)
return errors
}

View File

@ -0,0 +1,124 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package options
import (
"context"
"fmt"
"net"
"github.com/spf13/pflag"
"go.opentelemetry.io/otel/exporters/otlp/otlpgrpc"
"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"go.opentelemetry.io/otel/semconv"
"google.golang.org/grpc"
"k8s.io/apiserver/pkg/server"
"k8s.io/apiserver/pkg/server/egressselector"
"k8s.io/apiserver/pkg/tracing"
"k8s.io/component-base/traces"
"k8s.io/utils/path"
)
const apiserverService = "kube-apiserver"
// TracingOptions contain configuration options for tracing
// exporters
type TracingOptions struct {
// ConfigFile is the file path with api-server tracing configuration.
ConfigFile string
}
// NewTracingOptions creates a new instance of TracingOptions
func NewTracingOptions() *TracingOptions {
return &TracingOptions{}
}
// AddFlags adds flags related to tracing to the specified FlagSet
func (o *TracingOptions) AddFlags(fs *pflag.FlagSet) {
if o == nil {
return
}
fs.StringVar(&o.ConfigFile, "tracing-config-file", o.ConfigFile,
"File with apiserver tracing configuration.")
}
// ApplyTo fills up Tracing config with options.
func (o *TracingOptions) ApplyTo(es *egressselector.EgressSelector, c *server.Config) error {
if o == nil || o.ConfigFile == "" {
return nil
}
npConfig, err := tracing.ReadTracingConfiguration(o.ConfigFile)
if err != nil {
return fmt.Errorf("failed to read tracing config: %v", err)
}
errs := tracing.ValidateTracingConfiguration(npConfig)
if len(errs) > 0 {
return fmt.Errorf("failed to validate tracing configuration: %v", errs.ToAggregate())
}
opts := []otlpgrpc.Option{}
if npConfig.Endpoint != nil {
opts = append(opts, otlpgrpc.WithEndpoint(*npConfig.Endpoint))
}
if es != nil {
// Only use the egressselector dialer if egressselector is enabled.
// Endpoint is on the "ControlPlane" network
egressDialer, err := es.Lookup(egressselector.ControlPlane.AsNetworkContext())
if err != nil {
return err
}
otelDialer := func(ctx context.Context, addr string) (net.Conn, error) {
return egressDialer(ctx, "tcp", addr)
}
opts = append(opts, otlpgrpc.WithDialOption(grpc.WithContextDialer(otelDialer)))
}
sampler := sdktrace.NeverSample()
if npConfig.SamplingRatePerMillion != nil && *npConfig.SamplingRatePerMillion > 0 {
sampler = sdktrace.TraceIDRatioBased(float64(*npConfig.SamplingRatePerMillion) / float64(1000000))
}
resourceOpts := []resource.Option{
resource.WithAttributes(
semconv.ServiceNameKey.String(apiserverService),
semconv.ServiceInstanceIDKey.String(c.APIServerID),
),
}
tp := traces.NewProvider(context.Background(), sampler, resourceOpts, opts...)
c.TracerProvider = &tp
return nil
}
// Validate verifies flags passed to TracingOptions.
func (o *TracingOptions) Validate() (errs []error) {
if o == nil || o.ConfigFile == "" {
return
}
if exists, err := path.Exists(path.CheckFollowSymlink, o.ConfigFile); !exists {
errs = append(errs, fmt.Errorf("tracing-config-file %s does not exist", o.ConfigFile))
} else if err != nil {
errs = append(errs, fmt.Errorf("error checking if tracing-config-file %s exists: %v", o.ConfigFile, err))
}
return
}

View File

@ -0,0 +1,58 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package options
import "testing"
func TestValidateTracingOptions(t *testing.T) {
testcases := []struct {
name string
expectError bool
contents *TracingOptions
}{
{
name: "nil-valid",
expectError: false,
},
{
name: "empty-valid",
expectError: false,
contents: &TracingOptions{},
},
{
name: "path-valid",
expectError: false,
contents: &TracingOptions{ConfigFile: "/"},
},
{
name: "path-invalid",
expectError: true,
contents: &TracingOptions{ConfigFile: "/path/doesnt/exist"},
},
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
errs := tc.contents.Validate()
if tc.expectError == false && len(errs) != 0 {
t.Errorf("Calling Validate expected no error, got %v", errs)
} else if tc.expectError == true && len(errs) == 0 {
t.Errorf("Calling Validate expected error, got no error")
}
})
}
}

115
pkg/tracing/config.go Normal file
View File

@ -0,0 +1,115 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tracing
import (
"fmt"
"io/ioutil"
"net/url"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
"k8s.io/apimachinery/pkg/util/validation/field"
"k8s.io/apiserver/pkg/apis/apiserver"
"k8s.io/apiserver/pkg/apis/apiserver/install"
)
const (
maxSamplingRatePerMillion = 1000000
)
var (
cfgScheme = runtime.NewScheme()
codecs = serializer.NewCodecFactory(cfgScheme)
)
func init() {
install.Install(cfgScheme)
}
// ReadTracingConfiguration reads the tracing configuration from a file
func ReadTracingConfiguration(configFilePath string) (*apiserver.TracingConfiguration, error) {
if configFilePath == "" {
return nil, fmt.Errorf("tracing config file was empty")
}
data, err := ioutil.ReadFile(configFilePath)
if err != nil {
return nil, fmt.Errorf("unable to read tracing configuration from %q: %v", configFilePath, err)
}
internalConfig := &apiserver.TracingConfiguration{}
// this handles json/yaml/whatever, and decodes all registered version to the internal version
if err := runtime.DecodeInto(codecs.UniversalDecoder(), data, internalConfig); err != nil {
return nil, fmt.Errorf("unable to decode tracing configuration data: %v", err)
}
return internalConfig, nil
}
// ValidateTracingConfiguration validates the tracing configuration
func ValidateTracingConfiguration(config *apiserver.TracingConfiguration) field.ErrorList {
allErrs := field.ErrorList{}
if config == nil {
// Tracing is disabled
return allErrs
}
if config.SamplingRatePerMillion != nil {
allErrs = append(allErrs, validateSamplingRate(*config.SamplingRatePerMillion, field.NewPath("samplingRatePerMillion"))...)
}
if config.Endpoint != nil {
allErrs = append(allErrs, validateEndpoint(*config.Endpoint, field.NewPath("endpoint"))...)
}
return allErrs
}
func validateSamplingRate(rate int32, fldPath *field.Path) field.ErrorList {
errs := field.ErrorList{}
if rate < 0 {
errs = append(errs, field.Invalid(
fldPath, rate,
"sampling rate must be positive",
))
}
if rate > maxSamplingRatePerMillion {
errs = append(errs, field.Invalid(
fldPath, rate,
"sampling rate per million must be less than or equal to one million",
))
}
return errs
}
func validateEndpoint(endpoint string, fldPath *field.Path) field.ErrorList {
errs := field.ErrorList{}
if url, err := url.Parse(endpoint); err != nil {
errs = append(errs, field.Invalid(
fldPath, endpoint,
err.Error(),
))
// If the Host is empty, it indicates no scheme was specified, which is valid.
} else if url.Host != "" {
switch url.Scheme {
case "dns":
case "unix":
case "unix-abstract":
default:
errs = append(errs, field.Invalid(
fldPath, endpoint,
fmt.Sprintf("unsupported scheme: %v. Options are none, dns, unix, or unix-abstract. See https://github.com/grpc/grpc/blob/master/doc/naming.md", url.Scheme),
))
}
}
return errs
}

252
pkg/tracing/config_test.go Normal file
View File

@ -0,0 +1,252 @@
/*
Copyright 2021 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tracing
import (
"fmt"
"io/ioutil"
"os"
"reflect"
"strings"
"testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apiserver/pkg/apis/apiserver"
)
var (
localhost = "localhost:4317"
samplingRate = int32(12345)
)
func strptr(s string) *string {
return &s
}
func TestReadTracingConfiguration(t *testing.T) {
testcases := []struct {
name string
contents string
createFile bool
expectedResult *apiserver.TracingConfiguration
expectedError *string
}{
{
name: "empty",
createFile: true,
contents: ``,
expectedResult: &apiserver.TracingConfiguration{},
expectedError: nil,
},
{
name: "absent",
createFile: false,
contents: ``,
expectedResult: nil,
expectedError: strptr("unable to read tracing configuration from \"test-tracing-config-absent\": open test-tracing-config-absent: no such file or directory"),
},
{
name: "v1alpha1",
createFile: true,
contents: `
apiVersion: apiserver.config.k8s.io/v1alpha1
kind: TracingConfiguration
endpoint: localhost:4317
samplingRatePerMillion: 12345
`,
expectedResult: &apiserver.TracingConfiguration{
TypeMeta: metav1.TypeMeta{
Kind: "",
APIVersion: "",
},
Endpoint: &localhost,
SamplingRatePerMillion: &samplingRate,
},
expectedError: nil,
},
{
name: "wrong_type",
createFile: true,
contents: `
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: agent
spec:
selector:
matchLabels:
k8s-app: agent
template:
metadata:
labels:
k8s-app: agent
spec:
containers:
- image: k8s.gcr.io/busybox
name: agent
`,
expectedResult: nil,
expectedError: strptr("unable to decode tracing configuration data: no kind \"DaemonSet\" is registered for version \"apps/v1\" in scheme"),
},
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
proxyConfig := fmt.Sprintf("test-tracing-config-%s", tc.name)
if tc.createFile {
f, err := ioutil.TempFile("", proxyConfig)
if err != nil {
t.Fatal(err)
}
defer os.Remove(f.Name())
if err := ioutil.WriteFile(f.Name(), []byte(tc.contents), os.FileMode(0755)); err != nil {
t.Fatal(err)
}
proxyConfig = f.Name()
}
config, err := ReadTracingConfiguration(proxyConfig)
if err == nil && tc.expectedError != nil {
t.Errorf("calling ReadTracingConfiguration expected error: %s, did not get it", *tc.expectedError)
}
if err != nil && tc.expectedError == nil {
t.Errorf("unexpected error calling ReadTracingConfiguration got: %#v", err)
}
if err != nil && tc.expectedError != nil && !strings.HasPrefix(err.Error(), *tc.expectedError) {
t.Errorf("calling ReadTracingConfiguration expected error: %s, got %#v", *tc.expectedError, err)
}
if !reflect.DeepEqual(config, tc.expectedResult) {
t.Errorf("problem with configuration returned from ReadTracingConfiguration expected: %#v, got: %#v", tc.expectedResult, config)
}
})
}
}
func TestValidateTracingConfiguration(t *testing.T) {
samplingRate := int32(12378)
negativeRate := int32(-1)
tooHighRate := int32(1000001)
validEndpoint := "localhost:4317"
dnsEndpoint := "dns://google.com:4317"
unixEndpoint := "unix://path/to/socket"
invalidURL := "dn%2s://localhost:4317"
httpEndpoint := "http://localhost:4317"
testcases := []struct {
name string
expectError bool
contents *apiserver.TracingConfiguration
}{
{
name: "sampling-rate-valid",
expectError: false,
contents: &apiserver.TracingConfiguration{
TypeMeta: metav1.TypeMeta{
Kind: "",
APIVersion: "",
},
SamplingRatePerMillion: &samplingRate,
},
},
{
name: "sampling-rate-negative",
expectError: true,
contents: &apiserver.TracingConfiguration{
TypeMeta: metav1.TypeMeta{
Kind: "",
APIVersion: "",
},
SamplingRatePerMillion: &negativeRate,
},
},
{
name: "sampling-rate-negative",
expectError: true,
contents: &apiserver.TracingConfiguration{
TypeMeta: metav1.TypeMeta{
Kind: "",
APIVersion: "",
},
SamplingRatePerMillion: &tooHighRate,
},
},
{
name: "default Endpoint",
expectError: false,
contents: &apiserver.TracingConfiguration{
TypeMeta: metav1.TypeMeta{
Kind: "",
APIVersion: "",
},
Endpoint: &validEndpoint,
},
},
{
name: "dns Endpoint",
expectError: false,
contents: &apiserver.TracingConfiguration{
TypeMeta: metav1.TypeMeta{
Kind: "",
APIVersion: "",
},
Endpoint: &dnsEndpoint,
},
},
{
name: "unix Endpoint",
expectError: false,
contents: &apiserver.TracingConfiguration{
TypeMeta: metav1.TypeMeta{
Kind: "",
APIVersion: "",
},
Endpoint: &unixEndpoint,
},
},
{
name: "invalid Endpoint",
expectError: true,
contents: &apiserver.TracingConfiguration{
TypeMeta: metav1.TypeMeta{
Kind: "",
APIVersion: "",
},
Endpoint: &httpEndpoint,
},
},
{
name: "invalid url",
expectError: true,
contents: &apiserver.TracingConfiguration{
TypeMeta: metav1.TypeMeta{
Kind: "",
APIVersion: "",
},
Endpoint: &invalidURL,
},
},
}
for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
errs := ValidateTracingConfiguration(tc.contents)
if tc.expectError == false && len(errs) != 0 {
t.Errorf("Calling ValidateTracingConfiguration expected no error, got %v", errs)
} else if tc.expectError == true && len(errs) == 0 {
t.Errorf("Calling ValidateTracingConfiguration expected error, got no error")
}
})
}
}