Add instrumentation for Kafka (#134)

This commit is contained in:
Sam Xie 2020-07-23 12:04:30 +08:00 committed by GitHub
parent bb438f8907
commit 70957fc44e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
16 changed files with 1631 additions and 26 deletions

View File

@ -49,3 +49,7 @@ updates:
directory: "/instrumentation/runtime" # Location of package manifests
schedule:
interval: "daily"
- package-ecosystem: "gomod" # See documentation for possible values
directory: "/instrumentation/github.com/Shopify/sarama" # Location of package manifests
schedule:
interval: "daily"

View File

@ -8,6 +8,11 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
## [Unreleased]
### Added
- Add instrumentation for Kafka (github.com/Shopify/sarama). (#134)
- Add links and status message for mock span. (#134)
## [0.9.0] - 2020-07-20
This release upgrades its [go.opentelemetry.io/otel](https://github.com/open-telemetry/opentelemetry-go/releases/tag/v0.9.0) dependency to v0.9.0.

View File

@ -0,0 +1,111 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sarama
import (
"context"
"strconv"
"github.com/Shopify/sarama"
"go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/propagation"
"go.opentelemetry.io/otel/api/standard"
"go.opentelemetry.io/otel/api/trace"
)
type partitionConsumer struct {
sarama.PartitionConsumer
messages chan *sarama.ConsumerMessage
}
// Messages returns the read channel for the messages that are returned by
// the broker.
func (pc *partitionConsumer) Messages() <-chan *sarama.ConsumerMessage {
return pc.messages
}
// WrapPartitionConsumer wraps a sarama.PartitionConsumer causing each received
// message to be traced.
func WrapPartitionConsumer(serviceName string, pc sarama.PartitionConsumer, opts ...Option) sarama.PartitionConsumer {
cfg := newConfig(serviceName, opts...)
wrapped := &partitionConsumer{
PartitionConsumer: pc,
messages: make(chan *sarama.ConsumerMessage),
}
go func() {
msgs := pc.Messages()
for msg := range msgs {
// Extract a span context from message to link.
carrier := NewConsumerMessageCarrier(msg)
parentSpanContext := propagation.ExtractHTTP(context.Background(), cfg.Propagators, carrier)
// Create a span.
attrs := []kv.KeyValue{
standard.ServiceNameKey.String(cfg.ServiceName),
standard.MessagingSystemKey.String("kafka"),
standard.MessagingDestinationKindKeyTopic,
standard.MessagingDestinationKey.String(msg.Topic),
standard.MessagingOperationReceive,
standard.MessagingMessageIDKey.String(strconv.FormatInt(msg.Offset, 10)),
kafkaPartitionKey.Int32(msg.Partition),
}
opts := []trace.StartOption{
trace.WithAttributes(attrs...),
trace.WithSpanKind(trace.SpanKindConsumer),
}
newCtx, span := cfg.Tracer.Start(parentSpanContext, "kafka.consume", opts...)
// Inject current span context, so consumers can use it to propagate span.
propagation.InjectHTTP(newCtx, cfg.Propagators, carrier)
// Send messages back to user.
wrapped.messages <- msg
span.End()
}
close(wrapped.messages)
}()
return wrapped
}
type consumer struct {
sarama.Consumer
serviceName string
opts []Option
}
// ConsumePartition invokes Consumer.ConsumePartition and wraps the resulting
// PartitionConsumer.
func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (sarama.PartitionConsumer, error) {
pc, err := c.Consumer.ConsumePartition(topic, partition, offset)
if err != nil {
return nil, err
}
return WrapPartitionConsumer(c.serviceName, pc, c.opts...), nil
}
// WrapConsumer wraps a sarama.Consumer wrapping any PartitionConsumer created
// via Consumer.ConsumePartition.
func WrapConsumer(serviceName string, c sarama.Consumer, opts ...Option) sarama.Consumer {
return &consumer{
Consumer: c,
serviceName: serviceName,
opts: opts,
}
}

View File

@ -0,0 +1,211 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sarama
import (
"context"
"fmt"
"testing"
"github.com/Shopify/sarama"
"github.com/Shopify/sarama/mocks"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/propagation"
"go.opentelemetry.io/otel/api/standard"
"go.opentelemetry.io/otel/api/trace"
mocktracer "go.opentelemetry.io/contrib/internal/trace"
)
const (
serviceName = "test-service-name"
topic = "test-topic"
)
var (
propagators = global.Propagators()
)
func TestWrapPartitionConsumer(t *testing.T) {
// Mock tracer
mt := mocktracer.NewTracer("kafka")
// Mock partition consumer controller
consumer := mocks.NewConsumer(t, sarama.NewConfig())
mockPartitionConsumer := consumer.ExpectConsumePartition(topic, 0, 0)
// Create partition consumer
partitionConsumer, err := consumer.ConsumePartition(topic, 0, 0)
require.NoError(t, err)
partitionConsumer = WrapPartitionConsumer(serviceName, partitionConsumer, WithTracer(mt))
consumeAndCheck(t, mt, mockPartitionConsumer, partitionConsumer)
}
func TestWrapConsumer(t *testing.T) {
// Mock tracer
mt := mocktracer.NewTracer("kafka")
// Mock partition consumer controller
mockConsumer := mocks.NewConsumer(t, sarama.NewConfig())
mockPartitionConsumer := mockConsumer.ExpectConsumePartition(topic, 0, 0)
// Wrap consumer
consumer := WrapConsumer(serviceName, mockConsumer, WithTracer(mt))
// Create partition consumer
partitionConsumer, err := consumer.ConsumePartition(topic, 0, 0)
require.NoError(t, err)
consumeAndCheck(t, mt, mockPartitionConsumer, partitionConsumer)
}
func consumeAndCheck(t *testing.T, mt *mocktracer.Tracer, mockPartitionConsumer *mocks.PartitionConsumer, partitionConsumer sarama.PartitionConsumer) {
// Create message with span context
ctx, _ := mt.Start(context.Background(), "")
message := sarama.ConsumerMessage{Key: []byte("foo")}
propagation.InjectHTTP(ctx, propagators, NewConsumerMessageCarrier(&message))
// Produce message
mockPartitionConsumer.YieldMessage(&message)
mockPartitionConsumer.YieldMessage(&sarama.ConsumerMessage{Key: []byte("foo2")})
// Consume messages
msgList := make([]*sarama.ConsumerMessage, 2)
msgList[0] = <-partitionConsumer.Messages()
msgList[1] = <-partitionConsumer.Messages()
require.NoError(t, partitionConsumer.Close())
// Wait for the channel to be closed
<-partitionConsumer.Messages()
// Check spans length
spans := mt.EndedSpans()
assert.Len(t, spans, 2)
expectedList := []struct {
kvList []kv.KeyValue
parentSpanID trace.SpanID
kind trace.SpanKind
msgKey []byte
}{
{
kvList: []kv.KeyValue{
standard.ServiceNameKey.String(serviceName),
standard.MessagingSystemKey.String("kafka"),
standard.MessagingDestinationKindKeyTopic,
standard.MessagingDestinationKey.String("test-topic"),
standard.MessagingOperationReceive,
standard.MessagingMessageIDKey.String("1"),
kafkaPartitionKey.Int32(0),
},
parentSpanID: trace.SpanFromContext(ctx).SpanContext().SpanID,
kind: trace.SpanKindConsumer,
msgKey: []byte("foo"),
},
{
kvList: []kv.KeyValue{
standard.ServiceNameKey.String(serviceName),
standard.MessagingSystemKey.String("kafka"),
standard.MessagingDestinationKindKeyTopic,
standard.MessagingDestinationKey.String("test-topic"),
standard.MessagingOperationReceive,
standard.MessagingMessageIDKey.String("2"),
kafkaPartitionKey.Int32(0),
},
kind: trace.SpanKindConsumer,
msgKey: []byte("foo2"),
},
}
for i, expected := range expectedList {
t.Run(fmt.Sprint("index", i), func(t *testing.T) {
span := spans[i]
assert.Equal(t, expected.parentSpanID, span.ParentSpanID)
remoteSpanFromMessage := trace.RemoteSpanContextFromContext(propagation.ExtractHTTP(context.Background(), propagators, NewConsumerMessageCarrier(msgList[i])))
assert.Equal(t, span.SpanContext(), remoteSpanFromMessage,
"span context should be injected into the consumer message headers")
assert.Equal(t, "kafka.consume", span.Name)
assert.Equal(t, expected.kind, span.Kind)
assert.Equal(t, expected.msgKey, msgList[i].Key)
for _, k := range expected.kvList {
assert.Equal(t, k.Value, span.Attributes[k.Key], k.Key)
}
})
}
}
func TestConsumerConsumePartitionWithError(t *testing.T) {
// Mock partition consumer controller
mockConsumer := mocks.NewConsumer(t, sarama.NewConfig())
mockConsumer.ExpectConsumePartition(topic, 0, 0)
consumer := WrapConsumer(serviceName, mockConsumer)
_, err := consumer.ConsumePartition(topic, 0, 0)
assert.NoError(t, err)
// Consume twice
_, err = consumer.ConsumePartition(topic, 0, 0)
assert.Error(t, err)
}
func BenchmarkWrapPartitionConsumer(b *testing.B) {
// Mock tracer
mt := mocktracer.NewTracer("kafka")
mockPartitionConsumer, partitionConsumer := createMockPartitionConsumer(b)
partitionConsumer = WrapPartitionConsumer(serviceName, partitionConsumer, WithTracer(mt))
message := sarama.ConsumerMessage{Key: []byte("foo")}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
mockPartitionConsumer.YieldMessage(&message)
<-partitionConsumer.Messages()
}
}
func BenchmarkMockPartitionConsumer(b *testing.B) {
mockPartitionConsumer, partitionConsumer := createMockPartitionConsumer(b)
message := sarama.ConsumerMessage{Key: []byte("foo")}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
mockPartitionConsumer.YieldMessage(&message)
<-partitionConsumer.Messages()
}
}
func createMockPartitionConsumer(b *testing.B) (*mocks.PartitionConsumer, sarama.PartitionConsumer) {
// Mock partition consumer controller
consumer := mocks.NewConsumer(b, sarama.NewConfig())
mockPartitionConsumer := consumer.ExpectConsumePartition(topic, 0, 0)
// Create partition consumer
partitionConsumer, err := consumer.ConsumePartition(topic, 0, 0)
require.NoError(b, err)
return mockPartitionConsumer, partitionConsumer
}

View File

@ -0,0 +1,23 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// Package sarama provides functions to trace the Shopify/sarama package. (https://github.com/Shopify/sarama)
//
// The consumer's span will be created as a child of the producer's span.
//
// Context propagation only works on Kafka versions higher than 0.11.0.0 which supports record headers.
// (https://archive.apache.org/dist/kafka/0.11.0.0/RELEASE_NOTES.html)
//
// Based on: https://github.com/DataDog/dd-trace-go/tree/v1/contrib/Shopify/sarama
package sarama // import "go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama"

View File

@ -0,0 +1,13 @@
module go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama
go 1.14
replace go.opentelemetry.io/contrib => ../../../..
require (
github.com/Shopify/sarama v1.26.4
github.com/stretchr/testify v1.6.1
go.opentelemetry.io/contrib v0.9.0
go.opentelemetry.io/otel v0.9.0
google.golang.org/grpc v1.30.0
)

View File

@ -0,0 +1,150 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/DataDog/sketches-go v0.0.0-20190923095040-43f19ad77ff7 h1:qELHH0AWCvf98Yf+CNIJx9vOZOfHFDDzgDRYsnNk/vs=
github.com/DataDog/sketches-go v0.0.0-20190923095040-43f19ad77ff7/go.mod h1:Q5DbzQ+3AkgGwymQO7aZFNP7ns2lZKGtvRBzRXfdi60=
github.com/Shopify/sarama v1.26.4 h1:+17TxUq/PJEAfZAll0T7XJjSgQWCpaQSoki/x5yN8o8=
github.com/Shopify/sarama v1.26.4/go.mod h1:NbSGBSSndYaIhRcBtY9V0U7AyH+x71bG668AuWys/yU=
github.com/Shopify/toxiproxy v2.1.4+incompatible h1:TKdv8HiTLgE5wdJuEML90aBgNWsokNbMijUGhmcoBJc=
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/benbjohnson/clock v1.0.3 h1:vkLuvpK4fmtSCuo60+yC63p7y0BmQ8gm5ZXGuBCJyXg=
github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiUKYMaxlTDM=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/eapache/go-resiliency v1.2.0 h1:v7g92e/KSN71Rq7vSThKaWIq68fL4YHvWyiUKorFR1Q=
github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0 h1:YOEu7KNc61ntiQlcEeUIoDTJ2o8mQznoNvUhiigpIqc=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/frankban/quicktest v1.7.2 h1:2QxQoC1TS09S7fhCPsrvqYdvP1H5M1P1ih5ABm3BTYk=
github.com/frankban/quicktest v1.7.2/go.mod h1:jaStnuzAqU1AJdCO0l53JDCJrVDKcS03DbaAcR7Ks/o=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0=
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.0 h1:/QaMHBdZ26BB3SSst0Iwl10Epc+xhTquomWX0oZEB6w=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/hashicorp/go-uuid v1.0.2 h1:cfejS+Tpcp13yd5nYHWDI6qVCny6wyX2Mt5SGur2IGE=
github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro=
github.com/jcmturner/gofork v1.0.0 h1:J7uCkflzTEhUZ64xqKnkDxq3kzc96ajM1Gli5ktUem8=
github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o=
github.com/klauspost/compress v1.9.8 h1:VMAMUUOh+gaxKTMk+zqbjsSjsIcUcL/LF4o63i82QyA=
github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs=
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/opentracing/opentracing-go v1.2.0/go.mod h1:GxEUsuufX4nBwe+T+Wl9TAgYrxe9dPLANfrWvHYVTgc=
github.com/pierrec/lz4 v2.4.1+incompatible h1:mFe7ttWaflA46Mhqh+jUfjp2qTbPYxLB2/OyBppH9dg=
github.com/pierrec/lz4 v2.4.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563 h1:dY6ETXrvDG7Sa4vE8ZQG4yqWg6UnOcbqTAahkV813vQ=
github.com/rcrowley/go-metrics v0.0.0-20190826022208-cac0b30c2563/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I=
github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y=
go.opentelemetry.io/otel v0.9.0 h1:nsdCDHzQx1Yv8E2nwCPcMXMfg+EMIlx1LBOXNC8qSQ8=
go.opentelemetry.io/otel v0.9.0/go.mod h1:ckxzUEfk7tAkTwEMVdkllBM+YOfE/K9iwg6zYntFYSg=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72 h1:+ELyKg6m8UBf0nPFSqD0mi7zUfwPyXo23HNjMnXPz7w=
golang.org/x/crypto v0.0.0-20200204104054-c9f3fb736b72/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2 h1:CCH4IOTTfewWjGOlSp+zGcjutRKlBEZQ6wTn8ozI/nI=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20191009194640-548a555dbc03 h1:4HYDjxeNXAOTv3o1N2tjo8UUSlhQgAD52FVkwxnWgM8=
google.golang.org/genproto v0.0.0-20191009194640-548a555dbc03/go.mod h1:n3cpQtvxv34hfy77yVDNjmbRyujviMdxYliBSkLhpCc=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
google.golang.org/grpc v1.30.0 h1:M5a8xTlYTxwMn5ZFkwhRabsygDY5G8TYLyQDBxJNAxE=
google.golang.org/grpc v1.30.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/jcmturner/aescts.v1 v1.0.1 h1:cVVZBK2b1zY26haWB4vbBiZrfFQnfbTVrE3xZq6hrEw=
gopkg.in/jcmturner/aescts.v1 v1.0.1/go.mod h1:nsR8qBOg+OucoIW+WMhB3GspUQXq9XorLnQb9XtvcOo=
gopkg.in/jcmturner/dnsutils.v1 v1.0.1 h1:cIuC1OLRGZrld+16ZJvvZxVJeKPsvd5eUIvxfoN5hSM=
gopkg.in/jcmturner/dnsutils.v1 v1.0.1/go.mod h1:m3v+5svpVOhtFAP/wSz+yzh4Mc0Fg7eRhxkJMWSIz9Q=
gopkg.in/jcmturner/goidentity.v3 v3.0.0 h1:1duIyWiTaYvVx3YX2CYtpJbUFd7/UuPYCfgXtQ3VTbI=
gopkg.in/jcmturner/goidentity.v3 v3.0.0/go.mod h1:oG2kH0IvSYNIu80dVAyu/yoefjq1mNfM5bm88whjWx4=
gopkg.in/jcmturner/gokrb5.v7 v7.5.0 h1:a9tsXlIDD9SKxotJMK3niV7rPZAJeX2aD/0yg3qlIrg=
gopkg.in/jcmturner/gokrb5.v7 v7.5.0/go.mod h1:l8VISx+WGYp+Fp7KRbsiUuXTTOnxIc3Tuvyavf11/WM=
gopkg.in/jcmturner/rpc.v1 v1.1.0 h1:QHIUxTX1ISuAv9dD2wJ9HWQVuWDX/Zc0PfeC2tjc4rU=
gopkg.in/jcmturner/rpc.v1 v1.1.0/go.mod h1:YIdkC4XfD6GXbzje11McwsDuOlZQSb9W4vfLvuNnlv8=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

View File

@ -0,0 +1,94 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sarama
import (
"github.com/Shopify/sarama"
"go.opentelemetry.io/otel/api/propagation"
)
var _ propagation.HTTPSupplier = (*ProducerMessageCarrier)(nil)
var _ propagation.HTTPSupplier = (*ConsumerMessageCarrier)(nil)
// ProducerMessageCarrier injects and extracts traces from a sarama.ProducerMessage.
type ProducerMessageCarrier struct {
msg *sarama.ProducerMessage
}
// NewProducerMessageCarrier creates a new ProducerMessageCarrier.
func NewProducerMessageCarrier(msg *sarama.ProducerMessage) ProducerMessageCarrier {
return ProducerMessageCarrier{msg: msg}
}
// Get retrieves a single value for a given key.
func (c ProducerMessageCarrier) Get(key string) string {
for _, h := range c.msg.Headers {
if string(h.Key) == key {
return string(h.Value)
}
}
return ""
}
// Set sets a header.
func (c ProducerMessageCarrier) Set(key, val string) {
// Ensure uniqueness of keys
for i := 0; i < len(c.msg.Headers); i++ {
if string(c.msg.Headers[i].Key) == key {
c.msg.Headers = append(c.msg.Headers[:i], c.msg.Headers[i+1:]...)
i--
}
}
c.msg.Headers = append(c.msg.Headers, sarama.RecordHeader{
Key: []byte(key),
Value: []byte(val),
})
}
// ConsumerMessageCarrier injects and extracts traces from a sarama.ConsumerMessage.
type ConsumerMessageCarrier struct {
msg *sarama.ConsumerMessage
}
// NewConsumerMessageCarrier creates a new ConsumerMessageCarrier.
func NewConsumerMessageCarrier(msg *sarama.ConsumerMessage) ConsumerMessageCarrier {
return ConsumerMessageCarrier{msg: msg}
}
// Get retrieves a single value for a given key.
func (c ConsumerMessageCarrier) Get(key string) string {
for _, h := range c.msg.Headers {
if h != nil && string(h.Key) == key {
return string(h.Value)
}
}
return ""
}
// Set sets a header.
func (c ConsumerMessageCarrier) Set(key, val string) {
// Ensure uniqueness of keys
for i := 0; i < len(c.msg.Headers); i++ {
if c.msg.Headers[i] != nil && string(c.msg.Headers[i].Key) == key {
c.msg.Headers = append(c.msg.Headers[:i], c.msg.Headers[i+1:]...)
i--
}
}
c.msg.Headers = append(c.msg.Headers, &sarama.RecordHeader{
Key: []byte(key),
Value: []byte(val),
})
}

View File

@ -0,0 +1,120 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sarama
import (
"testing"
"github.com/Shopify/sarama"
"github.com/stretchr/testify/assert"
)
func TestProducerMessageCarrierGet(t *testing.T) {
testCases := []struct {
name string
carrier ProducerMessageCarrier
key string
expected string
}{
{
name: "exists",
carrier: ProducerMessageCarrier{msg: &sarama.ProducerMessage{Headers: []sarama.RecordHeader{
{Key: []byte("foo"), Value: []byte("bar")},
}}},
key: "foo",
expected: "bar",
},
{
name: "not exists",
carrier: ProducerMessageCarrier{msg: &sarama.ProducerMessage{Headers: []sarama.RecordHeader{}}},
key: "foo",
expected: "",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
result := tc.carrier.Get(tc.key)
assert.Equal(t, tc.expected, result)
})
}
}
func TestProducerMessageCarrierSet(t *testing.T) {
msg := sarama.ProducerMessage{Headers: []sarama.RecordHeader{
{Key: []byte("foo"), Value: []byte("bar")},
}}
carrier := ProducerMessageCarrier{msg: &msg}
carrier.Set("foo", "bar2")
carrier.Set("foo2", "bar2")
carrier.Set("foo2", "bar3")
carrier.Set("foo3", "bar4")
assert.ElementsMatch(t, carrier.msg.Headers, []sarama.RecordHeader{
{Key: []byte("foo"), Value: []byte("bar2")},
{Key: []byte("foo2"), Value: []byte("bar3")},
{Key: []byte("foo3"), Value: []byte("bar4")},
})
}
func TestConsumerMessageCarrierGet(t *testing.T) {
testCases := []struct {
name string
carrier ConsumerMessageCarrier
key string
expected string
}{
{
name: "exists",
carrier: ConsumerMessageCarrier{msg: &sarama.ConsumerMessage{Headers: []*sarama.RecordHeader{
{Key: []byte("foo"), Value: []byte("bar")},
}}},
key: "foo",
expected: "bar",
},
{
name: "not exists",
carrier: ConsumerMessageCarrier{msg: &sarama.ConsumerMessage{Headers: []*sarama.RecordHeader{}}},
key: "foo",
expected: "",
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
result := tc.carrier.Get(tc.key)
assert.Equal(t, tc.expected, result)
})
}
}
func TestConsumerMessageCarrierSet(t *testing.T) {
msg := sarama.ConsumerMessage{Headers: []*sarama.RecordHeader{
{Key: []byte("foo"), Value: []byte("bar")},
}}
carrier := ConsumerMessageCarrier{msg: &msg}
carrier.Set("foo", "bar2")
carrier.Set("foo2", "bar2")
carrier.Set("foo2", "bar3")
carrier.Set("foo3", "bar4")
assert.ElementsMatch(t, carrier.msg.Headers, []*sarama.RecordHeader{
{Key: []byte("foo"), Value: []byte("bar2")},
{Key: []byte("foo2"), Value: []byte("bar3")},
{Key: []byte("foo3"), Value: []byte("bar4")},
})
}

