diff --git a/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/tracer/HttpClientTracer.java b/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/tracer/HttpClientTracer.java index 78786971c2..5d34b6514d 100644 --- a/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/tracer/HttpClientTracer.java +++ b/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/tracer/HttpClientTracer.java @@ -9,6 +9,7 @@ import static io.opentelemetry.api.trace.Span.Kind.CLIENT; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.Span.Kind; import io.opentelemetry.api.trace.SpanBuilder; import io.opentelemetry.api.trace.Tracer; import io.opentelemetry.context.Context; @@ -69,14 +70,20 @@ public abstract class HttpClientTracer extends BaseT } public Context startSpan( - Context parentContext, REQUEST request, CARRIER carrier, long startTimeNanos) { + Kind kind, Context parentContext, REQUEST request, CARRIER carrier, long startTimeNanos) { Span span = - internalStartSpan(parentContext, request, spanNameForRequest(request), startTimeNanos); + internalStartSpan( + kind, parentContext, request, spanNameForRequest(request), startTimeNanos); Context context = withClientSpan(parentContext, span); inject(context, carrier); return context; } + public Context startSpan( + Context parentContext, REQUEST request, CARRIER carrier, long startTimeNanos) { + return startSpan(Kind.CLIENT, parentContext, request, carrier, startTimeNanos); + } + protected void inject(Context context, CARRIER carrier) { Setter setter = getSetter(); if (setter == null) { @@ -130,8 +137,8 @@ public abstract class HttpClientTracer extends BaseT } private Span internalStartSpan( - Context parentContext, REQUEST request, String name, long startTimeNanos) { - SpanBuilder spanBuilder = tracer.spanBuilder(name).setSpanKind(CLIENT).setParent(parentContext); + Kind kind, Context parentContext, REQUEST request, String name, long startTimeNanos) { + SpanBuilder spanBuilder = tracer.spanBuilder(name).setSpanKind(kind).setParent(parentContext); if (startTimeNanos > 0) { spanBuilder.setStartTimestamp(startTimeNanos, TimeUnit.NANOSECONDS); } diff --git a/instrumentation/apache-camel-2.20/javaagent/apache-camel-2.20-javaagent.gradle b/instrumentation/apache-camel-2.20/javaagent/apache-camel-2.20-javaagent.gradle index 5a91caff1f..9ece31aa87 100644 --- a/instrumentation/apache-camel-2.20/javaagent/apache-camel-2.20-javaagent.gradle +++ b/instrumentation/apache-camel-2.20/javaagent/apache-camel-2.20-javaagent.gradle @@ -33,8 +33,11 @@ dependencies { testImplementation 'javax.xml.bind:jaxb-api:2.3.1' + /** testImplementation deps.testcontainers - testImplementation "org.testcontainers:localstack:1.15.0-rc2" + testImplementation "org.testcontainers:localstack:1.15.0-rc2"**/ + + testLibrary group: 'org.elasticmq', name: 'elasticmq-rest-sqs_2.12', version: '1.0.0' latestDepTestLibrary group: 'org.apache.camel', name: 'camel-core', version: '2.+' diff --git a/instrumentation/apache-camel-2.20/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apachecamel/ActiveSpanManager.java b/instrumentation/apache-camel-2.20/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apachecamel/ActiveSpanManager.java index fe762f1570..216fc9807d 100644 --- a/instrumentation/apache-camel-2.20/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apachecamel/ActiveSpanManager.java +++ b/instrumentation/apache-camel-2.20/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apachecamel/ActiveSpanManager.java @@ -54,10 +54,10 @@ class ActiveSpanManager { * @param exchange The exchange * @param span The span */ - public static void activate(Exchange exchange, Span span, CamelDirection direction) { + public static void activate(Exchange exchange, Span span, Span.Kind spanKind) { SpanWithScope parent = exchange.getProperty(ACTIVE_SPAN_PROPERTY, SpanWithScope.class); - SpanWithScope spanWithScope = SpanWithScope.activate(span, parent, direction); + SpanWithScope spanWithScope = SpanWithScope.activate(span, parent, spanKind); exchange.setProperty(ACTIVE_SPAN_PROPERTY, spanWithScope); LOG.debug("Activated a span: {}", spanWithScope); } @@ -91,10 +91,9 @@ class ActiveSpanManager { this.scope = scope; } - public static SpanWithScope activate( - Span span, SpanWithScope parent, CamelDirection direction) { + public static SpanWithScope activate(Span span, SpanWithScope parent, Span.Kind spanKind) { Scope scope = null; - if (CamelDirection.OUTBOUND.equals(direction)) { + if (isClientSpan(spanKind)) { scope = CamelTracer.TRACER.startClientScope(span); } else { scope = CamelTracer.TRACER.startScope(span); @@ -103,6 +102,10 @@ class ActiveSpanManager { return new SpanWithScope(parent, span, scope); } + private static boolean isClientSpan(Span.Kind kind) { + return (Span.Kind.CLIENT.equals(kind) || Span.Kind.PRODUCER.equals(kind)); + } + public SpanWithScope getParent() { return parent; } diff --git a/instrumentation/apache-camel-2.20/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apachecamel/CamelEventNotifier.java b/instrumentation/apache-camel-2.20/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apachecamel/CamelEventNotifier.java index 0bf1de7096..7413db5b09 100644 --- a/instrumentation/apache-camel-2.20/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apachecamel/CamelEventNotifier.java +++ b/instrumentation/apache-camel-2.20/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apachecamel/CamelEventNotifier.java @@ -61,7 +61,7 @@ final class CamelEventNotifier extends EventNotifierSupport { sd.getOperationName(ese.getExchange(), ese.getEndpoint(), CamelDirection.OUTBOUND); Span span = CamelTracer.TRACER.startSpan(name, sd.getInitiatorSpanKind()); sd.pre(span, ese.getExchange(), ese.getEndpoint(), CamelDirection.OUTBOUND); - ActiveSpanManager.activate(ese.getExchange(), span, CamelDirection.OUTBOUND); + ActiveSpanManager.activate(ese.getExchange(), span, sd.getInitiatorSpanKind()); CamelPropagationUtil.injectParent(Context.current(), ese.getExchange().getIn().getHeaders()); LOG.debug("[Exchange sending] Initiator span started: {}", span); diff --git a/instrumentation/apache-camel-2.20/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apachecamel/CamelRoutePolicy.java b/instrumentation/apache-camel-2.20/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apachecamel/CamelRoutePolicy.java index a5f8cdddf1..ffe50cb905 100644 --- a/instrumentation/apache-camel-2.20/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apachecamel/CamelRoutePolicy.java +++ b/instrumentation/apache-camel-2.20/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apachecamel/CamelRoutePolicy.java @@ -36,13 +36,13 @@ final class CamelRoutePolicy extends RoutePolicySupport { private static final Logger LOG = LoggerFactory.getLogger(CamelRoutePolicy.class); - private Span spanOnExchangeBegin(Route route, Exchange exchange, SpanDecorator sd) { + private Span spanOnExchangeBegin( + Route route, Exchange exchange, SpanDecorator sd, Span.Kind spanKind) { Span activeSpan = CamelTracer.TRACER.getCurrentSpan(); String name = sd.getOperationName(exchange, route.getEndpoint(), CamelDirection.INBOUND); SpanBuilder builder = CamelTracer.TRACER.spanBuilder(name); + builder.setSpanKind(spanKind); if (!activeSpan.getSpanContext().isValid()) { - // root operation, set kind, otherwise - INTERNAL - builder.setSpanKind(sd.getReceiverSpanKind()); Context parentContext = CamelPropagationUtil.extractParent(exchange.getIn().getHeaders()); if (parentContext != null) { builder.setParent(parentContext); @@ -51,6 +51,12 @@ final class CamelRoutePolicy extends RoutePolicySupport { return builder.startSpan(); } + private Span.Kind spanKind(SpanDecorator sd) { + Span activeSpan = CamelTracer.TRACER.getCurrentSpan(); + // if there's an active span, this is not a root span which we always mark as INTERNAL + return (activeSpan.getSpanContext().isValid() ? Span.Kind.INTERNAL : sd.getReceiverSpanKind()); + } + /** * Route exchange started, ie request could have been already captured by upper layer * instrumentation. @@ -59,9 +65,10 @@ final class CamelRoutePolicy extends RoutePolicySupport { public void onExchangeBegin(Route route, Exchange exchange) { try { SpanDecorator sd = CamelTracer.TRACER.getSpanDecorator(route.getEndpoint()); - Span span = spanOnExchangeBegin(route, exchange, sd); + Span.Kind spanKind = spanKind(sd); + Span span = spanOnExchangeBegin(route, exchange, sd, spanKind); sd.pre(span, exchange, route.getEndpoint(), CamelDirection.INBOUND); - ActiveSpanManager.activate(exchange, span, CamelDirection.INBOUND); + ActiveSpanManager.activate(exchange, span, spanKind); LOG.debug("[Route start] Receiver span started {}", span); } catch (Throwable t) { LOG.warn("Failed to capture tracing data", t); diff --git a/instrumentation/apache-camel-2.20/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apachecamel/decorators/MessagingSpanDecorator.java b/instrumentation/apache-camel-2.20/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apachecamel/decorators/MessagingSpanDecorator.java index 45da1e5404..fff8e8cdcd 100644 --- a/instrumentation/apache-camel-2.20/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apachecamel/decorators/MessagingSpanDecorator.java +++ b/instrumentation/apache-camel-2.20/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/apachecamel/decorators/MessagingSpanDecorator.java @@ -94,12 +94,22 @@ class MessagingSpanDecorator extends BaseSpanDecorator { @Override public Span.Kind getInitiatorSpanKind() { - return Kind.PRODUCER; + switch (component) { + case "aws-sqs": + return Kind.INTERNAL; + default: + return Kind.PRODUCER; + } } @Override public Span.Kind getReceiverSpanKind() { - return Kind.CONSUMER; + switch (component) { + case "aws-sqs": + return Kind.INTERNAL; + default: + return Kind.CONSUMER; + } } /** diff --git a/instrumentation/apache-camel-2.20/javaagent/src/test/groovy/test/SqsCamelTest.groovy b/instrumentation/apache-camel-2.20/javaagent/src/test/groovy/test/SqsCamelTest.groovy index ec4fdd2aad..c67ae471a3 100644 --- a/instrumentation/apache-camel-2.20/javaagent/src/test/groovy/test/SqsCamelTest.groovy +++ b/instrumentation/apache-camel-2.20/javaagent/src/test/groovy/test/SqsCamelTest.groovy @@ -11,14 +11,13 @@ import static io.opentelemetry.api.trace.Span.Kind.INTERNAL import static io.opentelemetry.api.trace.Span.Kind.PRODUCER import io.opentelemetry.instrumentation.test.AgentTestRunner +import io.opentelemetry.instrumentation.test.utils.PortUtils import org.apache.camel.CamelContext import org.apache.camel.ProducerTemplate +import org.elasticmq.rest.sqs.SQSRestServerBuilder import org.springframework.boot.SpringApplication -import org.springframework.context.ApplicationContextInitializer import org.springframework.context.ConfigurableApplicationContext -import org.springframework.context.support.AbstractApplicationContext -import org.testcontainers.containers.localstack.LocalStackContainer -import org.testcontainers.utility.DockerImageName +import org.testcontainers.shaded.com.google.common.collect.ImmutableMap import spock.lang.Shared class SqsCamelTest extends AgentTestRunner { @@ -33,6 +32,9 @@ class SqsCamelTest extends AgentTestRunner { def setupSpec() { + /** + * Temporarily using emq instead of localstack till the latter supports AWS trace propagation + * sqs = new LocalStackContainer(DockerImageName.parse("localstack/localstack:latest")) .withServices(LocalStackContainer.Service.SQS) sqs.start() @@ -45,6 +47,14 @@ class SqsCamelTest extends AgentTestRunner { applicationContext.getBeanFactory().registerSingleton("localStack", sqs) } }) + server = app.run()**/ + + sqsPort = PortUtils.randomOpenPort() + sqs = SQSRestServerBuilder.withPort(sqsPort).withInterface("localhost").start() + println getClass().name + " SQS server started at: localhost:$sqsPort/" + + def app = new SpringApplication(SqsConfig) + app.setDefaultProperties(ImmutableMap.of("sqs.port", sqsPort)) server = app.run() } @@ -54,7 +64,7 @@ class SqsCamelTest extends AgentTestRunner { server = null } if (sqs != null) { - sqs.stop() + sqs.stopAndWait() } } @@ -67,8 +77,8 @@ class SqsCamelTest extends AgentTestRunner { template.sendBody("direct:input", "{\"type\": \"hello\"}") then: - assertTraces(5) { - trace(0, 3) { + assertTraces(6) { + trace(0, 5) { span(0) { name "input" @@ -80,7 +90,7 @@ class SqsCamelTest extends AgentTestRunner { } span(1) { name "sqsCamelTest" - kind PRODUCER + kind INTERNAL childOf span(0) attributes { "apache-camel.uri" "aws-sqs://sqsCamelTest?amazonSQSClient=%23sqsClient" @@ -88,8 +98,47 @@ class SqsCamelTest extends AgentTestRunner { } } span(2) { - name "sqsCamelTest" + name "SQS.SendMessage" + kind PRODUCER + childOf span(1) + attributes { + "aws.agent" "java-aws-sdk" + "aws.endpoint" "http://localhost:$sqsPort" + "aws.operation" "SendMessageRequest" + "aws.queue.url" "http://localhost:$sqsPort/queue/sqsCamelTest" + "aws.service" "AmazonSQS" + "http.flavor" "1.1" + "http.method" "POST" + "http.status_code" 200 + "http.url" "http://localhost:$sqsPort" + "net.peer.name" "localhost" + "net.peer.port" sqsPort + "net.transport" "IP.TCP" + } + } + span(3) { + name "SQS.ReceiveMessage" kind CONSUMER + childOf span(2) + attributes { + "aws.agent" "java-aws-sdk" + "aws.endpoint" "http://localhost:$sqsPort" + "aws.operation" "ReceiveMessageRequest" + "aws.queue.url" "http://localhost:$sqsPort/queue/sqsCamelTest" + "aws.service" "AmazonSQS" + "http.flavor" "1.1" + "http.method" "POST" + "http.status_code" 200 + "http.url" "http://localhost:$sqsPort" + "http.user_agent" String + "net.peer.name" "localhost" + "net.peer.port" sqsPort + "net.transport" "IP.TCP" + } + } + span(4) { + name "sqsCamelTest" + kind INTERNAL childOf span(1) attributes { "apache-camel.uri" "aws-sqs://sqsCamelTest?amazonSQSClient=%23sqsClient&messageAttributeNames=traceparent" @@ -105,15 +154,15 @@ class SqsCamelTest extends AgentTestRunner { hasNoParent() attributes { "aws.agent" "java-aws-sdk" - "aws.endpoint" "http://127.0.0.1:$sqsPort" + "aws.endpoint" "http://localhost:$sqsPort" "aws.operation" "ReceiveMessageRequest" - "aws.queue.url" "http://localhost:$sqsPort/000000000000/sqsCamelTest" + "aws.queue.url" "http://localhost:$sqsPort/queue/sqsCamelTest" "aws.service" "AmazonSQS" "http.flavor" "1.1" "http.method" "POST" "http.status_code" 200 - "http.url" "http://127.0.0.1:$sqsPort" - "net.peer.name" "127.0.0.1" + "http.url" "http://localhost:$sqsPort" + "net.peer.name" "localhost" "net.peer.port" sqsPort "net.transport" "IP.TCP" } @@ -126,15 +175,15 @@ class SqsCamelTest extends AgentTestRunner { hasNoParent() attributes { "aws.agent" "java-aws-sdk" - "aws.endpoint" "http://127.0.0.1:$sqsPort" + "aws.endpoint" "http://localhost:$sqsPort" "aws.operation" "DeleteMessageRequest" - "aws.queue.url" "http://localhost:$sqsPort/000000000000/sqsCamelTest" + "aws.queue.url" "http://localhost:$sqsPort/queue/sqsCamelTest" "aws.service" "AmazonSQS" "http.flavor" "1.1" "http.method" "POST" "http.status_code" 200 - "http.url" "http://127.0.0.1:$sqsPort" - "net.peer.name" "127.0.0.1" + "http.url" "http://localhost:$sqsPort" + "net.peer.name" "localhost" "net.peer.port" sqsPort "net.transport" "IP.TCP" } @@ -147,15 +196,15 @@ class SqsCamelTest extends AgentTestRunner { hasNoParent() attributes { "aws.agent" "java-aws-sdk" - "aws.endpoint" "http://127.0.0.1:$sqsPort" + "aws.endpoint" "http://localhost:$sqsPort" "aws.operation" "ReceiveMessageRequest" - "aws.queue.url" "http://localhost:$sqsPort/000000000000/sqsCamelTest" + "aws.queue.url" "http://localhost:$sqsPort/queue/sqsCamelTest" "aws.service" "AmazonSQS" "http.flavor" "1.1" "http.method" "POST" "http.status_code" 200 - "http.url" "http://127.0.0.1:$sqsPort" - "net.peer.name" "127.0.0.1" + "http.url" "http://localhost:$sqsPort" + "net.peer.name" "localhost" "net.peer.port" sqsPort "net.transport" "IP.TCP" } @@ -168,15 +217,36 @@ class SqsCamelTest extends AgentTestRunner { hasNoParent() attributes { "aws.agent" "java-aws-sdk" - "aws.endpoint" "http://127.0.0.1:$sqsPort" + "aws.endpoint" "http://localhost:$sqsPort" "aws.operation" "ReceiveMessageRequest" - "aws.queue.url" "http://localhost:$sqsPort/000000000000/sqsCamelTest" + "aws.queue.url" "http://localhost:$sqsPort/queue/sqsCamelTest" "aws.service" "AmazonSQS" "http.flavor" "1.1" "http.method" "POST" "http.status_code" 200 - "http.url" "http://127.0.0.1:$sqsPort" - "net.peer.name" "127.0.0.1" + "http.url" "http://localhost:$sqsPort" + "net.peer.name" "localhost" + "net.peer.port" sqsPort + "net.transport" "IP.TCP" + } + } + } + trace(5, 1) { + it.span(0) { + name "SQS.ReceiveMessage" + kind CLIENT + hasNoParent() + attributes { + "aws.agent" "java-aws-sdk" + "aws.endpoint" "http://localhost:$sqsPort" + "aws.operation" "ReceiveMessageRequest" + "aws.queue.url" "http://localhost:$sqsPort/queue/sqsCamelTest" + "aws.service" "AmazonSQS" + "http.flavor" "1.1" + "http.method" "POST" + "http.status_code" 200 + "http.url" "http://localhost:$sqsPort" + "net.peer.name" "localhost" "net.peer.port" sqsPort "net.transport" "IP.TCP" } diff --git a/instrumentation/apache-camel-2.20/javaagent/src/test/groovy/test/SqsConfig.groovy b/instrumentation/apache-camel-2.20/javaagent/src/test/groovy/test/SqsConfig.groovy index b2935a0570..6d4140bb97 100644 --- a/instrumentation/apache-camel-2.20/javaagent/src/test/groovy/test/SqsConfig.groovy +++ b/instrumentation/apache-camel-2.20/javaagent/src/test/groovy/test/SqsConfig.groovy @@ -5,15 +5,17 @@ package test - +import com.amazonaws.auth.AWSStaticCredentialsProvider +import com.amazonaws.auth.BasicAWSCredentials +import com.amazonaws.client.builder.AwsClientBuilder import com.amazonaws.services.sqs.AmazonSQSAsync import com.amazonaws.services.sqs.AmazonSQSAsyncClient import org.apache.camel.LoggingLevel import org.apache.camel.builder.RouteBuilder +import org.springframework.beans.factory.annotation.Value import org.springframework.boot.SpringBootConfiguration import org.springframework.boot.autoconfigure.EnableAutoConfiguration import org.springframework.context.annotation.Bean -import org.testcontainers.containers.localstack.LocalStackContainer @SpringBootConfiguration @EnableAutoConfiguration @@ -46,11 +48,21 @@ class SqsConfig { } } + /** + * Temporarily using emq instead of localstack till the latter supports AWS trace propagation + * @Bean AmazonSQSAsync sqsClient(LocalStackContainer localstack) { return AmazonSQSAsyncClient.asyncBuilder().withEndpointConfiguration(localstack.getEndpointConfiguration(LocalStackContainer.Service.SQS)) .withCredentials(localstack.getDefaultCredentialsProvider()) .build() + }**/ + + @Bean + AmazonSQSAsync sqsClient(@Value("\${sqs.port}") int port) { + def credentials = new AWSStaticCredentialsProvider(new BasicAWSCredentials("x", "x")) + def endpointConfiguration = new AwsClientBuilder.EndpointConfiguration("http://localhost:"+port, "elasticmq") + return AmazonSQSAsyncClient.asyncBuilder().withCredentials(credentials).withEndpointConfiguration(endpointConfiguration).build() } } diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/aws-sdk-1.11-javaagent.gradle b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/aws-sdk-1.11-javaagent.gradle index c18993cfdc..ebd75821e2 100644 --- a/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/aws-sdk-1.11-javaagent.gradle +++ b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/aws-sdk-1.11-javaagent.gradle @@ -48,6 +48,9 @@ dependencies { compileOnly deps.opentelemetryTraceProps library group: 'com.amazonaws', name: 'aws-java-sdk-core', version: '1.11.0' + implementation deps.opentelemetryTraceProps + + compileOnly group: 'com.amazonaws', name: 'aws-java-sdk-sqs', version: '1.11.106' // Include httpclient instrumentation for testing because it is a dependency for aws-sdk. testInstrumentation project(':instrumentation:apache-httpclient:apache-httpclient-4.0:javaagent') @@ -55,8 +58,8 @@ dependencies { testLibrary group: 'com.amazonaws', name: 'aws-java-sdk-rds', version: '1.11.106' testLibrary group: 'com.amazonaws', name: 'aws-java-sdk-ec2', version: '1.11.106' testLibrary group: 'com.amazonaws', name: 'aws-java-sdk-kinesis', version: '1.11.106' - testLibrary group: 'com.amazonaws', name: 'aws-java-sdk-sqs', version: '1.11.106' testLibrary group: 'com.amazonaws', name: 'aws-java-sdk-dynamodb', version: '1.11.106' + testLibrary group: 'com.amazonaws', name: 'aws-java-sdk-sqs', version: '1.11.106' // Make sure doesn't add HTTP headers testInstrumentation project(':instrumentation:apache-httpclient:apache-httpclient-4.0:javaagent') @@ -64,6 +67,9 @@ dependencies { // needed for kinesis: testImplementation group: 'com.fasterxml.jackson.dataformat', name: 'jackson-dataformat-cbor', version: versions.jackson + // needed for SQS - using emq directly as localstack references emq v0.15.7 ie WITHOUT AWS trace header propagation + testLibrary group: 'org.elasticmq', name: 'elasticmq-rest-sqs_2.12', version: '1.0.0' + test_before_1_11_106Implementation(group: 'com.amazonaws', name: 'aws-java-sdk-s3', version: '1.11.0') test_before_1_11_106Implementation(group: 'com.amazonaws', name: 'aws-java-sdk-rds', version: '1.11.0') test_before_1_11_106Implementation(group: 'com.amazonaws', name: 'aws-java-sdk-ec2', version: '1.11.0') diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awssdk/v1_11/AwsSdkClientTracer.java b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awssdk/v1_11/AwsSdkClientTracer.java index 1ec779be69..2c15124476 100644 --- a/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awssdk/v1_11/AwsSdkClientTracer.java +++ b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awssdk/v1_11/AwsSdkClientTracer.java @@ -46,8 +46,9 @@ public class AwsSdkClientTracer extends HttpClientTracer, Request, return qualifiedOperation(awsServiceName, awsOperation); } - public Context startSpan(Context parentContext, Request request, RequestMeta requestMeta) { - Context context = super.startSpan(parentContext, request, request); + public Context startSpan( + Span.Kind kind, Context parentContext, Request request, RequestMeta requestMeta) { + Context context = super.startSpan(kind, parentContext, request, request, -1); Span span = Span.fromContext(context); String awsServiceName = request.getServiceName(); diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awssdk/v1_11/SqsParentContext.java b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awssdk/v1_11/SqsParentContext.java new file mode 100644 index 0000000000..f8ce6ef797 --- /dev/null +++ b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awssdk/v1_11/SqsParentContext.java @@ -0,0 +1,41 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.awssdk.v1_11; + +import io.opentelemetry.context.Context; +import io.opentelemetry.context.propagation.TextMapPropagator; +import io.opentelemetry.extension.trace.propagation.AwsXrayPropagator; +import java.util.Collections; +import java.util.Map; + +class SqsParentContext { + + private static class MapGetter implements TextMapPropagator.Getter> { + + private static final MapGetter INSTANCE = new MapGetter(); + + @Override + public Iterable keys(Map map) { + return map.keySet(); + } + + @Override + public String get(Map map, String s) { + return map.get(s); + } + } + + static final String AWS_TRACE_SYSTEM_ATTRIBUTE = "AWSTraceHeader"; + + static Context ofSystemAttributes(Map systemAttributes) { + String traceHeader = systemAttributes.get(AWS_TRACE_SYSTEM_ATTRIBUTE); + return AwsXrayPropagator.getInstance() + .extract( + Context.current(), + Collections.singletonMap("X-Amzn-Trace-Id", traceHeader), + MapGetter.INSTANCE); + } +} diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awssdk/v1_11/TracingRequestHandler.java b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awssdk/v1_11/TracingRequestHandler.java index 1fd2e28e47..168df8356f 100644 --- a/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awssdk/v1_11/TracingRequestHandler.java +++ b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awssdk/v1_11/TracingRequestHandler.java @@ -12,9 +12,15 @@ import com.amazonaws.AmazonWebServiceRequest; import com.amazonaws.Request; import com.amazonaws.Response; import com.amazonaws.handlers.RequestHandler2; +import com.amazonaws.services.sqs.model.Message; +import com.amazonaws.services.sqs.model.ReceiveMessageRequest; +import com.amazonaws.services.sqs.model.ReceiveMessageResult; +import com.amazonaws.services.sqs.model.SendMessageRequest; +import io.opentelemetry.api.trace.Span; import io.opentelemetry.context.Context; import io.opentelemetry.context.Scope; import io.opentelemetry.javaagent.instrumentation.api.ContextStore; +import java.util.List; /** Tracing Request Handler. */ public class TracingRequestHandler extends RequestHandler2 { @@ -27,19 +33,40 @@ public class TracingRequestHandler extends RequestHandler2 { @Override public void beforeRequest(Request request) { + AmazonWebServiceRequest originalRequest = request.getOriginalRequest(); + Span.Kind kind = (isSqsProducer(originalRequest) ? Span.Kind.PRODUCER : Span.Kind.CLIENT); + RequestMeta requestMeta = contextStore.get(originalRequest); Context parentContext = Context.current(); if (!tracer().shouldStartSpan(parentContext)) { return; } - Context context = tracer().startSpan(parentContext, request, requestMeta); + Context context = tracer().startSpan(kind, parentContext, request, requestMeta); Scope scope = context.makeCurrent(); request.addHandlerContext(CONTEXT_SCOPE_PAIR_CONTEXT_KEY, new ContextScopePair(context, scope)); } + private boolean isSqsProducer(AmazonWebServiceRequest request) { + return (request instanceof SendMessageRequest); + } + + @Override + public AmazonWebServiceRequest beforeMarshalling(AmazonWebServiceRequest request) { + if (isSqsConsumer(request)) { + ReceiveMessageRequest receiveMessageRequest = (ReceiveMessageRequest) request; + receiveMessageRequest.withAttributeNames(SqsParentContext.AWS_TRACE_SYSTEM_ATTRIBUTE); + } + return request; + } + @Override public void afterResponse(Request request, Response response) { + if (isSqsConsumer(request.getOriginalRequest())) { + afterConsumerResponse( + (Request) request, (Response) response); + } + // close outstanding "client" span ContextScopePair scope = request.getHandlerContext(CONTEXT_SCOPE_PAIR_CONTEXT_KEY); if (scope == null) { return; @@ -49,6 +76,28 @@ public class TracingRequestHandler extends RequestHandler2 { tracer().end(scope.getContext(), response); } + private boolean isSqsConsumer(AmazonWebServiceRequest request) { + return (request instanceof ReceiveMessageRequest); + } + + /** Create and close CONSUMER span for each message consumed. */ + private void afterConsumerResponse( + Request request, Response response) { + ReceiveMessageResult receiveMessageResult = response.getAwsResponse(); + List messages = receiveMessageResult.getMessages(); + for (Message message : messages) { + createConsumerSpan(message, request, response); + } + } + + private void createConsumerSpan(Message message, Request request, Response response) { + Context parentContext = SqsParentContext.ofSystemAttributes(message.getAttributes()); + AmazonWebServiceRequest originalRequest = request.getOriginalRequest(); + RequestMeta requestMeta = contextStore.get(originalRequest); + Context context = tracer().startSpan(Span.Kind.CONSUMER, parentContext, request, requestMeta); + tracer().end(context, response); + } + @Override public void afterError(Request request, Response response, Exception e) { ContextScopePair scope = request.getHandlerContext(CONTEXT_SCOPE_PAIR_CONTEXT_KEY); diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/Aws1ClientTest.groovy b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/Aws1ClientTest.groovy index 95abf4c439..185d5fedff 100644 --- a/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/Aws1ClientTest.groovy +++ b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/Aws1ClientTest.groovy @@ -4,6 +4,7 @@ */ import static io.opentelemetry.api.trace.Span.Kind.CLIENT +import static io.opentelemetry.api.trace.Span.Kind.PRODUCER import static io.opentelemetry.instrumentation.test.server.http.TestHttpServer.httpServer import static io.opentelemetry.instrumentation.test.utils.PortUtils.UNUSABLE_PORT @@ -137,7 +138,7 @@ class Aws1ClientTest extends AgentTestRunner { trace(0, 1) { span(0) { name "$service.$operation" - kind CLIENT + kind operation == "SendMessage" ? PRODUCER : CLIENT errored false hasNoParent() attributes { diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/SqsTracingTest.groovy b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/SqsTracingTest.groovy new file mode 100644 index 0000000000..66560a61e2 --- /dev/null +++ b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/groovy/SqsTracingTest.groovy @@ -0,0 +1,147 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +import static io.opentelemetry.api.trace.Span.Kind.CLIENT +import static io.opentelemetry.api.trace.Span.Kind.CONSUMER +import static io.opentelemetry.api.trace.Span.Kind.PRODUCER + +import com.amazonaws.auth.AWSStaticCredentialsProvider +import com.amazonaws.auth.BasicAWSCredentials +import com.amazonaws.client.builder.AwsClientBuilder +import com.amazonaws.services.sqs.AmazonSQSAsyncClient +import com.amazonaws.services.sqs.model.SendMessageRequest +import io.opentelemetry.instrumentation.test.AgentTestRunner +import io.opentelemetry.instrumentation.test.utils.PortUtils +import org.elasticmq.rest.sqs.SQSRestServerBuilder +import spock.lang.Shared + +class SqsTracingTest extends AgentTestRunner { + + @Shared + def sqs + @Shared + AmazonSQSAsyncClient client + @Shared + int sqsPort + + def setupSpec() { + + sqsPort = PortUtils.randomOpenPort() + sqs = SQSRestServerBuilder.withPort(sqsPort).withInterface("localhost").start() + println getClass().name + " SQS server started at: localhost:$sqsPort/" + + def credentials = new AWSStaticCredentialsProvider(new BasicAWSCredentials("x", "x")) + def endpointConfiguration = new AwsClientBuilder.EndpointConfiguration("http://localhost:"+sqsPort, "elasticmq") + client = AmazonSQSAsyncClient.asyncBuilder().withCredentials(credentials).withEndpointConfiguration(endpointConfiguration).build() + } + + def cleanupSpec() { + if (sqs != null) { + sqs.stopAndWait() + } + } + + def "simple sqs producer-consumer services"() { + setup: + client.createQueue("testSdkSqs") + + when: + SendMessageRequest send = new SendMessageRequest("http://localhost:$sqsPort/000000000000/testSdkSqs", "{\"type\": \"hello\"}") + client.sendMessage(send) + client.receiveMessage("http://localhost:$sqsPort/000000000000/testSdkSqs") + + then: + assertTraces(3) { + trace(0, 1) { + + span(0) { + name "SQS.CreateQueue" + kind CLIENT + hasNoParent() + attributes { + "aws.agent" "java-aws-sdk" + "aws.endpoint" "http://localhost:$sqsPort" + "aws.operation" "CreateQueueRequest" + "aws.queue.name" "testSdkSqs" + "aws.service" "AmazonSQS" + "http.flavor" "1.1" + "http.method" "POST" + "http.status_code" 200 + "http.url" "http://localhost:$sqsPort" + "net.peer.name" "localhost" + "net.peer.port" sqsPort + "net.transport" "IP.TCP" + } + } + } + trace(1, 2) { + span(0) { + name "SQS.SendMessage" + kind PRODUCER + hasNoParent() + attributes { + "aws.agent" "java-aws-sdk" + "aws.endpoint" "http://localhost:$sqsPort" + "aws.operation" "SendMessageRequest" + "aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs" + "aws.service" "AmazonSQS" + "http.flavor" "1.1" + "http.method" "POST" + "http.status_code" 200 + "http.url" "http://localhost:$sqsPort" + "net.peer.name" "localhost" + "net.peer.port" sqsPort + "net.transport" "IP.TCP" + } + } + span(1) { + name "SQS.ReceiveMessage" + kind CONSUMER + childOf span(0) + attributes { + "aws.agent" "java-aws-sdk" + "aws.endpoint" "http://localhost:$sqsPort" + "aws.operation" "ReceiveMessageRequest" + "aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs" + "aws.service" "AmazonSQS" + "http.flavor" "1.1" + "http.method" "POST" + "http.status_code" 200 + "http.url" "http://localhost:$sqsPort" + "http.user_agent" String + "net.peer.name" "localhost" + "net.peer.port" sqsPort + "net.transport" "IP.TCP" + } + } + } + /** + * This span represents HTTP "sending of receive message" operation. It's always single, while there can be multiple CONSUMER spans (one per consumed message). + * This one could be suppressed (by IF in TracingRequestHandler#beforeRequest but then HTTP instrumentation span would appear + */ + trace(2, 1) { + span(0) { + name "SQS.ReceiveMessage" + kind CLIENT + hasNoParent() + attributes { + "aws.agent" "java-aws-sdk" + "aws.endpoint" "http://localhost:$sqsPort" + "aws.operation" "ReceiveMessageRequest" + "aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs" + "aws.service" "AmazonSQS" + "http.flavor" "1.1" + "http.method" "POST" + "http.status_code" 200 + "http.url" "http://localhost:$sqsPort" + "net.peer.name" "localhost" + "net.peer.port" sqsPort + "net.transport" "IP.TCP" + } + } + } + } + } +} \ No newline at end of file diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/resources/logback-test.xml b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/resources/logback-test.xml new file mode 100644 index 0000000000..64807657e2 --- /dev/null +++ b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/test/resources/logback-test.xml @@ -0,0 +1,18 @@ + + + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + +