SQS propagation for AWS SDK 1.1 (#2114)

This commit is contained in:
Jakub Wach 2021-01-29 09:21:12 +01:00 committed by GitHub
parent 0a74799419
commit 77922d4834
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 425 additions and 50 deletions

View File

@ -9,6 +9,7 @@ import static io.opentelemetry.api.trace.Span.Kind.CLIENT;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Span.Kind;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Context;
@ -69,14 +70,20 @@ public abstract class HttpClientTracer<REQUEST, CARRIER, RESPONSE> extends BaseT
}
public Context startSpan(
Context parentContext, REQUEST request, CARRIER carrier, long startTimeNanos) {
Kind kind, Context parentContext, REQUEST request, CARRIER carrier, long startTimeNanos) {
Span span =
internalStartSpan(parentContext, request, spanNameForRequest(request), startTimeNanos);
internalStartSpan(
kind, parentContext, request, spanNameForRequest(request), startTimeNanos);
Context context = withClientSpan(parentContext, span);
inject(context, carrier);
return context;
}
public Context startSpan(
Context parentContext, REQUEST request, CARRIER carrier, long startTimeNanos) {
return startSpan(Kind.CLIENT, parentContext, request, carrier, startTimeNanos);
}
protected void inject(Context context, CARRIER carrier) {
Setter<CARRIER> setter = getSetter();
if (setter == null) {
@ -130,8 +137,8 @@ public abstract class HttpClientTracer<REQUEST, CARRIER, RESPONSE> extends BaseT
}
private Span internalStartSpan(
Context parentContext, REQUEST request, String name, long startTimeNanos) {
SpanBuilder spanBuilder = tracer.spanBuilder(name).setSpanKind(CLIENT).setParent(parentContext);
Kind kind, Context parentContext, REQUEST request, String name, long startTimeNanos) {
SpanBuilder spanBuilder = tracer.spanBuilder(name).setSpanKind(kind).setParent(parentContext);
if (startTimeNanos > 0) {
spanBuilder.setStartTimestamp(startTimeNanos, TimeUnit.NANOSECONDS);
}

View File

@ -33,8 +33,11 @@ dependencies {
testImplementation 'javax.xml.bind:jaxb-api:2.3.1'
/**
testImplementation deps.testcontainers
testImplementation "org.testcontainers:localstack:1.15.0-rc2"
testImplementation "org.testcontainers:localstack:1.15.0-rc2"**/
testLibrary group: 'org.elasticmq', name: 'elasticmq-rest-sqs_2.12', version: '1.0.0'
latestDepTestLibrary group: 'org.apache.camel', name: 'camel-core', version: '2.+'

View File

@ -54,10 +54,10 @@ class ActiveSpanManager {
* @param exchange The exchange
* @param span The span
*/
public static void activate(Exchange exchange, Span span, CamelDirection direction) {
public static void activate(Exchange exchange, Span span, Span.Kind spanKind) {
SpanWithScope parent = exchange.getProperty(ACTIVE_SPAN_PROPERTY, SpanWithScope.class);
SpanWithScope spanWithScope = SpanWithScope.activate(span, parent, direction);
SpanWithScope spanWithScope = SpanWithScope.activate(span, parent, spanKind);
exchange.setProperty(ACTIVE_SPAN_PROPERTY, spanWithScope);
LOG.debug("Activated a span: {}", spanWithScope);
}
@ -91,10 +91,9 @@ class ActiveSpanManager {
this.scope = scope;
}
public static SpanWithScope activate(
Span span, SpanWithScope parent, CamelDirection direction) {
public static SpanWithScope activate(Span span, SpanWithScope parent, Span.Kind spanKind) {
Scope scope = null;
if (CamelDirection.OUTBOUND.equals(direction)) {
if (isClientSpan(spanKind)) {
scope = CamelTracer.TRACER.startClientScope(span);
} else {
scope = CamelTracer.TRACER.startScope(span);
@ -103,6 +102,10 @@ class ActiveSpanManager {
return new SpanWithScope(parent, span, scope);
}
private static boolean isClientSpan(Span.Kind kind) {
return (Span.Kind.CLIENT.equals(kind) || Span.Kind.PRODUCER.equals(kind));
}
public SpanWithScope getParent() {
return parent;
}

View File

@ -61,7 +61,7 @@ final class CamelEventNotifier extends EventNotifierSupport {
sd.getOperationName(ese.getExchange(), ese.getEndpoint(), CamelDirection.OUTBOUND);
Span span = CamelTracer.TRACER.startSpan(name, sd.getInitiatorSpanKind());
sd.pre(span, ese.getExchange(), ese.getEndpoint(), CamelDirection.OUTBOUND);
ActiveSpanManager.activate(ese.getExchange(), span, CamelDirection.OUTBOUND);
ActiveSpanManager.activate(ese.getExchange(), span, sd.getInitiatorSpanKind());
CamelPropagationUtil.injectParent(Context.current(), ese.getExchange().getIn().getHeaders());
LOG.debug("[Exchange sending] Initiator span started: {}", span);

View File

@ -36,13 +36,13 @@ final class CamelRoutePolicy extends RoutePolicySupport {
private static final Logger LOG = LoggerFactory.getLogger(CamelRoutePolicy.class);
private Span spanOnExchangeBegin(Route route, Exchange exchange, SpanDecorator sd) {
private Span spanOnExchangeBegin(
Route route, Exchange exchange, SpanDecorator sd, Span.Kind spanKind) {
Span activeSpan = CamelTracer.TRACER.getCurrentSpan();
String name = sd.getOperationName(exchange, route.getEndpoint(), CamelDirection.INBOUND);
SpanBuilder builder = CamelTracer.TRACER.spanBuilder(name);
builder.setSpanKind(spanKind);
if (!activeSpan.getSpanContext().isValid()) {
// root operation, set kind, otherwise - INTERNAL
builder.setSpanKind(sd.getReceiverSpanKind());
Context parentContext = CamelPropagationUtil.extractParent(exchange.getIn().getHeaders());
if (parentContext != null) {
builder.setParent(parentContext);
@ -51,6 +51,12 @@ final class CamelRoutePolicy extends RoutePolicySupport {
return builder.startSpan();
}
private Span.Kind spanKind(SpanDecorator sd) {
Span activeSpan = CamelTracer.TRACER.getCurrentSpan();
// if there's an active span, this is not a root span which we always mark as INTERNAL
return (activeSpan.getSpanContext().isValid() ? Span.Kind.INTERNAL : sd.getReceiverSpanKind());
}
/**
* Route exchange started, ie request could have been already captured by upper layer
* instrumentation.
@ -59,9 +65,10 @@ final class CamelRoutePolicy extends RoutePolicySupport {
public void onExchangeBegin(Route route, Exchange exchange) {
try {
SpanDecorator sd = CamelTracer.TRACER.getSpanDecorator(route.getEndpoint());
Span span = spanOnExchangeBegin(route, exchange, sd);
Span.Kind spanKind = spanKind(sd);
Span span = spanOnExchangeBegin(route, exchange, sd, spanKind);
sd.pre(span, exchange, route.getEndpoint(), CamelDirection.INBOUND);
ActiveSpanManager.activate(exchange, span, CamelDirection.INBOUND);
ActiveSpanManager.activate(exchange, span, spanKind);
LOG.debug("[Route start] Receiver span started {}", span);
} catch (Throwable t) {
LOG.warn("Failed to capture tracing data", t);

View File

@ -94,12 +94,22 @@ class MessagingSpanDecorator extends BaseSpanDecorator {
@Override
public Span.Kind getInitiatorSpanKind() {
return Kind.PRODUCER;
switch (component) {
case "aws-sqs":
return Kind.INTERNAL;
default:
return Kind.PRODUCER;
}
}
@Override
public Span.Kind getReceiverSpanKind() {
return Kind.CONSUMER;
switch (component) {
case "aws-sqs":
return Kind.INTERNAL;
default:
return Kind.CONSUMER;
}
}
/**

View File

@ -11,14 +11,13 @@ import static io.opentelemetry.api.trace.Span.Kind.INTERNAL
import static io.opentelemetry.api.trace.Span.Kind.PRODUCER
import io.opentelemetry.instrumentation.test.AgentTestRunner
import io.opentelemetry.instrumentation.test.utils.PortUtils
import org.apache.camel.CamelContext
import org.apache.camel.ProducerTemplate
import org.elasticmq.rest.sqs.SQSRestServerBuilder
import org.springframework.boot.SpringApplication
import org.springframework.context.ApplicationContextInitializer
import org.springframework.context.ConfigurableApplicationContext
import org.springframework.context.support.AbstractApplicationContext
import org.testcontainers.containers.localstack.LocalStackContainer
import org.testcontainers.utility.DockerImageName
import org.testcontainers.shaded.com.google.common.collect.ImmutableMap
import spock.lang.Shared
class SqsCamelTest extends AgentTestRunner {
@ -33,6 +32,9 @@ class SqsCamelTest extends AgentTestRunner {
def setupSpec() {
/**
* Temporarily using emq instead of localstack till the latter supports AWS trace propagation
*
sqs = new LocalStackContainer(DockerImageName.parse("localstack/localstack:latest"))
.withServices(LocalStackContainer.Service.SQS)
sqs.start()
@ -45,6 +47,14 @@ class SqsCamelTest extends AgentTestRunner {
applicationContext.getBeanFactory().registerSingleton("localStack", sqs)
}
})
server = app.run()**/
sqsPort = PortUtils.randomOpenPort()
sqs = SQSRestServerBuilder.withPort(sqsPort).withInterface("localhost").start()
println getClass().name + " SQS server started at: localhost:$sqsPort/"
def app = new SpringApplication(SqsConfig)
app.setDefaultProperties(ImmutableMap.of("sqs.port", sqsPort))
server = app.run()
}
@ -54,7 +64,7 @@ class SqsCamelTest extends AgentTestRunner {
server = null
}
if (sqs != null) {
sqs.stop()
sqs.stopAndWait()
}
}
@ -67,8 +77,8 @@ class SqsCamelTest extends AgentTestRunner {
template.sendBody("direct:input", "{\"type\": \"hello\"}")
then:
assertTraces(5) {
trace(0, 3) {
assertTraces(6) {
trace(0, 5) {
span(0) {
name "input"
@ -80,7 +90,7 @@ class SqsCamelTest extends AgentTestRunner {
}
span(1) {
name "sqsCamelTest"
kind PRODUCER
kind INTERNAL
childOf span(0)
attributes {
"apache-camel.uri" "aws-sqs://sqsCamelTest?amazonSQSClient=%23sqsClient"
@ -88,8 +98,47 @@ class SqsCamelTest extends AgentTestRunner {
}
}
span(2) {
name "sqsCamelTest"
name "SQS.SendMessage"
kind PRODUCER
childOf span(1)
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" "http://localhost:$sqsPort"
"aws.operation" "SendMessageRequest"
"aws.queue.url" "http://localhost:$sqsPort/queue/sqsCamelTest"
"aws.service" "AmazonSQS"
"http.flavor" "1.1"
"http.method" "POST"
"http.status_code" 200
"http.url" "http://localhost:$sqsPort"
"net.peer.name" "localhost"
"net.peer.port" sqsPort
"net.transport" "IP.TCP"
}
}
span(3) {
name "SQS.ReceiveMessage"
kind CONSUMER
childOf span(2)
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" "http://localhost:$sqsPort"
"aws.operation" "ReceiveMessageRequest"
"aws.queue.url" "http://localhost:$sqsPort/queue/sqsCamelTest"
"aws.service" "AmazonSQS"
"http.flavor" "1.1"
"http.method" "POST"
"http.status_code" 200
"http.url" "http://localhost:$sqsPort"
"http.user_agent" String
"net.peer.name" "localhost"
"net.peer.port" sqsPort
"net.transport" "IP.TCP"
}
}
span(4) {
name "sqsCamelTest"
kind INTERNAL
childOf span(1)
attributes {
"apache-camel.uri" "aws-sqs://sqsCamelTest?amazonSQSClient=%23sqsClient&messageAttributeNames=traceparent"
@ -105,15 +154,15 @@ class SqsCamelTest extends AgentTestRunner {
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" "http://127.0.0.1:$sqsPort"
"aws.endpoint" "http://localhost:$sqsPort"
"aws.operation" "ReceiveMessageRequest"
"aws.queue.url" "http://localhost:$sqsPort/000000000000/sqsCamelTest"
"aws.queue.url" "http://localhost:$sqsPort/queue/sqsCamelTest"
"aws.service" "AmazonSQS"
"http.flavor" "1.1"
"http.method" "POST"
"http.status_code" 200
"http.url" "http://127.0.0.1:$sqsPort"
"net.peer.name" "127.0.0.1"
"http.url" "http://localhost:$sqsPort"
"net.peer.name" "localhost"
"net.peer.port" sqsPort
"net.transport" "IP.TCP"
}
@ -126,15 +175,15 @@ class SqsCamelTest extends AgentTestRunner {
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" "http://127.0.0.1:$sqsPort"
"aws.endpoint" "http://localhost:$sqsPort"
"aws.operation" "DeleteMessageRequest"
"aws.queue.url" "http://localhost:$sqsPort/000000000000/sqsCamelTest"
"aws.queue.url" "http://localhost:$sqsPort/queue/sqsCamelTest"
"aws.service" "AmazonSQS"
"http.flavor" "1.1"
"http.method" "POST"
"http.status_code" 200
"http.url" "http://127.0.0.1:$sqsPort"
"net.peer.name" "127.0.0.1"
"http.url" "http://localhost:$sqsPort"
"net.peer.name" "localhost"
"net.peer.port" sqsPort
"net.transport" "IP.TCP"
}
@ -147,15 +196,15 @@ class SqsCamelTest extends AgentTestRunner {
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" "http://127.0.0.1:$sqsPort"
"aws.endpoint" "http://localhost:$sqsPort"
"aws.operation" "ReceiveMessageRequest"
"aws.queue.url" "http://localhost:$sqsPort/000000000000/sqsCamelTest"
"aws.queue.url" "http://localhost:$sqsPort/queue/sqsCamelTest"
"aws.service" "AmazonSQS"
"http.flavor" "1.1"
"http.method" "POST"
"http.status_code" 200
"http.url" "http://127.0.0.1:$sqsPort"
"net.peer.name" "127.0.0.1"
"http.url" "http://localhost:$sqsPort"
"net.peer.name" "localhost"
"net.peer.port" sqsPort
"net.transport" "IP.TCP"
}
@ -168,15 +217,36 @@ class SqsCamelTest extends AgentTestRunner {
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" "http://127.0.0.1:$sqsPort"
"aws.endpoint" "http://localhost:$sqsPort"
"aws.operation" "ReceiveMessageRequest"
"aws.queue.url" "http://localhost:$sqsPort/000000000000/sqsCamelTest"
"aws.queue.url" "http://localhost:$sqsPort/queue/sqsCamelTest"
"aws.service" "AmazonSQS"
"http.flavor" "1.1"
"http.method" "POST"
"http.status_code" 200
"http.url" "http://127.0.0.1:$sqsPort"
"net.peer.name" "127.0.0.1"
"http.url" "http://localhost:$sqsPort"
"net.peer.name" "localhost"
"net.peer.port" sqsPort
"net.transport" "IP.TCP"
}
}
}
trace(5, 1) {
it.span(0) {
name "SQS.ReceiveMessage"
kind CLIENT
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" "http://localhost:$sqsPort"
"aws.operation" "ReceiveMessageRequest"
"aws.queue.url" "http://localhost:$sqsPort/queue/sqsCamelTest"
"aws.service" "AmazonSQS"
"http.flavor" "1.1"
"http.method" "POST"
"http.status_code" 200
"http.url" "http://localhost:$sqsPort"
"net.peer.name" "localhost"
"net.peer.port" sqsPort
"net.transport" "IP.TCP"
}

View File

@ -5,15 +5,17 @@
package test
import com.amazonaws.auth.AWSStaticCredentialsProvider
import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.client.builder.AwsClientBuilder
import com.amazonaws.services.sqs.AmazonSQSAsync
import com.amazonaws.services.sqs.AmazonSQSAsyncClient
import org.apache.camel.LoggingLevel
import org.apache.camel.builder.RouteBuilder
import org.springframework.beans.factory.annotation.Value
import org.springframework.boot.SpringBootConfiguration
import org.springframework.boot.autoconfigure.EnableAutoConfiguration
import org.springframework.context.annotation.Bean
import org.testcontainers.containers.localstack.LocalStackContainer
@SpringBootConfiguration
@EnableAutoConfiguration
@ -46,11 +48,21 @@ class SqsConfig {
}
}
/**
* Temporarily using emq instead of localstack till the latter supports AWS trace propagation
*
@Bean
AmazonSQSAsync sqsClient(LocalStackContainer localstack) {
return AmazonSQSAsyncClient.asyncBuilder().withEndpointConfiguration(localstack.getEndpointConfiguration(LocalStackContainer.Service.SQS))
.withCredentials(localstack.getDefaultCredentialsProvider())
.build()
}**/
@Bean
AmazonSQSAsync sqsClient(@Value("\${sqs.port}") int port) {
def credentials = new AWSStaticCredentialsProvider(new BasicAWSCredentials("x", "x"))
def endpointConfiguration = new AwsClientBuilder.EndpointConfiguration("http://localhost:"+port, "elasticmq")
return AmazonSQSAsyncClient.asyncBuilder().withCredentials(credentials).withEndpointConfiguration(endpointConfiguration).build()
}
}

View File

@ -48,6 +48,9 @@ dependencies {
compileOnly deps.opentelemetryTraceProps
library group: 'com.amazonaws', name: 'aws-java-sdk-core', version: '1.11.0'
implementation deps.opentelemetryTraceProps
compileOnly group: 'com.amazonaws', name: 'aws-java-sdk-sqs', version: '1.11.106'
// Include httpclient instrumentation for testing because it is a dependency for aws-sdk.
testInstrumentation project(':instrumentation:apache-httpclient:apache-httpclient-4.0:javaagent')
@ -55,8 +58,8 @@ dependencies {
testLibrary group: 'com.amazonaws', name: 'aws-java-sdk-rds', version: '1.11.106'
testLibrary group: 'com.amazonaws', name: 'aws-java-sdk-ec2', version: '1.11.106'
testLibrary group: 'com.amazonaws', name: 'aws-java-sdk-kinesis', version: '1.11.106'
testLibrary group: 'com.amazonaws', name: 'aws-java-sdk-sqs', version: '1.11.106'
testLibrary group: 'com.amazonaws', name: 'aws-java-sdk-dynamodb', version: '1.11.106'
testLibrary group: 'com.amazonaws', name: 'aws-java-sdk-sqs', version: '1.11.106'
// Make sure doesn't add HTTP headers
testInstrumentation project(':instrumentation:apache-httpclient:apache-httpclient-4.0:javaagent')
@ -64,6 +67,9 @@ dependencies {
// needed for kinesis:
testImplementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-cbor', version: versions.jackson
// needed for SQS - using emq directly as localstack references emq v0.15.7 ie WITHOUT AWS trace header propagation
testLibrary group: 'org.elasticmq', name: 'elasticmq-rest-sqs_2.12', version: '1.0.0'
test_before_1_11_106Implementation(group: 'com.amazonaws', name: 'aws-java-sdk-s3', version: '1.11.0')
test_before_1_11_106Implementation(group: 'com.amazonaws', name: 'aws-java-sdk-rds', version: '1.11.0')
test_before_1_11_106Implementation(group: 'com.amazonaws', name: 'aws-java-sdk-ec2', version: '1.11.0')

View File

@ -46,8 +46,9 @@ public class AwsSdkClientTracer extends HttpClientTracer<Request<?>, Request<?>,
return qualifiedOperation(awsServiceName, awsOperation);
}
public Context startSpan(Context parentContext, Request<?> request, RequestMeta requestMeta) {
Context context = super.startSpan(parentContext, request, request);
public Context startSpan(
Span.Kind kind, Context parentContext, Request<?> request, RequestMeta requestMeta) {
Context context = super.startSpan(kind, parentContext, request, request, -1);
Span span = Span.fromContext(context);
String awsServiceName = request.getServiceName();

View File

@ -0,0 +1,41 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.awssdk.v1_11;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.extension.trace.propagation.AwsXrayPropagator;
import java.util.Collections;
import java.util.Map;
class SqsParentContext {
private static class MapGetter implements TextMapPropagator.Getter<Map<String, String>> {
private static final MapGetter INSTANCE = new MapGetter();
@Override
public Iterable<String> keys(Map<String, String> map) {
return map.keySet();
}
@Override
public String get(Map<String, String> map, String s) {
return map.get(s);
}
}
static final String AWS_TRACE_SYSTEM_ATTRIBUTE = "AWSTraceHeader";
static Context ofSystemAttributes(Map<String, String> systemAttributes) {
String traceHeader = systemAttributes.get(AWS_TRACE_SYSTEM_ATTRIBUTE);
return AwsXrayPropagator.getInstance()
.extract(
Context.current(),
Collections.singletonMap("X-Amzn-Trace-Id", traceHeader),
MapGetter.INSTANCE);
}
}

View File

@ -12,9 +12,15 @@ import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.Request;
import com.amazonaws.Response;
import com.amazonaws.handlers.RequestHandler2;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.instrumentation.api.ContextStore;
import java.util.List;
/** Tracing Request Handler. */
public class TracingRequestHandler extends RequestHandler2 {
@ -27,19 +33,40 @@ public class TracingRequestHandler extends RequestHandler2 {
@Override
public void beforeRequest(Request<?> request) {
AmazonWebServiceRequest originalRequest = request.getOriginalRequest();
Span.Kind kind = (isSqsProducer(originalRequest) ? Span.Kind.PRODUCER : Span.Kind.CLIENT);
RequestMeta requestMeta = contextStore.get(originalRequest);
Context parentContext = Context.current();
if (!tracer().shouldStartSpan(parentContext)) {
return;
}
Context context = tracer().startSpan(parentContext, request, requestMeta);
Context context = tracer().startSpan(kind, parentContext, request, requestMeta);
Scope scope = context.makeCurrent();
request.addHandlerContext(CONTEXT_SCOPE_PAIR_CONTEXT_KEY, new ContextScopePair(context, scope));
}
private boolean isSqsProducer(AmazonWebServiceRequest request) {
return (request instanceof SendMessageRequest);
}
@Override
public AmazonWebServiceRequest beforeMarshalling(AmazonWebServiceRequest request) {
if (isSqsConsumer(request)) {
ReceiveMessageRequest receiveMessageRequest = (ReceiveMessageRequest) request;
receiveMessageRequest.withAttributeNames(SqsParentContext.AWS_TRACE_SYSTEM_ATTRIBUTE);
}
return request;
}
@Override
public void afterResponse(Request<?> request, Response<?> response) {
if (isSqsConsumer(request.getOriginalRequest())) {
afterConsumerResponse(
(Request<ReceiveMessageRequest>) request, (Response<ReceiveMessageResult>) response);
}
// close outstanding "client" span
ContextScopePair scope = request.getHandlerContext(CONTEXT_SCOPE_PAIR_CONTEXT_KEY);
if (scope == null) {
return;
@ -49,6 +76,28 @@ public class TracingRequestHandler extends RequestHandler2 {
tracer().end(scope.getContext(), response);
}
private boolean isSqsConsumer(AmazonWebServiceRequest request) {
return (request instanceof ReceiveMessageRequest);
}
/** Create and close CONSUMER span for each message consumed. */
private void afterConsumerResponse(
Request<ReceiveMessageRequest> request, Response<ReceiveMessageResult> response) {
ReceiveMessageResult receiveMessageResult = response.getAwsResponse();
List<Message> messages = receiveMessageResult.getMessages();
for (Message message : messages) {
createConsumerSpan(message, request, response);
}
}
private void createConsumerSpan(Message message, Request<?> request, Response<?> response) {
Context parentContext = SqsParentContext.ofSystemAttributes(message.getAttributes());
AmazonWebServiceRequest originalRequest = request.getOriginalRequest();
RequestMeta requestMeta = contextStore.get(originalRequest);
Context context = tracer().startSpan(Span.Kind.CONSUMER, parentContext, request, requestMeta);
tracer().end(context, response);
}
@Override
public void afterError(Request<?> request, Response<?> response, Exception e) {
ContextScopePair scope = request.getHandlerContext(CONTEXT_SCOPE_PAIR_CONTEXT_KEY);

View File

@ -4,6 +4,7 @@
*/
import static io.opentelemetry.api.trace.Span.Kind.CLIENT
import static io.opentelemetry.api.trace.Span.Kind.PRODUCER
import static io.opentelemetry.instrumentation.test.server.http.TestHttpServer.httpServer
import static io.opentelemetry.instrumentation.test.utils.PortUtils.UNUSABLE_PORT
@ -137,7 +138,7 @@ class Aws1ClientTest extends AgentTestRunner {
trace(0, 1) {
span(0) {
name "$service.$operation"
kind CLIENT
kind operation == "SendMessage" ? PRODUCER : CLIENT
errored false
hasNoParent()
attributes {

View File

@ -0,0 +1,147 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import static io.opentelemetry.api.trace.Span.Kind.CLIENT
import static io.opentelemetry.api.trace.Span.Kind.CONSUMER
import static io.opentelemetry.api.trace.Span.Kind.PRODUCER
import com.amazonaws.auth.AWSStaticCredentialsProvider
import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.client.builder.AwsClientBuilder
import com.amazonaws.services.sqs.AmazonSQSAsyncClient
import com.amazonaws.services.sqs.model.SendMessageRequest
import io.opentelemetry.instrumentation.test.AgentTestRunner
import io.opentelemetry.instrumentation.test.utils.PortUtils
import org.elasticmq.rest.sqs.SQSRestServerBuilder
import spock.lang.Shared
class SqsTracingTest extends AgentTestRunner {
@Shared
def sqs
@Shared
AmazonSQSAsyncClient client
@Shared
int sqsPort
def setupSpec() {
sqsPort = PortUtils.randomOpenPort()
sqs = SQSRestServerBuilder.withPort(sqsPort).withInterface("localhost").start()
println getClass().name + " SQS server started at: localhost:$sqsPort/"
def credentials = new AWSStaticCredentialsProvider(new BasicAWSCredentials("x", "x"))
def endpointConfiguration = new AwsClientBuilder.EndpointConfiguration("http://localhost:"+sqsPort, "elasticmq")
client = AmazonSQSAsyncClient.asyncBuilder().withCredentials(credentials).withEndpointConfiguration(endpointConfiguration).build()
}
def cleanupSpec() {
if (sqs != null) {
sqs.stopAndWait()
}
}
def "simple sqs producer-consumer services"() {
setup:
client.createQueue("testSdkSqs")
when:
SendMessageRequest send = new SendMessageRequest("http://localhost:$sqsPort/000000000000/testSdkSqs", "{\"type\": \"hello\"}")
client.sendMessage(send)
client.receiveMessage("http://localhost:$sqsPort/000000000000/testSdkSqs")
then:
assertTraces(3) {
trace(0, 1) {
span(0) {
name "SQS.CreateQueue"
kind CLIENT
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" "http://localhost:$sqsPort"
"aws.operation" "CreateQueueRequest"
"aws.queue.name" "testSdkSqs"
"aws.service" "AmazonSQS"
"http.flavor" "1.1"
"http.method" "POST"
"http.status_code" 200
"http.url" "http://localhost:$sqsPort"
"net.peer.name" "localhost"
"net.peer.port" sqsPort
"net.transport" "IP.TCP"
}
}
}
trace(1, 2) {
span(0) {
name "SQS.SendMessage"
kind PRODUCER
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" "http://localhost:$sqsPort"
"aws.operation" "SendMessageRequest"
"aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs"
"aws.service" "AmazonSQS"
"http.flavor" "1.1"
"http.method" "POST"
"http.status_code" 200
"http.url" "http://localhost:$sqsPort"
"net.peer.name" "localhost"
"net.peer.port" sqsPort
"net.transport" "IP.TCP"
}
}
span(1) {
name "SQS.ReceiveMessage"
kind CONSUMER
childOf span(0)
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" "http://localhost:$sqsPort"
"aws.operation" "ReceiveMessageRequest"
"aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs"
"aws.service" "AmazonSQS"
"http.flavor" "1.1"
"http.method" "POST"
"http.status_code" 200
"http.url" "http://localhost:$sqsPort"
"http.user_agent" String
"net.peer.name" "localhost"
"net.peer.port" sqsPort
"net.transport" "IP.TCP"
}
}
}
/**
* This span represents HTTP "sending of receive message" operation. It's always single, while there can be multiple CONSUMER spans (one per consumed message).
* This one could be suppressed (by IF in TracingRequestHandler#beforeRequest but then HTTP instrumentation span would appear
*/
trace(2, 1) {
span(0) {
name "SQS.ReceiveMessage"
kind CLIENT
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" "http://localhost:$sqsPort"
"aws.operation" "ReceiveMessageRequest"
"aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs"
"aws.service" "AmazonSQS"
"http.flavor" "1.1"
"http.method" "POST"
"http.status_code" 200
"http.url" "http://localhost:$sqsPort"
"net.peer.name" "localhost"
"net.peer.port" sqsPort
"net.transport" "IP.TCP"
}
}
}
}
}
}

View File

@ -0,0 +1,18 @@
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<layout class="ch.qos.logback.classic.PatternLayout">
<Pattern>
%d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n
</Pattern>
</layout>
</appender>
<root level="DEBUG">
<appender-ref ref="console"/>
</root>
<logger name="org.elasticmq" level="debug" />
</configuration>