View File

@ -0,0 +1,68 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sarama
import (
"go.opentelemetry.io/otel/api/global"
"go.opentelemetry.io/otel/api/kv"
otelpropagation "go.opentelemetry.io/otel/api/propagation"
"go.opentelemetry.io/otel/api/trace"
)
const (
defaultTracerName = "go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama"
kafkaPartitionKey = kv.Key("messaging.kafka.partition")
)
type config struct {
ServiceName string
Tracer trace.Tracer
Propagators otelpropagation.Propagators
}
// newConfig returns a config with all Options set.
func newConfig(serviceName string, opts ...Option) config {
cfg := config{Propagators: global.Propagators(), ServiceName: serviceName}
for _, opt := range opts {
opt(&cfg)
}
if cfg.Tracer == nil {
cfg.Tracer = global.Tracer(defaultTracerName)
}
return cfg
}
// Option specifies instrumentation configuration options.
type Option func(*config)
// WithTracer specifies a tracer to use for creating spans. If none is
// specified, a tracer named
// "go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama"
// from the global provider is used.
func WithTracer(tracer trace.Tracer) Option {
return func(cfg *config) {
cfg.Tracer = tracer
}
}
// WithPropagators specifies propagators to use for extracting
// information from the HTTP requests. If none are specified, global
// ones will be used.
func WithPropagators(propagators otelpropagation.Propagators) Option {
return func(cfg *config) {
cfg.Propagators = propagators
}
}

