otel tools: basic trace server for jaeger

This adds a small gRPC storage server so that a recorded trace file can be explored in the Jaeger UI.
This commit is contained in:
justinsb 2023-10-14 15:12:14 -04:00
parent e0a79e1bd1
commit 831352fbe7
3 changed files with 573 additions and 0 deletions

View File

@ -0,0 +1,42 @@
# Copyright 2023 The Kubernetes Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# protobuf regenerates the Go protobuf and gRPC bindings for the Jaeger
# api/v2 and storage/v1 .proto files under pb/, writing the generated code
# into pb/ (source-relative paths), and then runs goimports over ./pb.
.PHONY: protobuf
protobuf:
protoc --go_out=pb/ --go_opt=paths=source_relative --go-grpc_out=pb --go-grpc_opt=paths=source_relative \
-I pb/ \
pb/jaeger/api/v2/*.proto pb/jaeger/storage/v1/*.proto
go run golang.org/x/tools/cmd/goimports@latest -w ./pb
# wget downloads the upstream Jaeger model.proto and storage.proto into pb/
# and rewrites them in place: gogoproto lines are dropped, go_package is
# pointed at this module, and the model.proto import path is adjusted.
.PHONY: wget
wget:
mkdir -p pb/jaeger/api/v2/
wget -O pb/jaeger/api/v2/model.proto https://raw.githubusercontent.com/jaegertracing/jaeger-idl/main/proto/api_v2/model.proto
mkdir -p pb/jaeger/storage/v1/
wget -O pb/jaeger/storage/v1/storage.proto https://raw.githubusercontent.com/jaegertracing/jaeger/main/plugin/storage/grpc/proto/storage.proto
# Remove gogoproto
sed -i '/gogoproto/d' pb/jaeger/api/v2/model.proto
# Replace the go_package option with our import path; the trailing "//" turns
# the remainder of the original line (the upstream value) into a comment.
sed -i 's@go_package@go_package="k8s.io/kops/tools/otel/traceserver/pb/jaeger/api/v2"; //@g' pb/jaeger/api/v2/model.proto
sed -i '/gogoproto/d' pb/jaeger/storage/v1/storage.proto
sed -i 's@go_package@go_package="k8s.io/kops/tools/otel/traceserver/pb/jaeger/storage/v1"; //@g' pb/jaeger/storage/v1/storage.proto
# storage.proto imports model.proto by bare name; use its path relative to pb/.
sed -i 's@import "model.proto"@import "jaeger/api/v2/model.proto"@g' pb/jaeger/storage/v1/storage.proto
# Remove empty [ ] directives (which span lines)
# Newlines are temporarily mapped to '~' so that sed can match a "[" and "]"
# separated by a line break, then mapped back.
# NOTE(review): this assumes the .proto files contain no literal '~' - confirm.
cat pb/jaeger/api/v2/model.proto | \
tr '\n' '~' | sed 's/\[\~[ ]*\]//g' | \
tr '~' '\n' > pb/jaeger/api/v2/model.proto.out
mv pb/jaeger/api/v2/model.proto.out pb/jaeger/api/v2/model.proto
cat pb/jaeger/storage/v1/storage.proto | \
tr '\n' '~' | sed 's/\[\~[ ]*\]//g' | \
tr '~' '\n' > pb/jaeger/storage/v1/storage.proto.out
mv pb/jaeger/storage/v1/storage.proto.out pb/jaeger/storage/v1/storage.proto

View File

@ -0,0 +1,22 @@
module k8s.io/kops/tools/otel/traceserver
go 1.21
require (
go.opentelemetry.io/proto/otlp v1.0.0
google.golang.org/grpc v1.58.3
google.golang.org/protobuf v1.31.0
k8s.io/klog/v2 v2.100.1
)
require (
github.com/go-logr/logr v1.2.4 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect
golang.org/x/net v0.17.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20231002182017-d307bd883b97 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20231009173412-8bfb1ae86b6c // indirect
)

View File

@ -0,0 +1,509 @@
/*
Copyright 2023 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package main
import (
"bytes"
"context"
"encoding/binary"
"flag"
"fmt"
"io"
"log"
"net"
"os"
"os/exec"
"strings"
coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
v11 "go.opentelemetry.io/proto/otlp/common/v1"
v1 "go.opentelemetry.io/proto/otlp/trace/v1"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/encoding/prototext"
"google.golang.org/protobuf/proto"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"
"k8s.io/klog/v2"
v2 "k8s.io/kops/tools/otel/traceserver/pb/jaeger/api/v2"
storagev1 "k8s.io/kops/tools/otel/traceserver/pb/jaeger/storage/v1"
)
// main runs the trace server; any error is printed to stderr and the
// process exits with status 1.
func main() {
	err := run(context.Background())
	if err == nil {
		return
	}
	fmt.Fprintln(os.Stderr, err)
	os.Exit(1)
}
// run parses the command-line flags, loads the trace file named by --src and
// serves it over gRPC on --listen until the server stops.
//
// When --run=jaeger is given, runJaeger is started in the background once the
// listener is bound, pointed at the address we are serving on. Errors from
// that background task are only logged; they do not stop the server.
//
// It returns an error if --src is missing, --run has an unknown value, the
// trace file cannot be read, the listen address cannot be bound, or serving
// fails.
func run(ctx context.Context) error {
	listen := "127.0.0.1:12345"
	visualizer := ""
	src := ""
	flag.StringVar(&src, "src", src, "tracefile to load")
	flag.StringVar(&listen, "listen", listen, "endpoint on which to serve grpc")
	flag.StringVar(&visualizer, "run", visualizer, "visualization program to run [jaeger]")
	klog.InitFlags(nil)
	flag.Parse()

	if src == "" {
		return fmt.Errorf("--src is required")
	}

	// Validate --run up front, but do not launch anything yet: if loading the
	// trace file or binding the listener fails we exit immediately, and a
	// visualizer started earlier would be left running with nothing to talk to.
	switch visualizer {
	case "", "jaeger":
	default:
		return fmt.Errorf("run=%q not known (valid values: jaeger)", visualizer)
	}

	traceFilePath := src
	traceFile, err := ReadTraceFile(traceFilePath)
	if err != nil {
		return fmt.Errorf("reading %q: %w", traceFilePath, err)
	}

	lis, err := net.Listen("tcp", listen)
	if err != nil {
		return fmt.Errorf("listening on %q: %w", listen, err)
	}

	if visualizer == "jaeger" {
		go func() {
			opt := RunJaegerOptions{
				StorageServer: listen,
			}
			if err := runJaeger(ctx, opt); err != nil {
				// %w is only understood by fmt.Errorf; log calls must use %v.
				klog.Warningf("error starting jaeger: %v", err)
			}
		}()
	}

	s := &Server{
		traceFiles: []*TraceFile{traceFile},
	}

	grpcServer := grpc.NewServer()
	storagev1.RegisterPluginCapabilitiesServer(grpcServer, s)
	storagev1.RegisterSpanReaderPluginServer(grpcServer, s)
	storagev1.RegisterDependenciesReaderPluginServer(grpcServer, s)

	log.Printf("server listening at %v", lis.Addr())
	if err := grpcServer.Serve(lis); err != nil {
		return fmt.Errorf("serving %q: %w", listen, err)
	}
	return nil
}
type Server struct {
traceFiles []*TraceFile
storagev1.UnimplementedPluginCapabilitiesServer
storagev1.UnimplementedSpanReaderPluginServer
storagev1.UnimplementedDependenciesReaderPluginServer
}
// Capabilities reports which optional storage features this server offers.
// Archive span reading, archive span writing and streaming span writing are
// all reported as unsupported.
func (s *Server) Capabilities(ctx context.Context, req *storagev1.CapabilitiesRequest) (*storagev1.CapabilitiesResponse, error) {
	klog.V(2).Infof("Capabilities %v", prototext.Format(req))

	resp := new(storagev1.CapabilitiesResponse)
	resp.ArchiveSpanReader = false
	resp.ArchiveSpanWriter = false
	resp.StreamingSpanWriter = false

	klog.V(4).Infof("<-Capabilities %v", prototext.Format(resp))
	return resp, nil
}
// GetTrace streams every span whose trace ID equals req.TraceId.
//
// One chunk is sent per ResourceSpans entry that contains at least one
// matching span; entries without a match send nothing. The first error from
// stream.Send aborts the walk and is returned.
func (s *Server) GetTrace(req *storagev1.GetTraceRequest, stream storagev1.SpanReaderPlugin_GetTraceServer) error {
	ctx := stream.Context()
	klog.V(2).Infof("GetTrace %v", prototext.Format(req))

	// No service filter: the requested trace may span several services.
	var opt FilterOptions
	return s.visitSpans(ctx, opt, func(serviceName string, resourceSpans *v1.ResourceSpans) error {
		chunk := &storagev1.SpansResponseChunk{}
		for _, scopeSpan := range resourceSpans.ScopeSpans {
			for _, span := range scopeSpan.Spans {
				if !bytes.Equal(span.TraceId, req.TraceId) {
					continue
				}
				chunk.Spans = append(chunk.Spans, convertToJaeger(serviceName, span))
			}
		}

		// Don't put an empty message on the wire for resources that have no
		// span belonging to this trace.
		if len(chunk.Spans) == 0 {
			return nil
		}

		klog.V(4).Infof("<-GetTrace %v", prototext.Format(chunk))
		if err := stream.Send(chunk); err != nil {
			// %w is only understood by fmt.Errorf; log calls must use %v.
			klog.Warningf("error sending chunk: %v", err)
			return err
		}
		return nil
	})
}
// GetServices returns the distinct, non-empty "service.name" resource
// attribute values found across all loaded trace files. The order of the
// returned names is unspecified.
func (s *Server) GetServices(ctx context.Context, req *storagev1.GetServicesRequest) (*storagev1.GetServicesResponse, error) {
	klog.V(2).Infof("GetServices %v", prototext.Format(req))

	seen := make(map[string]struct{})
	for _, tf := range s.traceFiles {
		for _, batch := range tf.data {
			for _, rs := range batch.ResourceSpans {
				if rs.Resource == nil {
					continue
				}
				for _, kv := range rs.Resource.Attributes {
					if kv.GetKey() != "service.name" {
						continue
					}
					if name := kv.GetValue().GetStringValue(); name != "" {
						seen[name] = struct{}{}
					}
				}
			}
		}
	}

	response := &storagev1.GetServicesResponse{}
	for name := range seen {
		response.Services = append(response.Services, name)
	}

	klog.V(4).Infof("<-GetServices %v", prototext.Format(response))
	return response, nil
}
// GetOperations returns the distinct span names recorded for the service
// named in the request (all services when the request names none). Every
// operation is reported with span kind "client". The order of the returned
// operations is unspecified.
func (s *Server) GetOperations(ctx context.Context, req *storagev1.GetOperationsRequest) (*storagev1.GetOperationsResponse, error) {
	klog.V(2).Infof("GetOperations %v", prototext.Format(req))

	opt := FilterOptions{ServiceName: req.GetService()}

	names := make(map[string]struct{})
	collect := func(_ string, rs *v1.ResourceSpans) error {
		for _, ss := range rs.ScopeSpans {
			for _, sp := range ss.Spans {
				names[sp.Name] = struct{}{}
			}
		}
		return nil
	}
	if err := s.visitSpans(ctx, opt, collect); err != nil {
		return nil, err
	}

	response := &storagev1.GetOperationsResponse{}
	for name := range names {
		op := &storagev1.Operation{
			Name:     name,
			SpanKind: "client", // TODO: How do we know?
		}
		response.Operations = append(response.Operations, op)
	}

	klog.V(4).Infof("<-GetOperations %v", prototext.Format(response))
	return response, nil
}
// FindTraces streams the spans matching the query's service name and, when
// set, its operation name. Other query fields are not evaluated here.
//
// Spans are grouped by trace ID and each trace is sent as a single chunk.
// Traces are sent in the order in which they are first encountered while
// walking the trace files, so repeated calls produce the same sequence.
func (s *Server) FindTraces(req *storagev1.FindTracesRequest, stream storagev1.SpanReaderPlugin_FindTracesServer) error {
	ctx := stream.Context()
	klog.V(2).Infof("FindTraces %v", prototext.Format(req))

	var opt FilterOptions
	opt.ServiceName = req.GetQuery().GetServiceName()
	// The operation filter does not change per span, so read it once.
	operationName := req.GetQuery().GetOperationName()

	// Note that we must return the spans of each trace together (in order of
	// traceid), because that is what jaeger expects:
	// https://github.com/jaegertracing/jaeger/blob/7aeb457c21eed28b58cd34021eff14727229aa69/plugin/storage/grpc/shared/grpc_client.go#L201-L217
	traces := make(map[string]*storagev1.SpansResponseChunk)
	// order remembers the chunks in first-seen order; ranging over the map
	// instead would emit the traces in a different order on every call.
	var order []*storagev1.SpansResponseChunk

	if err := s.visitSpans(ctx, opt, func(serviceName string, resourceSpans *v1.ResourceSpans) error {
		for _, scopeSpan := range resourceSpans.ScopeSpans {
			for _, span := range scopeSpan.Spans {
				if operationName != "" && operationName != span.Name {
					continue
				}

				key := string(span.TraceId)
				trace := traces[key]
				if trace == nil {
					trace = &storagev1.SpansResponseChunk{}
					traces[key] = trace
					order = append(order, trace)
				}
				trace.Spans = append(trace.Spans, convertToJaeger(serviceName, span))
			}
		}
		return nil
	}); err != nil {
		return err
	}

	for _, chunk := range order {
		klog.V(4).Infof("<-FindTraces %v", prototext.Format(chunk))
		if err := stream.Send(chunk); err != nil {
			return err
		}
	}
	return nil
}
// FindTraceIDs is not supported: it logs the request at warning level and
// returns a gRPC Unimplemented status.
func (s *Server) FindTraceIDs(ctx context.Context, req *storagev1.FindTraceIDsRequest) (*storagev1.FindTraceIDsResponse, error) {
	klog.Warningf("FindTraceIDs not implemented: %v", prototext.Format(req))

	err := status.Errorf(codes.Unimplemented, "method FindTraceIDs not implemented")
	return nil, err
}
// GetDependencies is not supported: it logs the request at warning level and
// returns a gRPC Unimplemented status.
func (s *Server) GetDependencies(ctx context.Context, req *storagev1.GetDependenciesRequest) (*storagev1.GetDependenciesResponse, error) {
	klog.Warningf("GetDependencies not implemented: %v", prototext.Format(req))

	err := status.Errorf(codes.Unimplemented, "method GetDependencies not implemented")
	return nil, err
}
// TraceFile is the in-memory contents of one trace file, as produced by
// ReadTraceFile.
type TraceFile struct {
	// data holds the decoded ExportTraceServiceRequest records, in the order
	// they were appended while reading the file.
	data []*coltracepb.ExportTraceServiceRequest
}
// FilterOptions restricts which ResourceSpans visitSpans passes to its
// callback.
type FilterOptions struct {
	// ServiceName, when non-empty, limits the walk to resources whose
	// "service.name" attribute equals it. Empty means no filtering.
	ServiceName string
}
// visitSpans walks the trace files of the server in order, delegating to
// TraceFile.visitSpans for each one. It stops at, and returns, the first
// error.
func (s *Server) visitSpans(ctx context.Context, opt FilterOptions, callback func(string, *v1.ResourceSpans) error) error {
	for i := range s.traceFiles {
		err := s.traceFiles[i].visitSpans(ctx, opt, callback)
		if err != nil {
			return err
		}
	}
	return nil
}
// visitSpans invokes callback once for every ResourceSpans entry in the file,
// passing the value of the first "service.name" resource attribute (or "" if
// there is none) along with the entry itself.
//
// When opt.ServiceName is set, entries belonging to any other service are
// skipped. The first error returned by callback ends the walk and is
// returned unchanged.
func (f *TraceFile) visitSpans(ctx context.Context, opt FilterOptions, callback func(string, *v1.ResourceSpans) error) error {
	wanted := opt.ServiceName

	for _, batch := range f.data {
		for _, rs := range batch.ResourceSpans {
			var name string
			for _, kv := range rs.GetResource().GetAttributes() {
				if kv.GetKey() != "service.name" {
					continue
				}
				name = kv.GetValue().GetStringValue()
				break
			}

			if wanted == "" || name == wanted {
				if err := callback(name, rs); err != nil {
					return err
				}
			}
		}
	}
	return nil
}
// ReadTraceFile loads all records from the trace file at path p.
//
// The file is read as a sequence of records. Each record starts with a
// 16-byte header of four big-endian uint32 values (payload length, checksum,
// flags, type code) followed by the payload itself:
//
//   - type code 32: the payload is decoded as an ExportTraceServiceRequest
//     and appended to the result;
//   - type code 1: the payload is read and skipped (type definitions are not
//     processed yet).
//
// An error is returned if the file cannot be opened, a header or payload is
// truncated, flags is non-zero, the type code is unknown, or a payload does
// not parse. The checksum is currently not verified.
func ReadTraceFile(p string) (*TraceFile, error) {
	r, err := os.Open(p)
	if err != nil {
		return nil, fmt.Errorf("opening: %w", err)
	}
	defer r.Close()

	out := &TraceFile{}

	// The header has a fixed size, so one buffer serves every record.
	var header [16]byte
	for {
		if _, err := io.ReadFull(r, header[:]); err != nil {
			// io.ReadFull reports io.EOF only when no bytes at all were read,
			// i.e. the file ended cleanly on a record boundary.
			if err == io.EOF {
				break
			}
			return nil, fmt.Errorf("reading header: %w", err)
		}

		payloadLength := binary.BigEndian.Uint32(header[0:4])
		// TODO: Verify checksum (header[4:8])
		flags := binary.BigEndian.Uint32(header[8:12])
		typeCode := binary.BigEndian.Uint32(header[12:16])

		if flags != 0 {
			return nil, fmt.Errorf("unexpected flags value %v", flags)
		}

		// TODO: Sanity-check payloadLength
		payload := make([]byte, payloadLength)
		if _, err := io.ReadFull(r, payload); err != nil {
			return nil, fmt.Errorf("reading payload: %w", err)
		}

		// TODO: Better typeCode parsing
		switch typeCode {
		case 1:
			// TODO: process type definitions
		case 32:
			obj := &coltracepb.ExportTraceServiceRequest{}
			if err := proto.Unmarshal(payload, obj); err != nil {
				return nil, fmt.Errorf("parsing ExportTraceServiceRequest: %w", err)
			}
			out.data = append(out.data, obj)
		default:
			return nil, fmt.Errorf("unexpected typecode %d", typeCode)
		}
	}

	return out, nil
}
// startTimeForSpan converts the span's StartTimeUnixNano into a protobuf
// Timestamp, split into whole seconds and the nanosecond remainder. It
// returns nil when the start time is zero.
func startTimeForSpan(span *v1.Span) *timestamppb.Timestamp {
	const nanosPerSecond uint64 = 1e9

	start := span.StartTimeUnixNano
	if start == 0 {
		return nil
	}

	ts := &timestamppb.Timestamp{}
	ts.Seconds = int64(start / nanosPerSecond)
	ts.Nanos = int32(start % nanosPerSecond)
	return ts
}
// durationForSpan returns the time between the span's start and end as a
// protobuf Duration, split into whole seconds and the nanosecond remainder.
//
// It returns nil when no meaningful duration can be computed: either
// timestamp is zero, or the span ends before it starts.
func durationForSpan(span *v1.Span) *durationpb.Duration {
	start, end := span.StartTimeUnixNano, span.EndTimeUnixNano
	if start == 0 || end == 0 {
		return nil
	}
	// The subtraction below is done on the raw (unsigned, in the OTLP Go
	// bindings) timestamps; without this guard a span ending before it starts
	// would wrap around to an enormous positive duration.
	if end < start {
		return nil
	}

	nanos := end - start
	secs := nanos / 1e9
	nanos -= secs * 1e9
	return &durationpb.Duration{Seconds: int64(secs), Nanos: int32(nanos)}
}
// convertToJaeger converts an OTLP span into a Jaeger span attributed to the
// process named serviceName.
//
// Trace ID, span ID, operation name, start time and duration are copied over.
// A non-empty parent span ID becomes a CHILD_OF reference within the same
// trace. Span attributes become tags: string, bool, int and double values
// keep their type, array values are rendered as a string, and any other
// value type produces a warning and a tag that carries only the key.
//
// TODO: Flags, Logs, ProcessId and Warnings are not populated.
func convertToJaeger(serviceName string, span *v1.Span) *v2.Span {
	out := &v2.Span{
		TraceId:       span.TraceId,
		SpanId:        span.SpanId,
		OperationName: span.Name,
		StartTime:     startTimeForSpan(span),
		Duration:      durationForSpan(span),
		Process: &v2.Process{
			ServiceName: serviceName,
		},
	}

	// Check the length rather than comparing against nil: an empty but
	// non-nil parent ID also means "no parent" and must not yield a
	// reference to an empty span ID.
	if len(span.ParentSpanId) != 0 {
		out.References = append(out.References, &v2.SpanRef{
			TraceId: span.TraceId,
			SpanId:  span.ParentSpanId,
			RefType: v2.SpanRefType_CHILD_OF,
		})
	}

	for _, attr := range span.Attributes {
		tag := &v2.KeyValue{
			Key: attr.GetKey(),
		}
		// Use the nil-safe getters so an attribute without a value lands in
		// the default case instead of dereferencing a nil pointer.
		//
		// VType must be set alongside the value field: it selects which of
		// the V* fields the tag carries, and its zero value is STRING.
		switch v := attr.GetValue().GetValue().(type) {
		case *v11.AnyValue_StringValue:
			tag.VType = v2.ValueType_STRING
			tag.VStr = v.StringValue
		case *v11.AnyValue_BoolValue:
			tag.VType = v2.ValueType_BOOL
			tag.VBool = v.BoolValue
		case *v11.AnyValue_IntValue:
			tag.VType = v2.ValueType_INT64
			tag.VInt64 = v.IntValue
		case *v11.AnyValue_DoubleValue:
			tag.VType = v2.ValueType_FLOAT64
			tag.VFloat64 = v.DoubleValue
		case *v11.AnyValue_ArrayValue:
			s, err := attributeValueAsString(attr.GetValue())
			if err != nil {
				klog.Warningf("error converting array value: %v", err)
				s = "<?error>"
			}
			tag.VType = v2.ValueType_STRING
			tag.VStr = s
		default:
			klog.Warningf("unhandled attribute type %T", v)
		}
		out.Tags = append(out.Tags, tag)
	}

	return out
}
// attributeValueAsString renders an OTLP attribute value as a string.
//
// String values are returned as-is; bool, int and double values are
// formatted with fmt. Array values are rendered recursively as
// "[a,b,c]"; an element that cannot be rendered is logged and shown as
// "<?error>", so arrays never return an error themselves. Any other value
// type (including a nil or empty value) returns an error.
func attributeValueAsString(v *v11.AnyValue) (string, error) {
	// GetValue is nil-safe, so a nil *AnyValue (for example a nil array
	// element) falls through to the default case instead of panicking.
	switch val := v.GetValue().(type) {
	case *v11.AnyValue_StringValue:
		return val.StringValue, nil
	case *v11.AnyValue_BoolValue:
		return fmt.Sprintf("%t", val.BoolValue), nil
	case *v11.AnyValue_IntValue:
		return fmt.Sprintf("%d", val.IntValue), nil
	case *v11.AnyValue_DoubleValue:
		return fmt.Sprintf("%g", val.DoubleValue), nil
	case *v11.AnyValue_ArrayValue:
		elems := val.ArrayValue.GetValues()
		values := make([]string, 0, len(elems))
		for _, elem := range elems {
			s, err := attributeValueAsString(elem)
			if err != nil {
				klog.Warningf("error converting array value: %v", err)
				s = "<?error>"
			}
			values = append(values, s)
		}
		return "[" + strings.Join(values, ",") + "]", nil
	default:
		return "", fmt.Errorf("unhandled attribute type %T", val)
	}
}
// RunJaegerOptions are the options for runJaeger
type RunJaegerOptions struct {
	// StorageServer is the address passed to jaeger-query as
	// --grpc-storage.server, i.e. where it reaches our gRPC storage server.
	StorageServer string
}
// runJaeger starts the jaeger query & visualizer, binding to our storage server
//
// It launches the jaegertracing/jaeger-query image via "docker run" with
// grpc-plugin storage pointed at opt.StorageServer, prints the UI URL, tries
// to open it with xdg-open, and then blocks until the docker command exits.
// Both child commands are tied to ctx through exec.CommandContext.
//
// It returns an error if the docker command cannot be started or exits with
// an error. Failing to open a browser is only logged: the URL has already
// been printed and jaeger itself is running.
func runJaeger(ctx context.Context, opt RunJaegerOptions) error {
	jaegerURL := "http://127.0.0.1:16686/"

	klog.Infof("starting jaeger")
	jaegerArgs := []string{
		"docker", "run", "--rm", "--network=host", "--name=jaeger",
		"-e=SPAN_STORAGE_TYPE=grpc-plugin",
		"jaegertracing/jaeger-query",
		"--grpc-storage.server=" + opt.StorageServer,
	}
	jaeger := exec.CommandContext(ctx, jaegerArgs[0], jaegerArgs[1:]...)
	jaeger.Stdout = os.Stdout
	jaeger.Stderr = os.Stderr
	if err := jaeger.Start(); err != nil {
		return fmt.Errorf("starting jaeger in docker (%s): %w", strings.Join(jaegerArgs, " "), err)
	}

	fmt.Fprintf(os.Stdout, "open browser to %s\n", jaegerURL)
	browserArgs := []string{"xdg-open", jaegerURL}
	browser := exec.CommandContext(ctx, browserArgs[0], browserArgs[1:]...)
	browser.Stdout = os.Stdout
	browser.Stderr = os.Stderr
	if err := browser.Run(); err != nil {
		// Best effort only (xdg-open may be missing, or there may be no
		// desktop session). Returning here would abandon the jaeger process
		// started above without ever waiting on it.
		klog.Warningf("opening webbrowser (%s): %v", strings.Join(browserArgs, " "), err)
	}

	if err := jaeger.Wait(); err != nil {
		return fmt.Errorf("waiting for jaeger to exit: %w", err)
	}
	return nil
}