diff --git a/Makefile b/Makefile index 0ab31c9e03..2ec45a0dad 100644 --- a/Makefile +++ b/Makefile @@ -169,6 +169,8 @@ verify-codegen: .PHONY: protobuf protobuf: + protoc --go_out=. --go_opt=paths=source_relative pkg/otel/otlptracefile/pb/file.proto + go run golang.org/x/tools/cmd/goimports@latest -w pkg/otel/otlptracefile/pb/file.pb.go cd ${GOPATH_1ST}/src; protoc --gogo_out=. k8s.io/kops/protokube/pkg/gossip/mesh/mesh.proto .PHONY: hooks diff --git a/cmd/kops/create_cluster.go b/cmd/kops/create_cluster.go index b3475af67d..e1b427928a 100644 --- a/cmd/kops/create_cluster.go +++ b/cmd/kops/create_cluster.go @@ -474,6 +474,9 @@ func NewCmdCreateCluster(f *util.Factory, out io.Writer) *cobra.Command { } func RunCreateCluster(ctx context.Context, f *util.Factory, out io.Writer, c *CreateClusterOptions) error { + ctx, span := tracer.Start(ctx, "RunCreateCluster") + defer span.End() + isDryrun := false // direct requires --yes (others do not, because they don't make changes) targetName := c.Target diff --git a/cmd/kops/doc.go b/cmd/kops/doc.go new file mode 100644 index 0000000000..82b0f25b4b --- /dev/null +++ b/cmd/kops/doc.go @@ -0,0 +1,21 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package main + +import "go.opentelemetry.io/otel" + +var tracer = otel.Tracer("k8s.io/kops/cmd/kops") diff --git a/cmd/kops/main.go b/cmd/kops/main.go index 06fce3cd41..4d4f383571 100644 --- a/cmd/kops/main.go +++ b/cmd/kops/main.go @@ -16,9 +16,45 @@ limitations under the License. package main // import "k8s.io/kops/cmd/kops" -import "context" +import ( + "context" + "fmt" + "os" + + "k8s.io/kops" +) func main() { ctx := context.Background() - Execute(ctx) + if err := run(ctx); err != nil { + os.Exit(1) + } +} + +func run(ctx context.Context) error { + // Set up OpenTelemetry. + serviceName := "kops" + serviceVersion := kops.Version + if kops.GitVersion != "" { + serviceVersion += ".git-" + kops.GitVersion + } + + otelShutdown, err := setupOTelSDK(ctx, serviceName, serviceVersion) + if err != nil { + fmt.Fprintf(os.Stderr, "Error: %v\n", err) + return err + } + // Handle shutdown properly so nothing leaks. + defer func() { + // We use a background context because the main context has probably been shut down. + if err := otelShutdown(context.Background()); err != nil { + fmt.Fprintf(os.Stderr, "error shutting down otel: %v\n", err) + } + }() + + if err := Execute(ctx); err != nil { + return err + } + + return nil } diff --git a/cmd/kops/otel.go b/cmd/kops/otel.go new file mode 100644 index 0000000000..138b3f1f90 --- /dev/null +++ b/cmd/kops/otel.go @@ -0,0 +1,111 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package main + +import ( + "context" + "errors" + "net/http" + "os" + "time" + + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/sdk/resource" + "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.21.0" + + "k8s.io/kops/pkg/otel/otlptracefile" +) + +// setupOTelSDK bootstraps the OpenTelemetry pipeline. +// If it does not return an error, make sure to call shutdown for proper cleanup. +func setupOTelSDK(ctx context.Context, serviceName, serviceVersion string) (shutdown func(context.Context) error, err error) { + var shutdownFuncs []func(context.Context) error + + // shutdown calls cleanup functions registered via shutdownFuncs. + // The errors from the calls are joined. + // Each registered cleanup will be invoked once. + shutdown = func(ctx context.Context) error { + var err error + for _, fn := range shutdownFuncs { + err = errors.Join(err, fn(ctx)) + } + shutdownFuncs = nil + return err + } + + // handleErr calls shutdown for cleanup and makes sure that all errors are returned. + handleErr := func(inErr error) { + err = errors.Join(inErr, shutdown(ctx)) + } + + // Setup resource. + res, err := newResource(serviceName, serviceVersion) + if err != nil { + handleErr(err) + return + } + + // Setup trace provider. 
+ tracerProvider, err := newTraceProvider(ctx, res) + if err != nil { + handleErr(err) + return + } + if tracerProvider != nil { + shutdownFuncs = append(shutdownFuncs, tracerProvider.Shutdown) + otel.SetTracerProvider(tracerProvider) + + http.DefaultClient = &http.Client{ + Transport: otelhttp.NewTransport(http.DefaultTransport), + } + } + + return +} + +func newResource(serviceName, serviceVersion string) (*resource.Resource, error) { + return resource.Merge(resource.Default(), + resource.NewWithAttributes(semconv.SchemaURL, + semconv.ServiceName(serviceName), + semconv.ServiceVersion(serviceVersion), + )) +} + +func newTraceProvider(ctx context.Context, res *resource.Resource) (*trace.TracerProvider, error) { + s := os.Getenv("OTEL_EXPORTER_OTLP_TRACES_FILE") + if s == "" { + s = os.Getenv("OTEL_EXPORTER_OTLP_FILE") + } + if s == "" { + return nil, nil + } + + traceExporter, err := otlptracefile.New(ctx, otlptracefile.WithPath(s)) + if err != nil { + return nil, err + } + + traceProvider := trace.NewTracerProvider( + trace.WithBatcher(traceExporter, + // Default is 5s. Set to 1s for demonstrative purposes. 
+ trace.WithBatchTimeout(time.Second)), + trace.WithResource(res), + ) + return traceProvider, nil +} diff --git a/cmd/kops/root.go b/cmd/kops/root.go index aa2f797991..59ba034849 100644 --- a/cmd/kops/root.go +++ b/cmd/kops/root.go @@ -28,6 +28,8 @@ import ( "github.com/spf13/cobra" "github.com/spf13/viper" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "k8s.io/apimachinery/pkg/util/validation/field" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/homedir" @@ -89,12 +91,13 @@ var rootCommand = RootCmd{ }, } -func Execute(ctx context.Context) { +func Execute(ctx context.Context) error { + ctx, span := tracer.Start(ctx, "kops", trace.WithAttributes(attribute.StringSlice("args", os.Args))) + defer span.End() + goflag.Set("logtostderr", "true") goflag.CommandLine.Parse([]string{}) - if err := rootCommand.cobraCommand.ExecuteContext(ctx); err != nil { - os.Exit(1) - } + return rootCommand.cobraCommand.ExecuteContext(ctx) } func init() { diff --git a/go.mod b/go.mod index bc18d93403..2d23f22f34 100644 --- a/go.mod +++ b/go.mod @@ -41,6 +41,12 @@ require ( github.com/spotinst/spotinst-sdk-go v1.145.0 github.com/stretchr/testify v1.8.4 github.com/weaveworks/mesh v0.0.0-20191105120815-58dbcc3e8e63 + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.45.0 + go.opentelemetry.io/otel v1.19.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 + go.opentelemetry.io/otel/sdk v1.19.0 + go.opentelemetry.io/otel/trace v1.19.0 + go.opentelemetry.io/proto/otlp v1.0.0 go.uber.org/multierr v1.11.0 golang.org/x/crypto v0.15.0 golang.org/x/exp v0.0.0-20231110203233-9a3e6036ecaa @@ -115,6 +121,7 @@ require ( github.com/evanphx/json-patch/v5 v5.6.0 // indirect github.com/evertras/bubble-table v0.14.4 // indirect github.com/exponent-io/jsonpath v0.0.0-20151013193312-d6023ce2651d // indirect + github.com/felixge/httpsnoop v1.0.3 // indirect github.com/fsnotify/fsnotify v1.6.0 // indirect github.com/go-errors/errors v1.4.2 // 
indirect github.com/go-logr/stdr v1.2.2 // indirect @@ -137,6 +144,7 @@ require ( github.com/googleapis/gax-go/v2 v2.12.0 // indirect github.com/gorilla/mux v1.8.0 // indirect github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect github.com/hashicorp/go-immutable-radix v1.3.1 // indirect @@ -209,9 +217,7 @@ require ( github.com/vbatts/tar-split v0.11.3 // indirect github.com/xlab/treeprint v1.2.0 // indirect go.opencensus.io v0.24.0 // indirect - go.opentelemetry.io/otel v1.19.0 // indirect go.opentelemetry.io/otel/metric v1.19.0 // indirect - go.opentelemetry.io/otel/trace v1.19.0 // indirect go.starlark.net v0.0.0-20230525235612-a134d8f9ddca // indirect golang.org/x/mod v0.14.0 // indirect golang.org/x/term v0.14.0 // indirect @@ -220,6 +226,7 @@ require ( golang.org/x/tools v0.15.0 // indirect gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect google.golang.org/appengine v1.6.7 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20231016165738-49dd2c1f3d0b // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20231030173426-d783a09b4405 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/warnings.v0 v0.1.2 // indirect diff --git a/go.sum b/go.sum index 15cace4db8..02ade1602d 100644 --- a/go.sum +++ b/go.sum @@ -286,6 +286,8 @@ github.com/golang-jwt/jwt/v4 v4.2.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzw github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOWzg= github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/glog v1.1.2 h1:DVjP2PbBOzHyzA+dn3WhHIq4NdVu3Q+pvivFICf/7fo= +github.com/golang/glog v1.1.2/go.mod h1:zR+okUeTbrL6EL3xHUDxZuEtGv04p5shwip1+mL/rLQ= 
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc= @@ -417,6 +419,8 @@ github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/ad github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79 h1:+ngKgrYPPJrOjhax5N+uePQ0Fh1Z7PheYoUI/0nzkPA= github.com/gregjones/httpcache v0.0.0-20190611155906-901d90724c79/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rHAxPBD8KFhJpmcqms= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= @@ -731,13 +735,21 @@ go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E= go.opencensus.io v0.24.0 h1:y73uSU6J157QMP2kn2r30vwW1A2W2WFwSCGnAVxeaD0= go.opencensus.io v0.24.0/go.mod h1:vNK8G9p7aAivkbmorf4v+7Hgx+Zs0yY+0fOtgBfjQKo= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.45.0 h1:x8Z78aZx8cOF0+Kkazoc7lwUNMGy0LrzEMxTm4BbTxg= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.45.0/go.mod h1:62CPTSry9QZtOaSsE3tOzhx6LzDhHnXJ6xHeMNNiM6Q= go.opentelemetry.io/otel v1.19.0 h1:MuS/TNf4/j4IXsZuJegVzI1cwut7Qc00344rgH7p8bs= go.opentelemetry.io/otel v1.19.0/go.mod 
h1:i0QyjOq3UPoTzff0PJB2N66fb4S0+rSbSB15/oyH9fY= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0 h1:Mne5On7VWdx7omSrSSZvM4Kw7cS7NQkOOmLcgscI51U= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.19.0/go.mod h1:IPtUMKL4O3tH5y+iXVyAXqpAwMuzC1IrxVS81rummfE= go.opentelemetry.io/otel/metric v1.19.0 h1:aTzpGtV0ar9wlV4Sna9sdJyII5jTVJEvKETPiOKwvpE= go.opentelemetry.io/otel/metric v1.19.0/go.mod h1:L5rUsV9kM1IxCj1MmSdS+JQAcVm319EUrDVLrt7jqt8= +go.opentelemetry.io/otel/sdk v1.19.0 h1:6USY6zH+L8uMH8L3t1enZPR3WFEmSTADlqldyHtJi3o= +go.opentelemetry.io/otel/sdk v1.19.0/go.mod h1:NedEbbS4w3C6zElbLdPJKOpJQOrGUJ+GfzpjUvI0v1A= go.opentelemetry.io/otel/trace v1.19.0 h1:DFVQmlVbfVeOuBRrwdtaehRrWiL1JoVs9CPIQ1Dzxpg= go.opentelemetry.io/otel/trace v1.19.0/go.mod h1:mfaSyvGyEJEI0nyV2I4qhNQnbBOUUmYZpYojqMnX2vo= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= +go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I= +go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM= go.starlark.net v0.0.0-20230525235612-a134d8f9ddca h1:VdD38733bfYv5tUZwEIskMM93VanwNIi5bIKnDrJdEY= go.starlark.net v0.0.0-20230525235612-a134d8f9ddca/go.mod h1:jxU+3+j+71eXOW14274+SmmuW82qJzl6iZSeqEtTGds= go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= diff --git a/pkg/client/simple/vfsclientset/clientset.go b/pkg/client/simple/vfsclientset/clientset.go index e3c93647be..a6fbc9e1b4 100644 --- a/pkg/client/simple/vfsclientset/clientset.go +++ b/pkg/client/simple/vfsclientset/clientset.go @@ -50,7 +50,10 @@ func (c *VFSClientset) clusters() *ClusterVFS { // GetCluster implements the GetCluster method of simple.Clientset for a VFS-backed state store func (c *VFSClientset) GetCluster(ctx context.Context, name string) (*kops.Cluster, error) { - return c.clusters().Get(name, metav1.GetOptions{}) + ctx, span := tracer.Start(ctx, "VFSClientset::GetCluster") + defer span.End() + + 
return c.clusters().Get(ctx, name, metav1.GetOptions{}) } // UpdateCluster implements the UpdateCluster method of simple.Clientset for a VFS-backed state store @@ -65,7 +68,10 @@ func (c *VFSClientset) CreateCluster(ctx context.Context, cluster *kops.Cluster) // ListClusters implements the ListClusters method of simple.Clientset for a VFS-backed state store func (c *VFSClientset) ListClusters(ctx context.Context, options metav1.ListOptions) (*kops.ClusterList, error) { - return c.clusters().List(options) + ctx, span := tracer.Start(ctx, "VFSClientset::ListClusters") + defer span.End() + + return c.clusters().List(ctx, options) } // ConfigBaseFor implements the ConfigBaseFor method of simple.Clientset for a VFS-backed state store diff --git a/pkg/client/simple/vfsclientset/cluster.go b/pkg/client/simple/vfsclientset/cluster.go index af73ba1477..aa34682dab 100644 --- a/pkg/client/simple/vfsclientset/cluster.go +++ b/pkg/client/simple/vfsclientset/cluster.go @@ -47,9 +47,7 @@ func newClusterVFS(vfsContext *vfs.VFSContext, basePath vfs.Path) *ClusterVFS { return c } -func (c *ClusterVFS) Get(name string, options metav1.GetOptions) (*api.Cluster, error) { - ctx := context.TODO() - +func (c *ClusterVFS) Get(ctx context.Context, name string, options metav1.GetOptions) (*api.Cluster, error) { if options.ResourceVersion != "" { return nil, fmt.Errorf("ResourceVersion not supported in ClusterVFS::Get") } @@ -72,9 +70,7 @@ func (c *ClusterVFS) configBase(clusterName string) (vfs.Path, error) { return configPath, nil } -func (c *ClusterVFS) List(options metav1.ListOptions) (*api.ClusterList, error) { - ctx := context.TODO() - +func (c *ClusterVFS) List(ctx context.Context, options metav1.ListOptions) (*api.ClusterList, error) { names, err := c.listNames(ctx) if err != nil { return nil, err @@ -134,7 +130,7 @@ func (r *ClusterVFS) Update(c *api.Cluster, status *api.ClusterStatus) (*api.Clu return nil, field.Required(field.NewPath("objectMeta", "name"), "clusterName is 
required") } - old, err := r.Get(clusterName, metav1.GetOptions{}) + old, err := r.Get(ctx, clusterName, metav1.GetOptions{}) if err != nil { return nil, err } diff --git a/pkg/client/simple/vfsclientset/doc.go b/pkg/client/simple/vfsclientset/doc.go new file mode 100644 index 0000000000..f985eee9f0 --- /dev/null +++ b/pkg/client/simple/vfsclientset/doc.go @@ -0,0 +1,21 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package vfsclientset + +import "go.opentelemetry.io/otel" + +var tracer = otel.Tracer("k8s.io/kops/pkg/client/simple/vfsclientset") diff --git a/pkg/otel/otlptracefile/client.go b/pkg/otel/otlptracefile/client.go new file mode 100644 index 0000000000..89ad3921cd --- /dev/null +++ b/pkg/otel/otlptracefile/client.go @@ -0,0 +1,110 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package otlptracefile + +import ( + "context" + "errors" + "fmt" + "sync" + + "go.opentelemetry.io/otel/exporters/otlp/otlptrace" + coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1" + tracepb "go.opentelemetry.io/proto/otlp/trace/v1" +) + +type client struct { + cfg Config + + writerMutex sync.RWMutex + writer *writer +} + +var _ otlptrace.Client = (*client)(nil) + +// newClient constructs a client. +func newClient(opts ...Option) *client { + var cfg Config + for _, option := range opts { + option(&cfg) + } + + c := &client{ + cfg: cfg, + } + + return c +} + +// Start implements otlptrace.Client. +func (c *client) Start(ctx context.Context) error { + c.writerMutex.Lock() + defer c.writerMutex.Unlock() + + if c.writer != nil { + return fmt.Errorf("already started") + } + + w, err := newWriter(c.cfg) + if err != nil { + return err + } + c.writer = w + + return nil +} + +// Stop implements otlptrace.Client. It closes the writer (if any), always clears it, and returns the Close error. +func (c *client) Stop(ctx context.Context) error { + c.writerMutex.Lock() + defer c.writerMutex.Unlock() + + if c.writer == nil { + return nil + } + + // Clear the writer even when Close fails, so it is never reused or closed twice and UploadTraces returns errShutdown. + // NOTE(review): assumes a writer whose Close failed is unusable afterwards; confirm against writer.Close. + err := c.writer.Close() + c.writer = nil + return err +} + +var errShutdown = errors.New("the client is shutdown") + +// UploadTraces implements otlptrace.Client. +func (c *client) UploadTraces(ctx context.Context, protoSpans []*tracepb.ResourceSpans) error { + c.writerMutex.RLock() + defer c.writerMutex.RUnlock() + + if c.writer == nil { + return errShutdown + } + + return c.writer.writeTraces(ctx, &coltracepb.ExportTraceServiceRequest{ + ResourceSpans: protoSpans, + }) +} + +// MarshalLog is the marshaling function used by the logging system to represent this Client. 
+func (c *client) MarshalLog() interface{} { + return struct { + Type string + }{ + Type: "otlptracefile", + } +} diff --git a/pkg/otel/otlptracefile/exporter.go b/pkg/otel/otlptracefile/exporter.go new file mode 100644 index 0000000000..5cc38e4587 --- /dev/null +++ b/pkg/otel/otlptracefile/exporter.go @@ -0,0 +1,33 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package otlptracefile + +import ( + "context" + + "go.opentelemetry.io/otel/exporters/otlp/otlptrace" +) + +// New constructs a new Exporter and starts it. +func New(ctx context.Context, opts ...Option) (*otlptrace.Exporter, error) { + return otlptrace.New(ctx, newClient(opts...)) +} + +// NewUnstarted constructs a new Exporter and does not start it. +func NewUnstarted(opts ...Option) *otlptrace.Exporter { + return otlptrace.NewUnstarted(newClient(opts...)) +} diff --git a/pkg/otel/otlptracefile/options.go b/pkg/otel/otlptracefile/options.go new file mode 100644 index 0000000000..23c43e2819 --- /dev/null +++ b/pkg/otel/otlptracefile/options.go @@ -0,0 +1,30 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package otlptracefile + +// Option applies an option to the Config. +type Option func(cfg *Config) + +type Config struct { + path string +} + +func WithPath(path string) Option { + return func(cfg *Config) { + cfg.path = path + } +} diff --git a/pkg/otel/otlptracefile/pb/file.pb.go b/pkg/otel/otlptracefile/pb/file.pb.go new file mode 100644 index 0000000000..80d651e676 --- /dev/null +++ b/pkg/otel/otlptracefile/pb/file.pb.go @@ -0,0 +1,232 @@ +/* +Copyright The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// To regenerate the go code, run make protobuf + +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.0 +// protoc v3.21.12 +// source: pkg/otel/otlptracefile/pb/file.proto + +package pb + +import ( + reflect "reflect" + sync "sync" + + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. 
+ _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type WellKnownTypeCode int32 + +const ( + WellKnownTypeCode_WellKnownTypeCode_Unknown WellKnownTypeCode = 0 + WellKnownTypeCode_WellKnownTypeCode_ObjectType WellKnownTypeCode = 1 +) + +// Enum value maps for WellKnownTypeCode. +var ( + WellKnownTypeCode_name = map[int32]string{ + 0: "WellKnownTypeCode_Unknown", + 1: "WellKnownTypeCode_ObjectType", + } + WellKnownTypeCode_value = map[string]int32{ + "WellKnownTypeCode_Unknown": 0, + "WellKnownTypeCode_ObjectType": 1, + } +) + +func (x WellKnownTypeCode) Enum() *WellKnownTypeCode { + p := new(WellKnownTypeCode) + *p = x + return p +} + +func (x WellKnownTypeCode) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (WellKnownTypeCode) Descriptor() protoreflect.EnumDescriptor { + return file_pkg_otel_otlptracefile_pb_file_proto_enumTypes[0].Descriptor() +} + +func (WellKnownTypeCode) Type() protoreflect.EnumType { + return &file_pkg_otel_otlptracefile_pb_file_proto_enumTypes[0] +} + +func (x WellKnownTypeCode) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use WellKnownTypeCode.Descriptor instead. +func (WellKnownTypeCode) EnumDescriptor() ([]byte, []int) { + return file_pkg_otel_otlptracefile_pb_file_proto_rawDescGZIP(), []int{0} +} + +// ObjectType is used to identify the type of objects in the file. +// Each object has a header, and there is a type_code value in the header. +// Before an object is written, the type information must have been recorded with an ObjectType record. 
+type ObjectType struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + TypeCode uint32 `protobuf:"varint,1,opt,name=type_code,json=typeCode,proto3" json:"type_code,omitempty"` + TypeName string `protobuf:"bytes,2,opt,name=type_name,json=typeName,proto3" json:"type_name,omitempty"` +} + +func (x *ObjectType) Reset() { + *x = ObjectType{} + if protoimpl.UnsafeEnabled { + mi := &file_pkg_otel_otlptracefile_pb_file_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ObjectType) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ObjectType) ProtoMessage() {} + +func (x *ObjectType) ProtoReflect() protoreflect.Message { + mi := &file_pkg_otel_otlptracefile_pb_file_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ObjectType.ProtoReflect.Descriptor instead. 
+func (*ObjectType) Descriptor() ([]byte, []int) { + return file_pkg_otel_otlptracefile_pb_file_proto_rawDescGZIP(), []int{0} +} + +func (x *ObjectType) GetTypeCode() uint32 { + if x != nil { + return x.TypeCode + } + return 0 +} + +func (x *ObjectType) GetTypeName() string { + if x != nil { + return x.TypeName + } + return "" +} + +var File_pkg_otel_otlptracefile_pb_file_proto protoreflect.FileDescriptor + +var file_pkg_otel_otlptracefile_pb_file_proto_rawDesc = []byte{ + 0x0a, 0x24, 0x70, 0x6b, 0x67, 0x2f, 0x6f, 0x74, 0x65, 0x6c, 0x2f, 0x6f, 0x74, 0x6c, 0x70, 0x74, + 0x72, 0x61, 0x63, 0x65, 0x66, 0x69, 0x6c, 0x65, 0x2f, 0x70, 0x62, 0x2f, 0x66, 0x69, 0x6c, 0x65, + 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0d, 0x6f, 0x74, 0x6c, 0x70, 0x74, 0x72, 0x61, 0x63, + 0x65, 0x66, 0x69, 0x6c, 0x65, 0x22, 0x46, 0x0a, 0x0a, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x54, + 0x79, 0x70, 0x65, 0x12, 0x1b, 0x0a, 0x09, 0x74, 0x79, 0x70, 0x65, 0x5f, 0x63, 0x6f, 0x64, 0x65, + 0x18, 0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x08, 0x74, 0x79, 0x70, 0x65, 0x43, 0x6f, 0x64, 0x65, + 0x12, 0x1b, 0x0a, 0x09, 0x74, 0x79, 0x70, 0x65, 0x5f, 0x6e, 0x61, 0x6d, 0x65, 0x18, 0x02, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x08, 0x74, 0x79, 0x70, 0x65, 0x4e, 0x61, 0x6d, 0x65, 0x2a, 0x54, 0x0a, + 0x11, 0x57, 0x65, 0x6c, 0x6c, 0x4b, 0x6e, 0x6f, 0x77, 0x6e, 0x54, 0x79, 0x70, 0x65, 0x43, 0x6f, + 0x64, 0x65, 0x12, 0x1d, 0x0a, 0x19, 0x57, 0x65, 0x6c, 0x6c, 0x4b, 0x6e, 0x6f, 0x77, 0x6e, 0x54, + 0x79, 0x70, 0x65, 0x43, 0x6f, 0x64, 0x65, 0x5f, 0x55, 0x6e, 0x6b, 0x6e, 0x6f, 0x77, 0x6e, 0x10, + 0x00, 0x12, 0x20, 0x0a, 0x1c, 0x57, 0x65, 0x6c, 0x6c, 0x4b, 0x6e, 0x6f, 0x77, 0x6e, 0x54, 0x79, + 0x70, 0x65, 0x43, 0x6f, 0x64, 0x65, 0x5f, 0x4f, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x54, 0x79, 0x70, + 0x65, 0x10, 0x01, 0x42, 0x27, 0x5a, 0x25, 0x6b, 0x38, 0x73, 0x2e, 0x69, 0x6f, 0x2f, 0x6b, 0x6f, + 0x70, 0x73, 0x2f, 0x70, 0x6b, 0x67, 0x2f, 0x6f, 0x74, 0x65, 0x6c, 0x2f, 0x6f, 0x74, 0x6c, 0x70, + 0x74, 0x72, 0x61, 0x63, 0x65, 0x66, 0x69, 0x6c, 
0x65, 0x2f, 0x70, 0x62, 0x62, 0x06, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_pkg_otel_otlptracefile_pb_file_proto_rawDescOnce sync.Once + file_pkg_otel_otlptracefile_pb_file_proto_rawDescData = file_pkg_otel_otlptracefile_pb_file_proto_rawDesc +) + +func file_pkg_otel_otlptracefile_pb_file_proto_rawDescGZIP() []byte { + file_pkg_otel_otlptracefile_pb_file_proto_rawDescOnce.Do(func() { + file_pkg_otel_otlptracefile_pb_file_proto_rawDescData = protoimpl.X.CompressGZIP(file_pkg_otel_otlptracefile_pb_file_proto_rawDescData) + }) + return file_pkg_otel_otlptracefile_pb_file_proto_rawDescData +} + +var file_pkg_otel_otlptracefile_pb_file_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_pkg_otel_otlptracefile_pb_file_proto_msgTypes = make([]protoimpl.MessageInfo, 1) +var file_pkg_otel_otlptracefile_pb_file_proto_goTypes = []interface{}{ + (WellKnownTypeCode)(0), // 0: otlptracefile.WellKnownTypeCode + (*ObjectType)(nil), // 1: otlptracefile.ObjectType +} +var file_pkg_otel_otlptracefile_pb_file_proto_depIdxs = []int32{ + 0, // [0:0] is the sub-list for method output_type + 0, // [0:0] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_pkg_otel_otlptracefile_pb_file_proto_init() } +func file_pkg_otel_otlptracefile_pb_file_proto_init() { + if File_pkg_otel_otlptracefile_pb_file_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_pkg_otel_otlptracefile_pb_file_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ObjectType); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: 
file_pkg_otel_otlptracefile_pb_file_proto_rawDesc, + NumEnums: 1, + NumMessages: 1, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_pkg_otel_otlptracefile_pb_file_proto_goTypes, + DependencyIndexes: file_pkg_otel_otlptracefile_pb_file_proto_depIdxs, + EnumInfos: file_pkg_otel_otlptracefile_pb_file_proto_enumTypes, + MessageInfos: file_pkg_otel_otlptracefile_pb_file_proto_msgTypes, + }.Build() + File_pkg_otel_otlptracefile_pb_file_proto = out.File + file_pkg_otel_otlptracefile_pb_file_proto_rawDesc = nil + file_pkg_otel_otlptracefile_pb_file_proto_goTypes = nil + file_pkg_otel_otlptracefile_pb_file_proto_depIdxs = nil +} diff --git a/pkg/otel/otlptracefile/pb/file.proto b/pkg/otel/otlptracefile/pb/file.proto new file mode 100644 index 0000000000..4d4daed7cf --- /dev/null +++ b/pkg/otel/otlptracefile/pb/file.proto @@ -0,0 +1,19 @@ +// To regenerate the go code, run make protobuf +syntax = 'proto3'; + +package otlptracefile; + +option go_package = "k8s.io/kops/pkg/otel/otlptracefile/pb"; + +// ObjectType is used to identify the type of objects in the file. +// Each object has a header, and there is a type_code value in the header. +// Before an object is written, the type information must have been recorded with an ObjectType record. +message ObjectType { + uint32 type_code = 1; + string type_name = 2; +} + +enum WellKnownTypeCode { + WellKnownTypeCode_Unknown = 0; + WellKnownTypeCode_ObjectType = 1; +} \ No newline at end of file diff --git a/pkg/otel/otlptracefile/writer.go b/pkg/otel/otlptracefile/writer.go new file mode 100644 index 0000000000..5639f08d3f --- /dev/null +++ b/pkg/otel/otlptracefile/writer.go @@ -0,0 +1,163 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package otlptracefile
+
+import (
+	"context"
+	"encoding/binary"
+	"fmt"
+	"hash/crc32"
+	"os"
+	"sync"
+
+	coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
+	"google.golang.org/protobuf/proto"
+	"k8s.io/kops/pkg/otel/otlptracefile/pb"
+)
+
+// writer appends protobuf-encoded objects to a single output file.
+// Each object is preceded by a fixed-size header carrying its length,
+// checksum, flags and type code.
+type writer struct {
+	// fileMutex serializes access to f.
+	fileMutex sync.Mutex
+	// f is the output file; it is set to nil once the writer has been closed.
+	f *os.File
+
+	// typeCodesMutex guards nextTypeCode and typeCodes.
+	typeCodesMutex sync.Mutex
+	// nextTypeCode is the code that will be handed to the next unseen type.
+	nextTypeCode TypeCode
+	// typeCodes maps a protobuf full name to the code assigned to it.
+	typeCodes map[string]TypeCode
+}
+
+// TypeCode is the numeric identifier stored in an object header to say
+// which protobuf message type the object body holds.
+type TypeCode uint32
+
+// newWriter creates (or truncates) the file at cfg.path with mode 0600 and
+// returns a writer for it. Dynamically assigned type codes start at 32, and
+// the ObjectType record type is pre-registered under its well-known code.
+func newWriter(cfg Config) (*writer, error) {
+	file, openErr := os.OpenFile(cfg.path, os.O_RDWR|os.O_CREATE|os.O_TRUNC, 0600)
+	if openErr != nil {
+		return nil, fmt.Errorf("error opening %q: %w", cfg.path, openErr)
+	}
+
+	out := &writer{
+		f:            file,
+		nextTypeCode: 32,
+		typeCodes:    make(map[string]TypeCode),
+	}
+	out.recordWellKnownType(pb.WellKnownTypeCode_WellKnownTypeCode_ObjectType, &pb.ObjectType{})
+
+	return out, nil
+}
+
+// writeTraces is the entry point used by the otel libraries: it appends one
+// export request (a batch of trace records) to the file.
+func (w *writer) writeTraces(ctx context.Context, req *coltracepb.ExportTraceServiceRequest) error {
+	return w.writeObject(ctx, req)
+}
+
+// codeForType returns the integer code value for objects of obj.
+// If this is the first time we've seen the type, this method will assign a code value, write it to the file and return it.
+func (w *writer) codeForType(ctx context.Context, obj proto.Message) (TypeCode, error) {
+	name := string(obj.ProtoReflect().Descriptor().FullName())
+
+	// The lock is held across lookup, assignment and the type-record write,
+	// so the record is in the file before any caller can obtain the code.
+	w.typeCodesMutex.Lock()
+	defer w.typeCodesMutex.Unlock()
+
+	if code, known := w.typeCodes[name]; known {
+		return code, nil
+	}
+
+	// First sighting of this type: take the next free code. The code is
+	// consumed even if writing the type record below fails.
+	code := w.nextTypeCode
+	w.nextTypeCode++
+
+	err := w.writeObjectWithTypeCode(ctx, TypeCode(pb.WellKnownTypeCode_WellKnownTypeCode_ObjectType), &pb.ObjectType{
+		TypeCode: uint32(code),
+		TypeName: name,
+	})
+	if err != nil {
+		return 0, err
+	}
+
+	// Only remember the mapping once its record has been written.
+	w.typeCodes[name] = code
+	return code, nil
+}
+
+// recordWellKnownType registers a built-in ("system") type under a fixed code
+// without writing anything to the file. It exists for the types needed to
+// describe the type table itself, such as the ObjectType record.
+func (w *writer) recordWellKnownType(typeCode pb.WellKnownTypeCode, obj proto.Message) {
+	name := string(obj.ProtoReflect().Descriptor().FullName())
+
+	w.typeCodesMutex.Lock()
+	defer w.typeCodesMutex.Unlock()
+
+	w.typeCodes[name] = TypeCode(typeCode)
+}
+
+// writeObject appends obj to the file, first resolving (and if necessary
+// recording) the type code for obj's message type.
+func (w *writer) writeObject(ctx context.Context, obj proto.Message) error {
+	code, err := w.codeForType(ctx, obj)
+	if err == nil {
+		err = w.writeObjectWithTypeCode(ctx, code, obj)
+	}
+	return err
+}
+
+// writeObjectWithTypeCode is the key function here. We encode and write the object.
+// We include a header that identifies the object using the provided typeCode.
+func (w *writer) writeObjectWithTypeCode(ctx context.Context, typeCode TypeCode, obj proto.Message) error {
+	buf, err := proto.Marshal(obj)
+	if err != nil {
+		return fmt.Errorf("converting to proto: %w", err)
+	}
+
+	// The header stores the body length in 32 bits; reject anything larger
+	// instead of writing a truncated length that would corrupt the stream.
+	if uint64(len(buf)) > uint64(^uint32(0)) {
+		return fmt.Errorf("object of %d bytes exceeds the 32-bit length field", len(buf))
+	}
+
+	// CRC-32C (Castagnoli) over the encoded body only; the header is not covered.
+	crc32q := crc32.MakeTable(crc32.Castagnoli)
+	checksum := crc32.Checksum(buf, crc32q)
+
+	// No flags are set by this writer.
+	flags := uint32(0)
+
+	w.fileMutex.Lock()
+	defer w.fileMutex.Unlock()
+
+	if w.f == nil {
+		return fmt.Errorf("already closed")
+	}
+
+	// write the object with a header.
+	// Header layout (big-endian uint32s): body length, checksum, flags, type code.
+	header := make([]byte, 16)
+	binary.BigEndian.PutUint32(header[0:4], uint32(len(buf)))
+	binary.BigEndian.PutUint32(header[4:8], checksum)
+	binary.BigEndian.PutUint32(header[8:12], flags)
+	binary.BigEndian.PutUint32(header[12:16], uint32(typeCode))
+
+	if _, err := w.f.Write(header); err != nil {
+		return fmt.Errorf("writing header: %w", err)
+	}
+	if _, err := w.f.Write(buf); err != nil {
+		// TODO: Rotate file?
+		return fmt.Errorf("writing body: %w", err)
+	}
+
+	return nil
+}
+
+// Close closes the output file.
+// Calling Close again, or after a failed close, is a no-op that returns nil;
+// writes attempted after Close report "already closed".
+func (w *writer) Close() error {
+	w.fileMutex.Lock()
+	defer w.fileMutex.Unlock()
+
+	if w.f == nil {
+		return nil
+	}
+
+	// Drop the handle before closing: an *os.File must not be closed twice or
+	// written to after Close, even when Close itself reports an error.
+	f := w.f
+	w.f = nil
+
+	return f.Close()
+}
diff --git a/upup/pkg/fi/context.go b/upup/pkg/fi/context.go
index 61c5087c7c..4c50c4908d 100644
--- a/upup/pkg/fi/context.go
+++ b/upup/pkg/fi/context.go
@@ -118,7 +118,7 @@ func (c *Context[T]) RunTasks(options RunTasksOptions) error {
 		context: c,
 		options: options,
 	}