View File

@ -0,0 +1,73 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sarama
import (
"testing"
"github.com/stretchr/testify/assert"
"go.opentelemetry.io/otel/api/global"
)
func TestNewConfig(t *testing.T) {
testCases := []struct {
name string
serviceName string
opts []Option
expected config
}{
{
name: "set service name",
serviceName: serviceName,
expected: config{
ServiceName: serviceName,
Tracer: global.Tracer(defaultTracerName),
Propagators: global.Propagators(),
},
},
{
name: "with tracer",
serviceName: serviceName,
opts: []Option{
WithTracer(global.Tracer("new")),
},
expected: config{
ServiceName: serviceName,
Tracer: global.Tracer("new"),
Propagators: global.Propagators(),
},
},
{
name: "with propagators",
serviceName: serviceName,
opts: []Option{
WithPropagators(nil),
},
expected: config{
ServiceName: serviceName,
Tracer: global.Tracer(defaultTracerName),
Propagators: nil,
},
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
result := newConfig(tc.serviceName, tc.opts...)
assert.Equal(t, tc.expected, result)
})
}
}

View File

@ -0,0 +1,265 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sarama
import (
"context"
"strconv"
"github.com/Shopify/sarama"
"google.golang.org/grpc/codes"
"go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/propagation"
"go.opentelemetry.io/otel/api/standard"
"go.opentelemetry.io/otel/api/trace"
)
type syncProducer struct {
sarama.SyncProducer
cfg config
saramaConfig *sarama.Config
}
// SendMessage calls sarama.SyncProducer.SendMessage and traces the request.
func (p *syncProducer) SendMessage(msg *sarama.ProducerMessage) (partition int32, offset int64, err error) {
span := startProducerSpan(p.cfg, p.saramaConfig.Version, msg)
partition, offset, err = p.SyncProducer.SendMessage(msg)
finishProducerSpan(span, partition, offset, err)
return partition, offset, err
}
// SendMessages calls sarama.SyncProducer.SendMessages and traces the requests.
func (p *syncProducer) SendMessages(msgs []*sarama.ProducerMessage) error {
// Although there's only one call made to the SyncProducer, the messages are
// treated individually, so we create a span for each one
spans := make([]trace.Span, len(msgs))
for i, msg := range msgs {
spans[i] = startProducerSpan(p.cfg, p.saramaConfig.Version, msg)
}
err := p.SyncProducer.SendMessages(msgs)
for i, span := range spans {
finishProducerSpan(span, msgs[i].Partition, msgs[i].Offset, err)
}
return err
}
// WrapSyncProducer wraps a sarama.SyncProducer so that all produced messages
// are traced.
func WrapSyncProducer(serviceName string, saramaConfig *sarama.Config, producer sarama.SyncProducer, opts ...Option) sarama.SyncProducer {
cfg := newConfig(serviceName, opts...)
if saramaConfig == nil {
saramaConfig = sarama.NewConfig()
}
return &syncProducer{
SyncProducer: producer,
cfg: cfg,
saramaConfig: saramaConfig,
}
}
type closeType int
const (
closeSync closeType = iota
closeAsync
)
type asyncProducer struct {
sarama.AsyncProducer
input chan *sarama.ProducerMessage
successes chan *sarama.ProducerMessage
errors chan *sarama.ProducerError
closeErr chan error
}
// Input returns the input channel.
func (p *asyncProducer) Input() chan<- *sarama.ProducerMessage {
return p.input
}
// Successes returns the successes channel.
func (p *asyncProducer) Successes() <-chan *sarama.ProducerMessage {
return p.successes
}
// Errors returns the errors channel.
func (p *asyncProducer) Errors() <-chan *sarama.ProducerError {
return p.errors
}
// AsyncClose async close producer.
func (p *asyncProducer) AsyncClose() {
p.input <- &sarama.ProducerMessage{
Metadata: closeAsync,
}
}
// Close shuts down the producer and waits for any buffered messages to be
// flushed.
//
// Due to the implement of sarama, some messages may lose successes or errors status
// while closing.
func (p *asyncProducer) Close() error {
p.input <- &sarama.ProducerMessage{
Metadata: closeSync,
}
return <-p.closeErr
}
type producerMessageContext struct {
span trace.Span
metadataBackup interface{}
}
// WrapAsyncProducer wraps a sarama.AsyncProducer so that all produced messages
// are traced. It requires the underlying sarama Config so we can know whether
// or not successes will be returned.
//
// If `Return.Successes` is false, there is no way to know partition and offset of
// the message.
func WrapAsyncProducer(serviceName string, saramaConfig *sarama.Config, p sarama.AsyncProducer, opts ...Option) sarama.AsyncProducer {
cfg := newConfig(serviceName, opts...)
if saramaConfig == nil {
saramaConfig = sarama.NewConfig()
}
wrapped := &asyncProducer{
AsyncProducer: p,
input: make(chan *sarama.ProducerMessage),
successes: make(chan *sarama.ProducerMessage),
errors: make(chan *sarama.ProducerError),
closeErr: make(chan error),
}
go func() {
producerMessageContexts := make(map[interface{}]producerMessageContext)
// Clear all spans.
// Sarama will consume all the successes and errors by itself while closing,
// so our `Successes()` and `Errors()` may get nothing and those remaining spans
// cannot be closed.
defer func() {
for _, mc := range producerMessageContexts {
finishProducerSpan(mc.span, 0, 0, nil)
}
}()
defer close(wrapped.successes)
defer close(wrapped.errors)
for {
select {
case msg := <-wrapped.input:
// Shut down if message metadata is a close type.
// Sarama will close after dispatching every message.
// So wrapper should follow this mechanism by adding a special message at
// the end of the input channel.
if ct, ok := msg.Metadata.(closeType); ok {
switch ct {
case closeSync:
go func() {
wrapped.closeErr <- p.Close()
}()
case closeAsync:
p.AsyncClose()
}
continue
}
span := startProducerSpan(cfg, saramaConfig.Version, msg)
// Create message context, backend message metadata
mc := producerMessageContext{
metadataBackup: msg.Metadata,
span: span,
}
// Specific metadata with span id
msg.Metadata = span.SpanContext().SpanID
p.Input() <- msg
if saramaConfig.Producer.Return.Successes {
producerMessageContexts[msg.Metadata] = mc
} else {
// If returning successes isn't enabled, we just finish the
// span right away because there's no way to know when it will
// be done.
finishProducerSpan(span, msg.Partition, msg.Offset, nil)
}
case msg, ok := <-p.Successes():
if !ok {
// producer was closed, so exit
return
}
key := msg.Metadata
if mc, ok := producerMessageContexts[key]; ok {
delete(producerMessageContexts, key)
finishProducerSpan(mc.span, msg.Partition, msg.Offset, nil)
// Restore message metadata
msg.Metadata = mc.metadataBackup
}
wrapped.successes <- msg
case err, ok := <-p.Errors():
if !ok {
// producer was closed
return
}
key := err.Msg.Metadata
if mc, ok := producerMessageContexts[key]; ok {
delete(producerMessageContexts, key)
finishProducerSpan(mc.span, err.Msg.Partition, err.Msg.Offset, err.Err)
}
wrapped.errors <- err
}
}
}()
return wrapped
}
func startProducerSpan(cfg config, version sarama.KafkaVersion, msg *sarama.ProducerMessage) trace.Span {
// If there's a span context in the message, use that as the parent context.
carrier := NewProducerMessageCarrier(msg)
ctx := propagation.ExtractHTTP(context.Background(), cfg.Propagators, carrier)
// Create a span.
attrs := []kv.KeyValue{
standard.ServiceNameKey.String(cfg.ServiceName),
standard.MessagingSystemKey.String("kafka"),
standard.MessagingDestinationKindKeyTopic,
standard.MessagingDestinationKey.String(msg.Topic),
}
opts := []trace.StartOption{
trace.WithAttributes(attrs...),
trace.WithSpanKind(trace.SpanKindProducer),
}
ctx, span := cfg.Tracer.Start(ctx, "kafka.produce", opts...)
if version.IsAtLeast(sarama.V0_11_0_0) {
// Inject current span context, so consumers can use it to propagate span.
propagation.InjectHTTP(ctx, cfg.Propagators, carrier)
}
return span
}
func finishProducerSpan(span trace.Span, partition int32, offset int64, err error) {
span.SetAttributes(
standard.MessagingMessageIDKey.String(strconv.FormatInt(offset, 10)),
kafkaPartitionKey.Int32(partition),
)
if err != nil {
span.SetStatus(codes.Internal, err.Error())
}
span.End()
}

