From a1f691798e4f2cfba7742ed91ca13488c8a8c6f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christian=20Neum=C3=BCller?= Date: Thu, 29 Jun 2023 16:55:11 +0200 Subject: [PATCH] aws-sdk-2.2.: Support injection into SQS.SendMessageBatch message attributes (#8798) Co-authored-by: Mateusz Rzeszutek --- .../aws-sdk-2.2/library/build.gradle.kts | 3 + .../awssdk/v2_2/SqsAccess.java | 61 +++--- .../instrumentation/awssdk/v2_2/SqsImpl.java | 126 +++++++++---- .../v2_2/TracingExecutionInterceptor.java | 20 +- .../awssdk/v2_2/Aws2SqsTracingTest.groovy | 5 + ...Aws2SqsTracingTestWithW3CPropagator.groovy | 14 +- ...tWithW3CPropagatorAndXrayPropagator.groovy | 7 +- .../v2_2/AbstractAws2ClientCoreTest.groovy | 2 +- .../v2_2/AbstractAws2SqsTracingTest.groovy | 173 +++++++++++++++++- 9 files changed, 323 insertions(+), 88 deletions(-) diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/build.gradle.kts b/instrumentation/aws-sdk/aws-sdk-2.2/library/build.gradle.kts index 14559d2286..94e6c648e1 100644 --- a/instrumentation/aws-sdk/aws-sdk-2.2/library/build.gradle.kts +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/build.gradle.kts @@ -42,6 +42,9 @@ testing { tasks { withType { systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean) + + // NB: If you'd like to change these, there is some cleanup work to be done, as most tests ignore this and + // set the value directly (the "library" does not normally query it, only library-autoconfigure) systemProperty("otel.instrumentation.aws-sdk.experimental-span-attributes", true) systemProperty("otel.instrumentation.aws-sdk.experimental-use-propagator-for-messaging", true) } diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsAccess.java b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsAccess.java index 22a41c4bbb..06c7f343e2 100644 --- a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsAccess.java +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsAccess.java @@ -7,13 +7,10 @@ package io.opentelemetry.instrumentation.awssdk.v2_2; import io.opentelemetry.context.propagation.TextMapPropagator; import io.opentelemetry.javaagent.tooling.muzzle.NoMuzzle; +import javax.annotation.Nullable; import software.amazon.awssdk.core.SdkRequest; -import software.amazon.awssdk.core.SdkResponse; import software.amazon.awssdk.core.interceptor.Context; import software.amazon.awssdk.core.interceptor.ExecutionAttributes; -import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; -import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; -import software.amazon.awssdk.services.sqs.model.SendMessageRequest; // helper class for calling methods that use sqs types in SqsImpl // if SqsImpl is not present these methods are no op @@ -23,42 +20,26 @@ final class SqsAccess { private static final boolean enabled = PluginImplUtil.isImplPresent("SqsImpl"); @NoMuzzle - static boolean isSendMessageRequest(SdkRequest request) { - return enabled && request instanceof SendMessageRequest; - } - - @NoMuzzle - static SdkRequest injectIntoSendMessageRequest( - TextMapPropagator messagingPropagator, - SdkRequest rawRequest, - io.opentelemetry.context.Context otelContext) { - assert enabled; // enabled checked already in instance check. - return SqsImpl.injectIntoSendMessageRequest(messagingPropagator, rawRequest, otelContext); - } - - @NoMuzzle - static boolean isReceiveMessageRequest(SdkRequest request) { - return enabled && request instanceof ReceiveMessageRequest; - } - - @NoMuzzle - public static SdkRequest modifyReceiveMessageRequest( - SdkRequest request, boolean useXrayPropagator, TextMapPropagator messagingPropagator) { - assert enabled; // enabled checked already in instance check. - return SqsImpl.modifyReceiveMessageRequest(request, useXrayPropagator, messagingPropagator); - } - - @NoMuzzle - static boolean isReceiveMessageResponse(SdkResponse response) { - return enabled && response instanceof ReceiveMessageResponse; - } - - @NoMuzzle - static void afterReceiveMessageExecution( - TracingExecutionInterceptor config, + static boolean afterReceiveMessageExecution( Context.AfterExecution context, - ExecutionAttributes executionAttributes) { - assert enabled; // enabled checked already in instance check. - SqsImpl.afterReceiveMessageExecution(config, executionAttributes, context); + ExecutionAttributes executionAttributes, + TracingExecutionInterceptor config) { + return enabled && SqsImpl.afterReceiveMessageExecution(context, executionAttributes, config); + } + + /** + * Returns {@code null} (not the unmodified {@code request}!) if nothing matched, so that other + * handling can be tried. + */ + @Nullable + @NoMuzzle + static SdkRequest modifyRequest( + SdkRequest request, + io.opentelemetry.context.Context otelContext, + boolean useXrayPropagator, + TextMapPropagator messagingPropagator) { + return enabled + ? SqsImpl.modifyRequest(request, otelContext, useXrayPropagator, messagingPropagator) + : null; } } diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsImpl.java b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsImpl.java index 759d1eeb50..587a5b919f 100644 --- a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsImpl.java +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/SqsImpl.java @@ -11,7 +11,9 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import javax.annotation.Nullable; import software.amazon.awssdk.core.SdkRequest; +import software.amazon.awssdk.core.SdkResponse; import software.amazon.awssdk.core.interceptor.Context; import software.amazon.awssdk.core.interceptor.ExecutionAttributes; import software.amazon.awssdk.http.SdkHttpResponse; @@ -20,6 +22,8 @@ import software.amazon.awssdk.services.sqs.model.Message; import software.amazon.awssdk.services.sqs.model.MessageAttributeValue; import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest; import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry; import software.amazon.awssdk.services.sqs.model.SendMessageRequest; // this class is only used from SqsAccess from method with @NoMuzzle annotation @@ -33,44 +37,30 @@ final class SqsImpl { private SqsImpl() {} - static SdkRequest injectIntoSendMessageRequest( - TextMapPropagator messagingPropagator, - SdkRequest rawRequest, - io.opentelemetry.context.Context otelContext) { - SendMessageRequest request = (SendMessageRequest) rawRequest; - Map messageAttributes = - new HashMap<>(request.messageAttributes()); - - messagingPropagator.inject( - otelContext, - messageAttributes, - (carrier, k, v) -> { - carrier.put(k, MessageAttributeValue.builder().stringValue(v).dataType("String").build()); - }); - - if (messageAttributes.size() > 10) { // Too many attributes, we don't want to break the call. - return request; - } - return request.toBuilder().messageAttributes(messageAttributes).build(); - } - - /** Create and close CONSUMER span for each message consumed. */ - static void afterReceiveMessageExecution( - TracingExecutionInterceptor config, + static boolean afterReceiveMessageExecution( + Context.AfterExecution context, ExecutionAttributes executionAttributes, - Context.AfterExecution context) { - ReceiveMessageResponse response = (ReceiveMessageResponse) context.response(); + TracingExecutionInterceptor config) { + + SdkResponse rawResponse = context.response(); + if (!(rawResponse instanceof ReceiveMessageResponse)) { + return false; + } + + ReceiveMessageResponse response = (ReceiveMessageResponse) rawResponse; SdkHttpResponse httpResponse = context.httpResponse(); for (Message message : response.messages()) { - createConsumerSpan(config, message, executionAttributes, httpResponse); + createConsumerSpan(message, httpResponse, executionAttributes, config); } + + return true; } private static void createConsumerSpan( - TracingExecutionInterceptor config, Message message, + SdkHttpResponse httpResponse, ExecutionAttributes executionAttributes, - SdkHttpResponse httpResponse) { + TracingExecutionInterceptor config) { io.opentelemetry.context.Context parentContext = io.opentelemetry.context.Context.root(); @@ -99,9 +89,81 @@ final class SqsImpl { } } - static SdkRequest modifyReceiveMessageRequest( - SdkRequest rawRequest, boolean useXrayPropagator, TextMapPropagator messagingPropagator) { - ReceiveMessageRequest request = (ReceiveMessageRequest) rawRequest; + @Nullable + static SdkRequest modifyRequest( + SdkRequest request, + io.opentelemetry.context.Context otelContext, + boolean useXrayPropagator, + TextMapPropagator messagingPropagator) { + if (request instanceof ReceiveMessageRequest) { + return modifyReceiveMessageRequest( + (ReceiveMessageRequest) request, useXrayPropagator, messagingPropagator); + } else if (messagingPropagator != null) { + if (request instanceof SendMessageRequest) { + return injectIntoSendMessageRequest( + (SendMessageRequest) request, otelContext, messagingPropagator); + } else if (request instanceof SendMessageBatchRequest) { + return injectIntoSendMessageBatchRequest( + (SendMessageBatchRequest) request, otelContext, messagingPropagator); + } + } + return null; + } + + private static SdkRequest injectIntoSendMessageBatchRequest( + SendMessageBatchRequest request, + io.opentelemetry.context.Context otelContext, + TextMapPropagator messagingPropagator) { + ArrayList entries = new ArrayList<>(request.entries()); + for (int i = 0; i < entries.size(); ++i) { + SendMessageBatchRequestEntry entry = entries.get(i); + Map messageAttributes = + new HashMap<>(entry.messageAttributes()); + + // TODO: Per https://github.com/open-telemetry/oteps/pull/220, each message should get + // a separate context. We don't support this yet, also because it would be inconsistent + // with the header-based X-Ray propagation + // (probably could override it here by setting the X-Ray message system attribute) + if (injectIntoMessageAttributes(messageAttributes, otelContext, messagingPropagator)) { + entries.set(i, entry.toBuilder().messageAttributes(messageAttributes).build()); + } + } + return request.toBuilder().entries(entries).build(); + } + + private static SdkRequest injectIntoSendMessageRequest( + SendMessageRequest request, + io.opentelemetry.context.Context otelContext, + TextMapPropagator messagingPropagator) { + Map messageAttributes = + new HashMap<>(request.messageAttributes()); + if (!injectIntoMessageAttributes(messageAttributes, otelContext, messagingPropagator)) { + return request; + } + return request.toBuilder().messageAttributes(messageAttributes).build(); + } + + private static boolean injectIntoMessageAttributes( + Map messageAttributes, + io.opentelemetry.context.Context otelContext, + TextMapPropagator messagingPropagator) { + messagingPropagator.inject( + otelContext, + messageAttributes, + (carrier, k, v) -> { + carrier.put(k, MessageAttributeValue.builder().stringValue(v).dataType("String").build()); + }); + + // Return whether the injection resulted in an attribute count that is still supported. + // See + // https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html#sqs-message-attributes + return messageAttributes.size() <= 10; + } + + private static SdkRequest modifyReceiveMessageRequest( + ReceiveMessageRequest request, + boolean useXrayPropagator, + TextMapPropagator messagingPropagator) { boolean hasXrayAttribute = true; List existingAttributeNames = null; if (useXrayPropagator) { diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/TracingExecutionInterceptor.java b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/TracingExecutionInterceptor.java index ccdc223ec5..ed2c9bae8b 100644 --- a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/TracingExecutionInterceptor.java +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v2_2/TracingExecutionInterceptor.java @@ -121,14 +121,14 @@ final class TracingExecutionInterceptor implements ExecutionInterceptor { throw throwable; } - if (SqsAccess.isReceiveMessageRequest(request)) { - return SqsAccess.modifyReceiveMessageRequest(request, useXrayPropagator, messagingPropagator); - } else if (messagingPropagator != null) { - if (SqsAccess.isSendMessageRequest(request)) { - return SqsAccess.injectIntoSendMessageRequest(messagingPropagator, request, otelContext); - } - // TODO: Support SendMessageBatchRequest (and thus SendMessageBatchRequestEntry) + SdkRequest sqsModifiedRequest = + SqsAccess.modifyRequest(request, otelContext, useXrayPropagator, messagingPropagator); + if (sqsModifiedRequest != null) { + return sqsModifiedRequest; } + + // Insert other special handling here, following the same pattern as SQS. + return request; } @@ -225,9 +225,9 @@ final class TracingExecutionInterceptor implements ExecutionInterceptor { @Override public void afterExecution( Context.AfterExecution context, ExecutionAttributes executionAttributes) { - if (SqsAccess.isReceiveMessageResponse(context.response())) { - SqsAccess.afterReceiveMessageExecution(this, context, executionAttributes); - } + + // Other special handling could be shortcut-&&ed after this (false is returned if not handled). + SqsAccess.afterReceiveMessageExecution(context, executionAttributes, this); io.opentelemetry.context.Context otelContext = getContext(executionAttributes); if (otelContext != null) { diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsTracingTest.groovy b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsTracingTest.groovy index 3aadab3cbc..236f846fa8 100644 --- a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsTracingTest.groovy +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsTracingTest.groovy @@ -18,4 +18,9 @@ class Aws2SqsTracingTest extends AbstractAws2SqsTracingTest implements LibraryTe .build() .newExecutionInterceptor()) } + + @Override + boolean isSqsAttributeInjectionEnabled() { + false + } } diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsTracingTestWithW3CPropagator.groovy b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsTracingTestWithW3CPropagator.groovy index 4099b0ae91..b06a7a3545 100644 --- a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsTracingTestWithW3CPropagator.groovy +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsTracingTestWithW3CPropagator.groovy @@ -15,9 +15,19 @@ class Aws2SqsTracingTestWithW3CPropagator extends AbstractAws2SqsTracingTest imp .addExecutionInterceptor( AwsSdkTelemetry.builder(getOpenTelemetry()) .setCaptureExperimentalSpanAttributes(true) - .setUseConfiguredPropagatorForMessaging(true) // Difference to main test - .setUseXrayPropagator(false) // Disable to confirm messaging propagator actually works + .setUseConfiguredPropagatorForMessaging(isSqsAttributeInjectionEnabled()) // Difference to main test + .setUseXrayPropagator(isXrayInjectionEnabled()) // Disable to confirm messaging propagator actually works .build() .newExecutionInterceptor()) } + + @Override + boolean isSqsAttributeInjectionEnabled() { + true + } + + @Override + boolean isXrayInjectionEnabled() { + false + } } diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsTracingTestWithW3CPropagatorAndXrayPropagator.groovy b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsTracingTestWithW3CPropagatorAndXrayPropagator.groovy index a4af1e1b56..e6b5ddc3e5 100644 --- a/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsTracingTestWithW3CPropagatorAndXrayPropagator.groovy +++ b/instrumentation/aws-sdk/aws-sdk-2.2/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/Aws2SqsTracingTestWithW3CPropagatorAndXrayPropagator.groovy @@ -16,8 +16,13 @@ class Aws2SqsTracingTestWithW3CPropagatorAndXrayPropagator extends AbstractAws2S .addExecutionInterceptor( AwsSdkTelemetry.builder(getOpenTelemetry()) .setCaptureExperimentalSpanAttributes(true) - .setUseConfiguredPropagatorForMessaging(true) // Difference to main test + .setUseConfiguredPropagatorForMessaging(isSqsAttributeInjectionEnabled()) // Difference to main test .build() .newExecutionInterceptor()) } + + @Override + boolean isSqsAttributeInjectionEnabled() { + true + } } diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2ClientCoreTest.groovy b/instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2ClientCoreTest.groovy index c270a6f387..6463c400a3 100644 --- a/instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2ClientCoreTest.groovy +++ b/instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2ClientCoreTest.groovy @@ -31,7 +31,7 @@ import static io.opentelemetry.api.trace.SpanKind.CLIENT @Unroll abstract class AbstractAws2ClientCoreTest extends InstrumentationSpecification { - def isSqsAttributeInjectionEnabled() { + static boolean isSqsAttributeInjectionEnabled() { // See io.opentelemetry.instrumentation.awssdk.v2_2.autoconfigure.TracingExecutionInterceptor return ConfigPropertiesUtil.getBoolean("otel.instrumentation.aws-sdk.experimental-use-propagator-for-messaging", false) } diff --git a/instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2SqsTracingTest.groovy b/instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2SqsTracingTest.groovy index a86c9bc8a8..a3529957b6 100644 --- a/instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2SqsTracingTest.groovy +++ b/instrumentation/aws-sdk/aws-sdk-2.2/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v2_2/AbstractAws2SqsTracingTest.groovy @@ -16,7 +16,9 @@ import software.amazon.awssdk.services.sqs.SqsAsyncClient import software.amazon.awssdk.services.sqs.SqsBaseClientBuilder import software.amazon.awssdk.services.sqs.SqsClient import software.amazon.awssdk.services.sqs.model.CreateQueueRequest +import software.amazon.awssdk.services.sqs.model.MessageAttributeValue import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest +import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest import software.amazon.awssdk.services.sqs.model.SendMessageRequest import spock.lang.Shared @@ -35,8 +37,25 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification { @Shared int sqsPort + static Map dummyMessageAttributes(count) { + (0.. e.messageBody("e1").id("i1"), + // 8 attributes, injection always possible + e -> e.messageBody("e2").id("i2") + .messageAttributes(dummyMessageAttributes(8)), + // 10 attributes, injection with custom propagator never possible + e -> e.messageBody("e3").id("i3").messageAttributes(dummyMessageAttributes(10))) + .build() + + boolean isSqsAttributeInjectionEnabled() { + AbstractAws2ClientCoreTest.isSqsAttributeInjectionEnabled() + } + + boolean isXrayInjectionEnabled() { + true + } + void configureSdkClient(SqsBaseClientBuilder builder) { builder .overrideConfiguration(createOverrideConfigurationBuilder().build()) @@ -124,6 +162,7 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification { name "Sqs.ReceiveMessage" kind CONSUMER childOf span(0) + hasNoLinks() // TODO: Link to receive operation? attributes { "aws.agent" "java-aws-sdk" "rpc.method" "ReceiveMessage" @@ -148,6 +187,7 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification { name "Sqs.ReceiveMessage" kind CLIENT hasNoParent() + hasNoLinks() attributes { "aws.agent" "java-aws-sdk" "aws.requestId" "00000000-0000-0000-0000-000000000000" @@ -204,4 +244,133 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification { resp.messages().size() == 1 assertSqsTraces() } + + def "batch sqs producer-consumer services: sync"() { + setup: + def builder = SqsClient.builder() + configureSdkClient(builder) + def client = builder.build() + + client.createQueue(createQueueRequest) + + when: + client.sendMessageBatch(sendMessageBatchRequest) + + def resp = client.receiveMessage(receiveMessageBatchRequest) + def totalAttrs = resp.messages().sum {it.messageAttributes().size() } + + then: + resp.messages().size() == 3 + + // +2: 3 messages, 2x traceparent, 1x not injected due to too many attrs + totalAttrs == 18 + (sqsAttributeInjectionEnabled ? 2 : 0) + + assertTraces(xrayInjectionEnabled ? 3 : 4) { + trace(0, 1) { + + span(0) { + name "Sqs.CreateQueue" + kind CLIENT + } + } + trace(1, xrayInjectionEnabled ? 4 : 3) { + span(0) { + name "Sqs.SendMessageBatch" + kind CLIENT // TODO: Probably this should be producer, but that would be a breaking change + hasNoParent() + attributes { + "aws.agent" "java-aws-sdk" + "aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs" + "aws.requestId" "00000000-0000-0000-0000-000000000000" + "rpc.system" "aws-api" + "rpc.method" "SendMessageBatch" + "rpc.service" "Sqs" + "http.method" "POST" + "http.status_code" 200 + "http.url" { it.startsWith("http://localhost:$sqsPort") } + "$SemanticAttributes.USER_AGENT_ORIGINAL" String + "net.peer.name" "localhost" + "net.peer.port" sqsPort + "$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long } + "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long } + } + } + for (int i: 1..(xrayInjectionEnabled ? 3 : 2)) { + span(i) { + name "Sqs.ReceiveMessage" + kind CONSUMER + childOf span(0) + hasNoLinks() // TODO: Link to receive operation? + + attributes { + "aws.agent" "java-aws-sdk" + "rpc.method" "ReceiveMessage" + "rpc.system" "aws-api" + "rpc.service" "Sqs" + "http.method" "POST" + "http.status_code" 200 + "http.url" { it.startsWith("http://localhost:$sqsPort") } + "net.peer.name" "localhost" + "net.peer.port" sqsPort + "$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long } + "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long } + } + } + } + } + /** + * This span represents HTTP "sending of receive message" operation. It's always single, while there can be multiple CONSUMER spans (one per consumed message). + * This one could be suppressed (by IF in TracingRequestHandler#beforeRequest but then HTTP instrumentation span would appear + */ + trace(2, 1) { + span(0) { + name "Sqs.ReceiveMessage" + kind CLIENT + hasNoParent() + attributes { + "aws.agent" "java-aws-sdk" + "aws.requestId" "00000000-0000-0000-0000-000000000000" + "rpc.method" "ReceiveMessage" + "aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs" + "rpc.system" "aws-api" + "rpc.service" "Sqs" + "http.method" "POST" + "http.status_code" 200 + "http.url" { it.startsWith("http://localhost:$sqsPort") } + "$SemanticAttributes.USER_AGENT_ORIGINAL" String + "net.peer.name" "localhost" + "net.peer.port" sqsPort + "$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long } + "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long } + } + } + } + if (!xrayInjectionEnabled) { + trace(3, 1) { + span(0) { + name "Sqs.ReceiveMessage" + kind CONSUMER + + // TODO This is not nice at all, and can also happen if producer is not instrumented + hasNoParent() + hasNoLinks() // TODO: Link to receive operation? + + attributes { + "aws.agent" "java-aws-sdk" + "rpc.method" "ReceiveMessage" + "rpc.system" "aws-api" + "rpc.service" "Sqs" + "http.method" "POST" + "http.status_code" 200 + "http.url" { it.startsWith("http://localhost:$sqsPort") } + "net.peer.name" "localhost" + "net.peer.port" sqsPort + "$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long } + "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long } + } + } + } + } + } + } }