-	return e.RunTasks(c.tasks)
+	return e.RunTasks(c.ctx, c.tasks)
 }
 
 // Render dispatches the creation of an object to the appropriate handler defined on the Task,
diff --git a/upup/pkg/fi/doc.go b/upup/pkg/fi/doc.go
new file mode 100644
index 0000000000..8145677dd4
--- /dev/null
+++ b/upup/pkg/fi/doc.go
@@ -0,0 +1,21 @@
+/*
+Copyright 2023 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package fi + +import "go.opentelemetry.io/otel" + +var tracer = otel.Tracer("k8s.io/kops/upup/pkg/fi") diff --git a/upup/pkg/fi/executor.go b/upup/pkg/fi/executor.go index 42770c971a..32d62e474f 100644 --- a/upup/pkg/fi/executor.go +++ b/upup/pkg/fi/executor.go @@ -17,6 +17,7 @@ limitations under the License. package fi import ( + "context" "errors" "fmt" "strings" @@ -53,7 +54,7 @@ func (o *RunTasksOptions) InitDefaults() { // RunTasks executes all the tasks, considering their dependencies // It will perform some re-execution on error, retrying as long as progress is still being made -func (e *executor[T]) RunTasks(taskMap map[string]Task[T]) error { +func (e *executor[T]) RunTasks(ctx context.Context, taskMap map[string]Task[T]) error { dependencies := FindTaskDependencies(taskMap) for _, task := range taskMap { @@ -119,7 +120,7 @@ func (e *executor[T]) RunTasks(taskMap map[string]Task[T]) error { var tasks []*taskState[T] tasks = append(tasks, canRun...) 
- taskErrors := e.forkJoin(tasks) + taskErrors := e.forkJoin(ctx, tasks) var errs []error for i, err := range taskErrors { ts := tasks[i] @@ -189,7 +190,7 @@ func (e *executor[T]) RunTasks(taskMap map[string]Task[T]) error { return nil } -func (e *executor[T]) forkJoin(tasks []*taskState[T]) []error { +func (e *executor[T]) forkJoin(ctx context.Context, tasks []*taskState[T]) []error { if len(tasks) == 0 { return nil } @@ -203,6 +204,9 @@ func (e *executor[T]) forkJoin(tasks []*taskState[T]) []error { go func(ts *taskState[T], index int) { defer wg.Done() + _, span := tracer.Start(ctx, "task-"+ts.key) + defer span.End() + resultsMutex.Lock() results[index] = fmt.Errorf("function panic") resultsMutex.Unlock() diff --git a/util/pkg/vfs/doc.go b/util/pkg/vfs/doc.go new file mode 100644 index 0000000000..04e2f3c322 --- /dev/null +++ b/util/pkg/vfs/doc.go @@ -0,0 +1,21 @@ +/* +Copyright 2023 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package vfs + +import "go.opentelemetry.io/otel" + +var tracer = otel.Tracer("k8s.io/kops/util/pkg/vfs") diff --git a/util/pkg/vfs/s3context.go b/util/pkg/vfs/s3context.go index eb5f39a397..5c950bc425 100644 --- a/util/pkg/vfs/s3context.go +++ b/util/pkg/vfs/s3context.go @@ -74,12 +74,15 @@ func NewS3Context() *S3Context { } } -func (s *S3Context) getClient(region string) (*s3.S3, error) { +func (s *S3Context) getClient(ctx context.Context, region string) (*s3.S3, error) { s.mutex.Lock() defer s.mutex.Unlock() s3Client := s.clients[region] if s3Client == nil { + _, span := tracer.Start(ctx, "S3Context::getClient") + defer span.End() + var config *aws.Config var err error endpoint := os.Getenv("S3_ENDPOINT") @@ -102,6 +105,7 @@ func (s *S3Context) getClient(region string) (*s3.S3, error) { if err != nil { return nil, fmt.Errorf("error starting new AWS session: %v", err) } + s3Client = s3.New(sess, config) s.clients[region] = s3Client } @@ -139,6 +143,9 @@ func (s *S3Context) getDetailsForBucket(ctx context.Context, bucket string) (*S3 return bucketDetails, nil } + ctx, span := tracer.Start(ctx, "S3Path::getDetailsForBucket") + defer span.End() + bucketDetails = &S3BucketDetails{ context: s, region: "", @@ -158,9 +165,9 @@ func (s *S3Context) getDetailsForBucket(ctx context.Context, bucket string) (*S3 awsRegion := os.Getenv("AWS_REGION") if awsRegion == "" { - isEC2, err := isRunningOnEC2() + isEC2, err := isRunningOnEC2(ctx) if isEC2 || err != nil { - region, err := getRegionFromMetadata() + region, err := getRegionFromMetadata(ctx) if err != nil { klog.V(2).Infof("unable to get region from metadata:%v", err) } else { @@ -180,7 +187,7 @@ func (s *S3Context) getDetailsForBucket(ctx context.Context, bucket string) (*S3 } var response *s3.GetBucketLocationOutput - s3Client, err := s.getClient(awsRegion) + s3Client, err := s.getClient(ctx, awsRegion) if err != nil { return bucketDetails, fmt.Errorf("error connecting to S3: %s", err) } @@ -225,12 +232,15 @@ func 
(b *S3BucketDetails) hasServerSideEncryptionByDefault(ctx context.Context) return *b.applyServerSideEncryptionByDefault } + ctx, span := tracer.Start(ctx, "S3BucketDetails::hasServerSideEncryptionByDefault") + defer span.End() + applyServerSideEncryptionByDefault := false // We only make one attempt to find the SSE policy (even if there's an error) b.applyServerSideEncryptionByDefault = &applyServerSideEncryptionByDefault - client, err := b.context.getClient(b.region) + client, err := b.context.getClient(ctx, b.region) if err != nil { klog.Warningf("Unable to read bucket encryption policy for %q in region %q: will encrypt using AES256", b.name, b.region) return false @@ -279,6 +289,9 @@ out the first result. See also: https://docs.aws.amazon.com/goto/WebAPI/s3-2006-03-01/GetBucketLocationRequest */ func bruteforceBucketLocation(ctx context.Context, region *string, request *s3.GetBucketLocationInput) (*s3.GetBucketLocationOutput, error) { + ctx, span := tracer.Start(ctx, "bruteforceBucketLocation") + defer span.End() + config := &aws.Config{Region: region} config = config.WithCredentialsChainVerboseErrors(true) @@ -287,12 +300,12 @@ func bruteforceBucketLocation(ctx context.Context, region *string, request *s3.G SharedConfigState: session.SharedConfigEnable, }) if err != nil { - return nil, fmt.Errorf("error creating aws session: %v", err) + return nil, fmt.Errorf("creating aws session: %w", err) } regions, err := ec2.New(session).DescribeRegions(nil) if err != nil { - return nil, fmt.Errorf("Unable to list AWS regions: %v", err) + return nil, fmt.Errorf("listing AWS regions: %w", err) } klog.V(2).Infof("Querying S3 for bucket location for %s", *request.Bucket) @@ -321,13 +334,13 @@ func bruteforceBucketLocation(ctx context.Context, region *string, request *s3.G // isRunningOnEC2 determines if we could be running on EC2. 
// It is used to avoid a call to the metadata service to get the current region, // because that call is slow if not running on EC2 -func isRunningOnEC2() (bool, error) { +func isRunningOnEC2(ctx context.Context) (bool, error) { if runtime.GOOS == "linux" { // Approach based on https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/identify_ec2_instances.html productUUID, err := os.ReadFile("/sys/devices/virtual/dmi/id/product_uuid") if err != nil { klog.V(2).Infof("unable to read /sys/devices/virtual/dmi/id/product_uuid, assuming not running on EC2: %v", err) - return false, err + return false, nil } s := strings.ToLower(strings.TrimSpace(string(productUUID))) @@ -343,7 +356,10 @@ func isRunningOnEC2() (bool, error) { } // getRegionFromMetadata queries the metadata service for the current region, if running in EC2 -func getRegionFromMetadata() (string, error) { +func getRegionFromMetadata(ctx context.Context) (string, error) { + ctx, span := tracer.Start(ctx, "getRegionFromMetadata") + defer span.End() + // Use an even shorter timeout, to minimize impact when not running on EC2 // Note that we still retry a few times, this works out a little under a 1s delay shortTimeout := &aws.Config{ @@ -354,13 +370,13 @@ func getRegionFromMetadata() (string, error) { metadataSession, err := session.NewSession(shortTimeout) if err != nil { - return "", fmt.Errorf("unable to build session: %v", err) + return "", fmt.Errorf("building AWS metadata session: %w", err) } metadata := ec2metadata.New(metadataSession) - metadataRegion, err := metadata.Region() + metadataRegion, err := metadata.RegionWithContext(ctx) if err != nil { - return "", fmt.Errorf("unable to get region from metadata: %v", err) + return "", fmt.Errorf("getting AWS region from metadata: %w", err) } return metadataRegion, nil diff --git a/util/pkg/vfs/s3fs.go b/util/pkg/vfs/s3fs.go index 5b04348a55..1cf5b1f1c1 100644 --- a/util/pkg/vfs/s3fs.go +++ b/util/pkg/vfs/s3fs.go @@ -31,6 +31,8 @@ import ( 
"github.com/aws/aws-sdk-go/aws" "github.com/aws/aws-sdk-go/aws/awserr" "github.com/aws/aws-sdk-go/service/s3" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "k8s.io/klog/v2" "k8s.io/kops/upup/pkg/fi/cloudup/terraformWriter" @@ -286,6 +288,9 @@ func (p *S3Path) getRequestACL(aclObj ACL) (*string, error) { } func (p *S3Path) WriteFile(ctx context.Context, data io.ReadSeeker, aclObj ACL) error { + ctx, span := tracer.Start(ctx, "S3Path::WriteFile", trace.WithAttributes(attribute.String("path", p.String()))) + defer span.End() + client, err := p.client(ctx) if err != nil { return err @@ -346,8 +351,11 @@ func (p *S3Path) CreateFile(ctx context.Context, data io.ReadSeeker, acl ACL) er // ReadFile implements Path::ReadFile func (p *S3Path) ReadFile(ctx context.Context) ([]byte, error) { + ctx, span := tracer.Start(ctx, "S3Path::ReadFile", trace.WithAttributes(attribute.String("path", p.String()))) + defer span.End() + var b bytes.Buffer - _, err := p.WriteTo(&b) + _, err := p.WriteToWithContext(ctx, &b) if err != nil { return nil, err } @@ -357,6 +365,11 @@ func (p *S3Path) ReadFile(ctx context.Context) ([]byte, error) { // WriteTo implements io.WriterTo func (p *S3Path) WriteTo(out io.Writer) (int64, error) { ctx := context.TODO() + return p.WriteToWithContext(ctx, out) +} + +// WriteToWithContext implements io.WriterTo, but adds a context +func (p *S3Path) WriteToWithContext(ctx context.Context, out io.Writer) (int64, error) { client, err := p.client(ctx) if err != nil { return 0, err @@ -433,6 +446,9 @@ func (p *S3Path) ReadDir() ([]Path, error) { } func (p *S3Path) ReadTree(ctx context.Context) ([]Path, error) { + ctx, span := tracer.Start(ctx, "S3Path::ReadTree") + defer span.End() + client, err := p.client(ctx) if err != nil { return nil, err @@ -484,7 +500,7 @@ func (p *S3Path) client(ctx context.Context) (*s3.S3, error) { return nil, err } - client, err := p.s3Context.getClient(bucketDetails.region) + client, err := 
p.s3Context.getClient(ctx, bucketDetails.region) if err != nil { return nil, err }