View File

@ -0,0 +1,419 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package sarama
import (
"context"
"errors"
"testing"
"github.com/Shopify/sarama"
"github.com/Shopify/sarama/mocks"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/codes"
"go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/propagation"
"go.opentelemetry.io/otel/api/standard"
"go.opentelemetry.io/otel/api/trace"
mocktracer "go.opentelemetry.io/contrib/internal/trace"
)
func TestWrapSyncProducer(t *testing.T) {
var err error
// Mock tracer
mt := mocktracer.NewTracer("kafka")
cfg := newSaramaConfig()
// Mock sync producer
mockSyncProducer := mocks.NewSyncProducer(t, cfg)
// Wrap sync producer
syncProducer := WrapSyncProducer(serviceName, cfg, mockSyncProducer, WithTracer(mt))
// Create message with span context
ctx, _ := mt.Start(context.Background(), "")
messageWithSpanContext := sarama.ProducerMessage{Topic: topic, Key: sarama.StringEncoder("foo")}
propagation.InjectHTTP(ctx, propagators, NewProducerMessageCarrier(&messageWithSpanContext))
// Expected
expectedList := []struct {
kvList []kv.KeyValue
parentSpanID trace.SpanID
kind trace.SpanKind
}{
{
kvList: []kv.KeyValue{
standard.ServiceNameKey.String(serviceName),
standard.MessagingSystemKey.String("kafka"),
standard.MessagingDestinationKindKeyTopic,
standard.MessagingDestinationKey.String(topic),
standard.MessagingMessageIDKey.String("1"),
kafkaPartitionKey.Int32(0),
},
parentSpanID: trace.SpanFromContext(ctx).SpanContext().SpanID,
kind: trace.SpanKindProducer,
},
{
kvList: []kv.KeyValue{
standard.ServiceNameKey.String(serviceName),
standard.MessagingSystemKey.String("kafka"),
standard.MessagingDestinationKindKeyTopic,
standard.MessagingDestinationKey.String(topic),
standard.MessagingMessageIDKey.String("2"),
kafkaPartitionKey.Int32(0),
},
kind: trace.SpanKindProducer,
},
{
kvList: []kv.KeyValue{
standard.ServiceNameKey.String(serviceName),
standard.MessagingSystemKey.String("kafka"),
standard.MessagingDestinationKindKeyTopic,
standard.MessagingDestinationKey.String(topic),
// TODO: The mock sync producer of sarama does not handle the offset while sending messages
// https://github.com/Shopify/sarama/pull/1747
//standard.MessagingMessageIDKey.String("3"),
kafkaPartitionKey.Int32(0),
},
kind: trace.SpanKindProducer,
},
{
kvList: []kv.KeyValue{
standard.ServiceNameKey.String(serviceName),
standard.MessagingSystemKey.String("kafka"),
standard.MessagingDestinationKindKeyTopic,
standard.MessagingDestinationKey.String(topic),
//standard.MessagingMessageIDKey.String("4"),
kafkaPartitionKey.Int32(0),
},
kind: trace.SpanKindProducer,
},
}
for i := 0; i < len(expectedList); i++ {
mockSyncProducer.ExpectSendMessageAndSucceed()
}
// Send message
msgList := []*sarama.ProducerMessage{
&messageWithSpanContext,
{Topic: topic, Key: sarama.StringEncoder("foo2")},
{Topic: topic, Key: sarama.StringEncoder("foo3")},
{Topic: topic, Key: sarama.StringEncoder("foo4")},
}
_, _, err = syncProducer.SendMessage(msgList[0])
require.NoError(t, err)
_, _, err = syncProducer.SendMessage(msgList[1])
require.NoError(t, err)
// Send messages
require.NoError(t, syncProducer.SendMessages(msgList[2:]))
spanList := mt.EndedSpans()
for i, expected := range expectedList {
span := spanList[i]
msg := msgList[i]
// Check span
assert.True(t, span.SpanContext().IsValid())
assert.Equal(t, expected.parentSpanID, span.ParentSpanID)
assert.Equal(t, "kafka.produce", span.Name)
assert.Equal(t, expected.kind, span.Kind)
for _, k := range expected.kvList {
assert.Equal(t, k.Value, span.Attributes[k.Key], k.Key)
}
// Check tracing propagation
remoteSpanFromMessage := trace.RemoteSpanContextFromContext(propagation.ExtractHTTP(context.Background(), propagators, NewProducerMessageCarrier(msg)))
assert.True(t, remoteSpanFromMessage.IsValid())
}
}
func TestWrapAsyncProducer(t *testing.T) {
// Create message with span context
createMessages := func(mt *mocktracer.Tracer) []*sarama.ProducerMessage {
ctx, _ := mt.Start(context.Background(), "")
messageWithSpanContext := sarama.ProducerMessage{Topic: topic, Key: sarama.StringEncoder("foo")}
propagation.InjectHTTP(ctx, propagators, NewProducerMessageCarrier(&messageWithSpanContext))
mt.EndedSpans()
return []*sarama.ProducerMessage{
&messageWithSpanContext,
{Topic: topic, Key: sarama.StringEncoder("foo2")},
}
}
t.Run("without successes config", func(t *testing.T) {
mt := mocktracer.NewTracer("kafka")
cfg := newSaramaConfig()
mockAsyncProducer := mocks.NewAsyncProducer(t, cfg)
ap := WrapAsyncProducer(serviceName, cfg, mockAsyncProducer, WithTracer(mt))
msgList := createMessages(mt)
// Send message
for _, msg := range msgList {
mockAsyncProducer.ExpectInputAndSucceed()
ap.Input() <- msg
}
err := ap.Close()
require.NoError(t, err)
spanList := mt.EndedSpans()
// Expected
expectedList := []struct {
kvList []kv.KeyValue
parentSpanID trace.SpanID
kind trace.SpanKind
}{
{
kvList: []kv.KeyValue{
standard.ServiceNameKey.String(serviceName),
standard.MessagingSystemKey.String("kafka"),
standard.MessagingDestinationKindKeyTopic,
standard.MessagingDestinationKey.String(topic),
standard.MessagingMessageIDKey.String("0"),
kafkaPartitionKey.Int32(0),
},
parentSpanID: trace.SpanID{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1},
kind: trace.SpanKindProducer,
},
{
kvList: []kv.KeyValue{
standard.ServiceNameKey.String(serviceName),
standard.MessagingSystemKey.String("kafka"),
standard.MessagingDestinationKindKeyTopic,
standard.MessagingDestinationKey.String(topic),
standard.MessagingMessageIDKey.String("0"),
kafkaPartitionKey.Int32(0),
},
kind: trace.SpanKindProducer,
},
}
for i, expected := range expectedList {
span := spanList[i]
msg := msgList[i]
// Check span
assert.True(t, span.SpanContext().IsValid())
assert.Equal(t, expected.parentSpanID, span.ParentSpanID)
assert.Equal(t, "kafka.produce", span.Name)
assert.Equal(t, expected.kind, span.Kind)
for _, k := range expected.kvList {
assert.Equal(t, k.Value, span.Attributes[k.Key], k.Key)
}
// Check tracing propagation
remoteSpanFromMessage := trace.RemoteSpanContextFromContext(propagation.ExtractHTTP(context.Background(), propagators, NewProducerMessageCarrier(msg)))
assert.True(t, remoteSpanFromMessage.IsValid())
}
})
t.Run("with successes config", func(t *testing.T) {
mt := mocktracer.NewTracer("kafka")
// Set producer with successes config
cfg := newSaramaConfig()
cfg.Producer.Return.Successes = true
mockAsyncProducer := mocks.NewAsyncProducer(t, cfg)
ap := WrapAsyncProducer(serviceName, cfg, mockAsyncProducer, WithTracer(mt))
msgList := createMessages(mt)
// Send message
for i, msg := range msgList {
mockAsyncProducer.ExpectInputAndSucceed()
// Add metadata to msg
msg.Metadata = i
ap.Input() <- msg
newMsg := <-ap.Successes()
assert.Equal(t, newMsg, msg)
}
err := ap.Close()
require.NoError(t, err)
spanList := mt.EndedSpans()
// Expected
expectedList := []struct {
kvList []kv.KeyValue
parentSpanID trace.SpanID
kind trace.SpanKind
}{
{
kvList: []kv.KeyValue{
standard.ServiceNameKey.String(serviceName),
standard.MessagingSystemKey.String("kafka"),
standard.MessagingDestinationKindKeyTopic,
standard.MessagingDestinationKey.String(topic),
standard.MessagingMessageIDKey.String("1"),
kafkaPartitionKey.Int32(0),
},
parentSpanID: trace.SpanID{0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0, 0x1},
kind: trace.SpanKindProducer,
},
{
kvList: []kv.KeyValue{
standard.ServiceNameKey.String(serviceName),
standard.MessagingSystemKey.String("kafka"),
standard.MessagingDestinationKindKeyTopic,
standard.MessagingDestinationKey.String(topic),
standard.MessagingMessageIDKey.String("2"),
kafkaPartitionKey.Int32(0),
},
kind: trace.SpanKindProducer,
},
}
for i, expected := range expectedList {
span := spanList[i]
msg := msgList[i]
// Check span
assert.True(t, span.SpanContext().IsValid())
assert.Equal(t, expected.parentSpanID, span.ParentSpanID)
assert.Equal(t, "kafka.produce", span.Name)
assert.Equal(t, expected.kind, span.Kind)
for _, k := range expected.kvList {
assert.Equal(t, k.Value, span.Attributes[k.Key], k.Key)
}
// Check metadata
assert.Equal(t, i, msg.Metadata)
// Check tracing propagation
remoteSpanFromMessage := trace.RemoteSpanContextFromContext(propagation.ExtractHTTP(context.Background(), propagators, NewProducerMessageCarrier(msg)))
assert.True(t, remoteSpanFromMessage.IsValid())
}
})
}
func TestWrapAsyncProducerError(t *testing.T) {
mt := mocktracer.NewTracer("kafka")
// Set producer with successes config
cfg := newSaramaConfig()
cfg.Producer.Return.Successes = true
mockAsyncProducer := mocks.NewAsyncProducer(t, cfg)
ap := WrapAsyncProducer(serviceName, cfg, mockAsyncProducer, WithTracer(mt))
mockAsyncProducer.ExpectInputAndFail(errors.New("test"))
ap.Input() <- &sarama.ProducerMessage{Topic: topic, Key: sarama.StringEncoder("foo2")}
err := <-ap.Errors()
require.Error(t, err)
ap.AsyncClose()
spanList := mt.EndedSpans()
assert.Len(t, spanList, 1)
span := spanList[0]
assert.Equal(t, codes.Internal, span.Status)
assert.Equal(t, "test", span.StatusMessage)
}
func newSaramaConfig() *sarama.Config {
cfg := sarama.NewConfig()
cfg.Version = sarama.V0_11_0_0
return cfg
}
func BenchmarkWrapSyncProducer(b *testing.B) {
// Mock tracer
mt := mocktracer.NewTracer("kafka")
cfg := newSaramaConfig()
// Mock sync producer
mockSyncProducer := mocks.NewSyncProducer(b, cfg)
// Wrap sync producer
syncProducer := WrapSyncProducer(serviceName, cfg, mockSyncProducer, WithTracer(mt))
message := sarama.ProducerMessage{Key: sarama.StringEncoder("foo")}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
mockSyncProducer.ExpectSendMessageAndSucceed()
_, _, err := syncProducer.SendMessage(&message)
assert.NoError(b, err)
}
}
func BenchmarkMockSyncProducer(b *testing.B) {
cfg := newSaramaConfig()
// Mock sync producer
mockSyncProducer := mocks.NewSyncProducer(b, cfg)
// Wrap sync producer
syncProducer := mockSyncProducer
message := sarama.ProducerMessage{Key: sarama.StringEncoder("foo")}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
mockSyncProducer.ExpectSendMessageAndSucceed()
_, _, err := syncProducer.SendMessage(&message)
assert.NoError(b, err)
}
}
func BenchmarkWrapAsyncProducer(b *testing.B) {
// Mock tracer
mt := mocktracer.NewTracer("kafka")
cfg := newSaramaConfig()
cfg.Producer.Return.Successes = true
mockAsyncProducer := mocks.NewAsyncProducer(b, cfg)
// Wrap sync producer
asyncProducer := WrapAsyncProducer(serviceName, cfg, mockAsyncProducer, WithTracer(mt))
message := sarama.ProducerMessage{Key: sarama.StringEncoder("foo")}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
mockAsyncProducer.ExpectInputAndSucceed()
asyncProducer.Input() <- &message
<-asyncProducer.Successes()
}
}
func BenchmarkMockAsyncProducer(b *testing.B) {
cfg := newSaramaConfig()
cfg.Producer.Return.Successes = true
mockAsyncProducer := mocks.NewAsyncProducer(b, cfg)
// Wrap sync producer
asyncProducer := mockAsyncProducer
message := sarama.ProducerMessage{Key: sarama.StringEncoder("foo")}
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
mockAsyncProducer.ExpectInputAndSucceed()
mockAsyncProducer.Input() <- &message
<-asyncProducer.Successes()
}
}

