aws-sdk-2.2.: Support injection into SQS.SendMessageBatch message attributes (#8798)

Co-authored-by: Mateusz Rzeszutek <mrzeszutek@splunk.com>
This commit is contained in:
Christian Neumüller 2023-06-29 16:55:11 +02:00 committed by GitHub
parent eaf11ab2f3
commit a1f691798e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 323 additions and 88 deletions

View File

@ -42,6 +42,9 @@ testing {
tasks {
withType<Test> {
systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean)
// NB: If you'd like to change these, there is some cleanup work to be done, as most tests ignore this and
// set the value directly (the "library" does not normally query it, only library-autoconfigure)
systemProperty("otel.instrumentation.aws-sdk.experimental-span-attributes", true)
systemProperty("otel.instrumentation.aws-sdk.experimental-use-propagator-for-messaging", true)
}

View File

@ -7,13 +7,10 @@ package io.opentelemetry.instrumentation.awssdk.v2_2;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.javaagent.tooling.muzzle.NoMuzzle;
import javax.annotation.Nullable;
import software.amazon.awssdk.core.SdkRequest;
import software.amazon.awssdk.core.SdkResponse;
import software.amazon.awssdk.core.interceptor.Context;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
// helper class for calling methods that use sqs types in SqsImpl
// if SqsImpl is not present these methods are no op
@ -23,42 +20,26 @@ final class SqsAccess {
private static final boolean enabled = PluginImplUtil.isImplPresent("SqsImpl");
@NoMuzzle
static boolean isSendMessageRequest(SdkRequest request) {
return enabled && request instanceof SendMessageRequest;
}
@NoMuzzle
static SdkRequest injectIntoSendMessageRequest(
TextMapPropagator messagingPropagator,
SdkRequest rawRequest,
io.opentelemetry.context.Context otelContext) {
assert enabled; // enabled checked already in instance check.
return SqsImpl.injectIntoSendMessageRequest(messagingPropagator, rawRequest, otelContext);
}
@NoMuzzle
static boolean isReceiveMessageRequest(SdkRequest request) {
return enabled && request instanceof ReceiveMessageRequest;
}
@NoMuzzle
public static SdkRequest modifyReceiveMessageRequest(
SdkRequest request, boolean useXrayPropagator, TextMapPropagator messagingPropagator) {
assert enabled; // enabled checked already in instance check.
return SqsImpl.modifyReceiveMessageRequest(request, useXrayPropagator, messagingPropagator);
}
@NoMuzzle
static boolean isReceiveMessageResponse(SdkResponse response) {
return enabled && response instanceof ReceiveMessageResponse;
}
@NoMuzzle
static void afterReceiveMessageExecution(
TracingExecutionInterceptor config,
static boolean afterReceiveMessageExecution(
Context.AfterExecution context,
ExecutionAttributes executionAttributes) {
assert enabled; // enabled checked already in instance check.
SqsImpl.afterReceiveMessageExecution(config, executionAttributes, context);
ExecutionAttributes executionAttributes,
TracingExecutionInterceptor config) {
return enabled && SqsImpl.afterReceiveMessageExecution(context, executionAttributes, config);
}
/**
* Returns {@code null} (not the unmodified {@code request}!) if nothing matched, so that other
* handling can be tried.
*/
@Nullable
@NoMuzzle
static SdkRequest modifyRequest(
SdkRequest request,
io.opentelemetry.context.Context otelContext,
boolean useXrayPropagator,
TextMapPropagator messagingPropagator) {
return enabled
? SqsImpl.modifyRequest(request, otelContext, useXrayPropagator, messagingPropagator)
: null;
}
}

View File

@ -11,7 +11,9 @@ import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import software.amazon.awssdk.core.SdkRequest;
import software.amazon.awssdk.core.SdkResponse;
import software.amazon.awssdk.core.interceptor.Context;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.http.SdkHttpResponse;
@ -20,6 +22,8 @@ import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
// this class is only used from SqsAccess from method with @NoMuzzle annotation
@ -33,44 +37,30 @@ final class SqsImpl {
private SqsImpl() {}
static SdkRequest injectIntoSendMessageRequest(
TextMapPropagator messagingPropagator,
SdkRequest rawRequest,
io.opentelemetry.context.Context otelContext) {
SendMessageRequest request = (SendMessageRequest) rawRequest;
Map<String, MessageAttributeValue> messageAttributes =
new HashMap<>(request.messageAttributes());
messagingPropagator.inject(
otelContext,
messageAttributes,
(carrier, k, v) -> {
carrier.put(k, MessageAttributeValue.builder().stringValue(v).dataType("String").build());
});
if (messageAttributes.size() > 10) { // Too many attributes, we don't want to break the call.
return request;
}
return request.toBuilder().messageAttributes(messageAttributes).build();
}
/** Create and close CONSUMER span for each message consumed. */
static void afterReceiveMessageExecution(
TracingExecutionInterceptor config,
static boolean afterReceiveMessageExecution(
Context.AfterExecution context,
ExecutionAttributes executionAttributes,
Context.AfterExecution context) {
ReceiveMessageResponse response = (ReceiveMessageResponse) context.response();
TracingExecutionInterceptor config) {
SdkResponse rawResponse = context.response();
if (!(rawResponse instanceof ReceiveMessageResponse)) {
return false;
}
ReceiveMessageResponse response = (ReceiveMessageResponse) rawResponse;
SdkHttpResponse httpResponse = context.httpResponse();
for (Message message : response.messages()) {
createConsumerSpan(config, message, executionAttributes, httpResponse);
createConsumerSpan(message, httpResponse, executionAttributes, config);
}
return true;
}
private static void createConsumerSpan(
TracingExecutionInterceptor config,
Message message,
SdkHttpResponse httpResponse,
ExecutionAttributes executionAttributes,
SdkHttpResponse httpResponse) {
TracingExecutionInterceptor config) {
io.opentelemetry.context.Context parentContext = io.opentelemetry.context.Context.root();
@ -99,9 +89,81 @@ final class SqsImpl {
}
}
static SdkRequest modifyReceiveMessageRequest(
SdkRequest rawRequest, boolean useXrayPropagator, TextMapPropagator messagingPropagator) {
ReceiveMessageRequest request = (ReceiveMessageRequest) rawRequest;
@Nullable
static SdkRequest modifyRequest(
SdkRequest request,
io.opentelemetry.context.Context otelContext,
boolean useXrayPropagator,
TextMapPropagator messagingPropagator) {
if (request instanceof ReceiveMessageRequest) {
return modifyReceiveMessageRequest(
(ReceiveMessageRequest) request, useXrayPropagator, messagingPropagator);
} else if (messagingPropagator != null) {
if (request instanceof SendMessageRequest) {
return injectIntoSendMessageRequest(
(SendMessageRequest) request, otelContext, messagingPropagator);
} else if (request instanceof SendMessageBatchRequest) {
return injectIntoSendMessageBatchRequest(
(SendMessageBatchRequest) request, otelContext, messagingPropagator);
}
}
return null;
}
private static SdkRequest injectIntoSendMessageBatchRequest(
SendMessageBatchRequest request,
io.opentelemetry.context.Context otelContext,
TextMapPropagator messagingPropagator) {
ArrayList<SendMessageBatchRequestEntry> entries = new ArrayList<>(request.entries());
for (int i = 0; i < entries.size(); ++i) {
SendMessageBatchRequestEntry entry = entries.get(i);
Map<String, MessageAttributeValue> messageAttributes =
new HashMap<>(entry.messageAttributes());
// TODO: Per https://github.com/open-telemetry/oteps/pull/220, each message should get
// a separate context. We don't support this yet, also because it would be inconsistent
// with the header-based X-Ray propagation
// (probably could override it here by setting the X-Ray message system attribute)
if (injectIntoMessageAttributes(messageAttributes, otelContext, messagingPropagator)) {
entries.set(i, entry.toBuilder().messageAttributes(messageAttributes).build());
}
}
return request.toBuilder().entries(entries).build();
}
private static SdkRequest injectIntoSendMessageRequest(
SendMessageRequest request,
io.opentelemetry.context.Context otelContext,
TextMapPropagator messagingPropagator) {
Map<String, MessageAttributeValue> messageAttributes =
new HashMap<>(request.messageAttributes());
if (!injectIntoMessageAttributes(messageAttributes, otelContext, messagingPropagator)) {
return request;
}
return request.toBuilder().messageAttributes(messageAttributes).build();
}
private static boolean injectIntoMessageAttributes(
Map<String, MessageAttributeValue> messageAttributes,
io.opentelemetry.context.Context otelContext,
TextMapPropagator messagingPropagator) {
messagingPropagator.inject(
otelContext,
messageAttributes,
(carrier, k, v) -> {
carrier.put(k, MessageAttributeValue.builder().stringValue(v).dataType("String").build());
});
// Return whether the injection resulted in an attribute count that is still supported.
// See
// https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-message-metadata.html#sqs-message-attributes
return messageAttributes.size() <= 10;
}
private static SdkRequest modifyReceiveMessageRequest(
ReceiveMessageRequest request,
boolean useXrayPropagator,
TextMapPropagator messagingPropagator) {
boolean hasXrayAttribute = true;
List<String> existingAttributeNames = null;
if (useXrayPropagator) {

View File

@ -121,14 +121,14 @@ final class TracingExecutionInterceptor implements ExecutionInterceptor {
throw throwable;
}
if (SqsAccess.isReceiveMessageRequest(request)) {
return SqsAccess.modifyReceiveMessageRequest(request, useXrayPropagator, messagingPropagator);
} else if (messagingPropagator != null) {
if (SqsAccess.isSendMessageRequest(request)) {
return SqsAccess.injectIntoSendMessageRequest(messagingPropagator, request, otelContext);
}
// TODO: Support SendMessageBatchRequest (and thus SendMessageBatchRequestEntry)
SdkRequest sqsModifiedRequest =
SqsAccess.modifyRequest(request, otelContext, useXrayPropagator, messagingPropagator);
if (sqsModifiedRequest != null) {
return sqsModifiedRequest;
}
// Insert other special handling here, following the same pattern as SQS.
return request;
}
@ -225,9 +225,9 @@ final class TracingExecutionInterceptor implements ExecutionInterceptor {
@Override
public void afterExecution(
Context.AfterExecution context, ExecutionAttributes executionAttributes) {
if (SqsAccess.isReceiveMessageResponse(context.response())) {
SqsAccess.afterReceiveMessageExecution(this, context, executionAttributes);
}
// Other special handling could be shortcut-&&ed after this (false is returned if not handled).
SqsAccess.afterReceiveMessageExecution(context, executionAttributes, this);
io.opentelemetry.context.Context otelContext = getContext(executionAttributes);
if (otelContext != null) {

View File

@ -18,4 +18,9 @@ class Aws2SqsTracingTest extends AbstractAws2SqsTracingTest implements LibraryTe
.build()
.newExecutionInterceptor())
}
@Override
boolean isSqsAttributeInjectionEnabled() {
false
}
}

View File

@ -15,9 +15,19 @@ class Aws2SqsTracingTestWithW3CPropagator extends AbstractAws2SqsTracingTest imp
.addExecutionInterceptor(
AwsSdkTelemetry.builder(getOpenTelemetry())
.setCaptureExperimentalSpanAttributes(true)
.setUseConfiguredPropagatorForMessaging(true) // Difference to main test
.setUseXrayPropagator(false) // Disable to confirm messaging propagator actually works
.setUseConfiguredPropagatorForMessaging(isSqsAttributeInjectionEnabled()) // Difference to main test
.setUseXrayPropagator(isXrayInjectionEnabled()) // Disable to confirm messaging propagator actually works
.build()
.newExecutionInterceptor())
}
@Override
boolean isSqsAttributeInjectionEnabled() {
true
}
@Override
boolean isXrayInjectionEnabled() {
false
}
}

View File

@ -16,8 +16,13 @@ class Aws2SqsTracingTestWithW3CPropagatorAndXrayPropagator extends AbstractAws2S
.addExecutionInterceptor(
AwsSdkTelemetry.builder(getOpenTelemetry())
.setCaptureExperimentalSpanAttributes(true)
.setUseConfiguredPropagatorForMessaging(true) // Difference to main test
.setUseConfiguredPropagatorForMessaging(isSqsAttributeInjectionEnabled()) // Difference to main test
.build()
.newExecutionInterceptor())
}
@Override
boolean isSqsAttributeInjectionEnabled() {
true
}
}

View File

@ -31,7 +31,7 @@ import static io.opentelemetry.api.trace.SpanKind.CLIENT
@Unroll
abstract class AbstractAws2ClientCoreTest extends InstrumentationSpecification {
def isSqsAttributeInjectionEnabled() {
static boolean isSqsAttributeInjectionEnabled() {
// See io.opentelemetry.instrumentation.awssdk.v2_2.autoconfigure.TracingExecutionInterceptor
return ConfigPropertiesUtil.getBoolean("otel.instrumentation.aws-sdk.experimental-use-propagator-for-messaging", false)
}

View File

@ -16,7 +16,9 @@ import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.SqsBaseClientBuilder
import software.amazon.awssdk.services.sqs.SqsClient
import software.amazon.awssdk.services.sqs.model.CreateQueueRequest
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest
import software.amazon.awssdk.services.sqs.model.SendMessageRequest
import spock.lang.Shared
@ -35,8 +37,25 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification {
@Shared
int sqsPort
static Map<String, MessageAttributeValue> dummyMessageAttributes(count) {
(0..<count).collectEntries {
[
"a$it".toString(),
MessageAttributeValue.builder().stringValue("v$it").dataType("String").build()]
}
}
String queueUrl = "http://localhost:$sqsPort/000000000000/testSdkSqs"
ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder()
.queueUrl("http://localhost:$sqsPort/000000000000/testSdkSqs")
.queueUrl(queueUrl)
.build()
ReceiveMessageRequest receiveMessageBatchRequest = ReceiveMessageRequest.builder()
.queueUrl(queueUrl)
.maxNumberOfMessages(3)
.messageAttributeNames("All")
.waitTimeSeconds(5)
.build()
CreateQueueRequest createQueueRequest = CreateQueueRequest.builder()
@ -44,10 +63,29 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification {
.build()
SendMessageRequest sendMessageRequest = SendMessageRequest.builder()
.queueUrl("http://localhost:$sqsPort/000000000000/testSdkSqs")
.queueUrl(queueUrl)
.messageBody("{\"type\": \"hello\"}")
.build()
SendMessageBatchRequest sendMessageBatchRequest = SendMessageBatchRequest.builder()
.queueUrl(queueUrl)
.entries(
e -> e.messageBody("e1").id("i1"),
// 8 attributes, injection always possible
e -> e.messageBody("e2").id("i2")
.messageAttributes(dummyMessageAttributes(8)),
// 10 attributes, injection with custom propagator never possible
e -> e.messageBody("e3").id("i3").messageAttributes(dummyMessageAttributes(10)))
.build()
boolean isSqsAttributeInjectionEnabled() {
AbstractAws2ClientCoreTest.isSqsAttributeInjectionEnabled()
}
boolean isXrayInjectionEnabled() {
true
}
void configureSdkClient(SqsBaseClientBuilder builder) {
builder
.overrideConfiguration(createOverrideConfigurationBuilder().build())
@ -124,6 +162,7 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification {
name "Sqs.ReceiveMessage"
kind CONSUMER
childOf span(0)
hasNoLinks() // TODO: Link to receive operation?
attributes {
"aws.agent" "java-aws-sdk"
"rpc.method" "ReceiveMessage"
@ -148,6 +187,7 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification {
name "Sqs.ReceiveMessage"
kind CLIENT
hasNoParent()
hasNoLinks()
attributes {
"aws.agent" "java-aws-sdk"
"aws.requestId" "00000000-0000-0000-0000-000000000000"
@ -204,4 +244,133 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification {
resp.messages().size() == 1
assertSqsTraces()
}
def "batch sqs producer-consumer services: sync"() {
setup:
def builder = SqsClient.builder()
configureSdkClient(builder)
def client = builder.build()
client.createQueue(createQueueRequest)
when:
client.sendMessageBatch(sendMessageBatchRequest)
def resp = client.receiveMessage(receiveMessageBatchRequest)
def totalAttrs = resp.messages().sum {it.messageAttributes().size() }
then:
resp.messages().size() == 3
// +2: 3 messages, 2x traceparent, 1x not injected due to too many attrs
totalAttrs == 18 + (sqsAttributeInjectionEnabled ? 2 : 0)
assertTraces(xrayInjectionEnabled ? 3 : 4) {
trace(0, 1) {
span(0) {
name "Sqs.CreateQueue"
kind CLIENT
}
}
trace(1, xrayInjectionEnabled ? 4 : 3) {
span(0) {
name "Sqs.SendMessageBatch"
kind CLIENT // TODO: Probably this should be producer, but that would be a breaking change
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs"
"aws.requestId" "00000000-0000-0000-0000-000000000000"
"rpc.system" "aws-api"
"rpc.method" "SendMessageBatch"
"rpc.service" "Sqs"
"http.method" "POST"
"http.status_code" 200
"http.url" { it.startsWith("http://localhost:$sqsPort") }
"$SemanticAttributes.USER_AGENT_ORIGINAL" String
"net.peer.name" "localhost"
"net.peer.port" sqsPort
"$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long }
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long }
}
}
for (int i: 1..(xrayInjectionEnabled ? 3 : 2)) {
span(i) {
name "Sqs.ReceiveMessage"
kind CONSUMER
childOf span(0)
hasNoLinks() // TODO: Link to receive operation?
attributes {
"aws.agent" "java-aws-sdk"
"rpc.method" "ReceiveMessage"
"rpc.system" "aws-api"
"rpc.service" "Sqs"
"http.method" "POST"
"http.status_code" 200
"http.url" { it.startsWith("http://localhost:$sqsPort") }
"net.peer.name" "localhost"
"net.peer.port" sqsPort
"$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long }
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long }
}
}
}
}
/**
* This span represents HTTP "sending of receive message" operation. It's always single, while there can be multiple CONSUMER spans (one per consumed message).
* This one could be suppressed (by IF in TracingRequestHandler#beforeRequest but then HTTP instrumentation span would appear
*/
trace(2, 1) {
span(0) {
name "Sqs.ReceiveMessage"
kind CLIENT
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.requestId" "00000000-0000-0000-0000-000000000000"
"rpc.method" "ReceiveMessage"
"aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs"
"rpc.system" "aws-api"
"rpc.service" "Sqs"
"http.method" "POST"
"http.status_code" 200
"http.url" { it.startsWith("http://localhost:$sqsPort") }
"$SemanticAttributes.USER_AGENT_ORIGINAL" String
"net.peer.name" "localhost"
"net.peer.port" sqsPort
"$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long }
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long }
}
}
}
if (!xrayInjectionEnabled) {
trace(3, 1) {
span(0) {
name "Sqs.ReceiveMessage"
kind CONSUMER
// TODO This is not nice at all, and can also happen if producer is not instrumented
hasNoParent()
hasNoLinks() // TODO: Link to receive operation?
attributes {
"aws.agent" "java-aws-sdk"
"rpc.method" "ReceiveMessage"
"rpc.system" "aws-api"
"rpc.service" "Sqs"
"http.method" "POST"
"http.status_code" 200
"http.url" { it.startsWith("http://localhost:$sqsPort") }
"net.peer.name" "localhost"
"net.peer.port" sqsPort
"$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long }
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long }
}
}
}
}
}
}
}