View File

@ -28,13 +28,15 @@ import (
// Span is a mock span used in association with Tracer for
// testing purpose only.
type Span struct {
sc oteltrace.SpanContext
tracer *Tracer
Name string
Attributes map[otelkv.Key]otelvalue.Value
Kind oteltrace.SpanKind
Status codes.Code
ParentSpanID oteltrace.SpanID
sc oteltrace.SpanContext
tracer *Tracer
Name string
Attributes map[otelkv.Key]otelvalue.Value
Kind oteltrace.SpanKind
Status codes.Code
StatusMessage string
ParentSpanID oteltrace.SpanID
Links map[oteltrace.SpanContext][]otelkv.KeyValue
}
var _ oteltrace.Span = (*Span)(nil)
@ -57,6 +59,7 @@ func (ms *Span) IsRecording() bool {
// SetStatus sets the Status member.
func (ms *Span) SetStatus(status codes.Code, msg string) {
ms.Status = status
ms.StatusMessage = msg
}
// SetAttribute adds a single inferred attribute.

View File

@ -21,7 +21,10 @@ import (
"sync"
"sync/atomic"
"go.opentelemetry.io/otel/api/kv"
oteltrace "go.opentelemetry.io/otel/api/trace"
"go.opentelemetry.io/contrib/internal/trace/parent"
)
type Provider struct {
@ -116,7 +119,7 @@ func (mt *Tracer) Start(ctx context.Context, name string, o ...oteltrace.StartOp
var span *Span
var sc oteltrace.SpanContext
parentSpanContext := getSpanContext(ctx, opts.NewRoot)
parentSpanContext, _, links := parent.GetSpanContextAndLinks(ctx, opts.NewRoot)
parentSpanID := parentSpanContext.SpanID
if !parentSpanContext.IsValid() {
@ -136,6 +139,7 @@ func (mt *Tracer) Start(ctx context.Context, name string, o ...oteltrace.StartOp
Name: name,
Attributes: nil,
ParentSpanID: parentSpanID,
Links: make(map[oteltrace.SpanContext][]kv.KeyValue),
}
if len(opts.Attributes) > 0 {
span.SetAttributes(opts.Attributes...)
@ -145,23 +149,12 @@ func (mt *Tracer) Start(ctx context.Context, name string, o ...oteltrace.StartOp
mt.OnSpanStarted(span)
}
for _, link := range links {
span.Links[link.SpanContext] = link.Attributes
}
for _, link := range opts.Links {
span.Links[link.SpanContext] = link.Attributes
}
return oteltrace.ContextWithSpan(ctx, span), span
}
func getSpanContext(ctx context.Context, ignoreContext bool) oteltrace.SpanContext {
if ignoreContext {
return oteltrace.EmptySpanContext()
}
lsctx := oteltrace.SpanFromContext(ctx).SpanContext()
if lsctx.IsValid() {
return lsctx
}
rsctx := oteltrace.RemoteSpanContextFromContext(ctx)
if rsctx.IsValid() {
return rsctx
}
return oteltrace.EmptySpanContext()
}

View File

@ -0,0 +1,53 @@
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package parent
import (
"context"
"go.opentelemetry.io/otel/api/kv"
"go.opentelemetry.io/otel/api/trace"
)
func GetSpanContextAndLinks(ctx context.Context, ignoreContext bool) (trace.SpanContext, bool, []trace.Link) {
lsctx := trace.SpanFromContext(ctx).SpanContext()
rsctx := trace.RemoteSpanContextFromContext(ctx)
if ignoreContext {
links := addLinkIfValid(nil, lsctx, "current")
links = addLinkIfValid(links, rsctx, "remote")
return trace.EmptySpanContext(), false, links
}
if lsctx.IsValid() {
return lsctx, false, nil
}
if rsctx.IsValid() {
return rsctx, true, nil
}
return trace.EmptySpanContext(), false, nil
}
func addLinkIfValid(links []trace.Link, sc trace.SpanContext, kind string) []trace.Link {
if !sc.IsValid() {
return links
}
return append(links, trace.Link{
SpanContext: sc,
Attributes: []kv.KeyValue{
kv.String("ignored-on-demand", kind),
},
})
}