Implement capturing message headers for aws1 sqs spans (#9824)

This commit is contained in:
Lauri Tulmin 2023-11-09 16:18:05 +02:00 committed by GitHub
parent 55681e6cee
commit ff97f6cce8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 311 additions and 103 deletions

View File

@ -5,7 +5,10 @@
package io.opentelemetry.instrumentation.api.internal; package io.opentelemetry.instrumentation.api.internal;
import java.util.Arrays;
import java.util.List;
import java.util.Locale; import java.util.Locale;
import java.util.stream.Collectors;
import javax.annotation.Nullable; import javax.annotation.Nullable;
/** /**
@ -40,6 +43,21 @@ public final class ConfigPropertiesUtil {
return System.getenv(toEnvVarName(propertyName)); return System.getenv(toEnvVarName(propertyName));
} }
public static List<String> getList(String propertyName, List<String> defaultValue) {
String value = getString(propertyName);
if (value == null) {
return defaultValue;
}
return filterBlanksAndNulls(value.split(","));
}
private static List<String> filterBlanksAndNulls(String[] values) {
return Arrays.stream(values)
.map(String::trim)
.filter(s -> !s.isEmpty())
.collect(Collectors.toList());
}
private static String toEnvVarName(String propertyName) { private static String toEnvVarName(String propertyName) {
return propertyName.toUpperCase(Locale.ROOT).replace('-', '_').replace('.', '_'); return propertyName.toUpperCase(Locale.ROOT).replace('-', '_').replace('.', '_');
} }

View File

@ -40,6 +40,7 @@ public class TracingRequestHandler extends RequestHandler2 {
.getBoolean("otel.instrumentation.aws-sdk.experimental-span-attributes", false)) .getBoolean("otel.instrumentation.aws-sdk.experimental-span-attributes", false))
.setMessagingReceiveInstrumentationEnabled( .setMessagingReceiveInstrumentationEnabled(
ExperimentalConfig.get().messagingReceiveInstrumentationEnabled()) ExperimentalConfig.get().messagingReceiveInstrumentationEnabled())
.setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders())
.build() .build()
.newRequestHandler(); .newRequestHandler();

View File

@ -26,6 +26,7 @@ dependencies {
tasks { tasks {
withType<Test>().configureEach { withType<Test>().configureEach {
systemProperty("otel.instrumentation.aws-sdk.experimental-span-attributes", "true") systemProperty("otel.instrumentation.aws-sdk.experimental-span-attributes", "true")
systemProperty("otel.instrumentation.messaging.experimental.capture-headers", "test-message-header")
} }
val testReceiveSpansDisabled by registering(Test::class) { val testReceiveSpansDisabled by registering(Test::class) {

View File

@ -5,6 +5,8 @@
package io.opentelemetry.instrumentation.awssdk.v1_11.autoconfigure; package io.opentelemetry.instrumentation.awssdk.v1_11.autoconfigure;
import static java.util.Collections.emptyList;
import com.amazonaws.AmazonWebServiceRequest; import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.Request; import com.amazonaws.Request;
import com.amazonaws.Response; import com.amazonaws.Response;
@ -26,6 +28,9 @@ public class TracingRequestHandler extends RequestHandler2 {
.setMessagingReceiveInstrumentationEnabled( .setMessagingReceiveInstrumentationEnabled(
ConfigPropertiesUtil.getBoolean( ConfigPropertiesUtil.getBoolean(
"otel.instrumentation.messaging.experimental.receive-telemetry.enabled", false)) "otel.instrumentation.messaging.experimental.receive-telemetry.enabled", false))
.setCapturedHeaders(
ConfigPropertiesUtil.getList(
"otel.instrumentation.messaging.experimental.capture-headers", emptyList()))
.build() .build()
.newRequestHandler(); .newRequestHandler();

View File

@ -0,0 +1,13 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v1_11;
import com.amazonaws.Request;
abstract class AbstractSqsRequest {
public abstract Request<?> getRequest();
}

View File

@ -22,11 +22,13 @@ import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientAttributesExtractor; import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.rpc.RpcClientAttributesExtractor; import io.opentelemetry.instrumentation.api.instrumenter.rpc.RpcClientAttributesExtractor;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.function.Function;
import javax.annotation.Nullable; import javax.annotation.Nullable;
final class AwsSdkInstrumenterFactory { final class AwsSdkInstrumenterFactory {
@ -47,48 +49,73 @@ final class AwsSdkInstrumenterFactory {
httpAttributesExtractor, rpcAttributesExtractor, experimentalAttributesExtractor); httpAttributesExtractor, rpcAttributesExtractor, experimentalAttributesExtractor);
private static final AwsSdkSpanNameExtractor spanName = new AwsSdkSpanNameExtractor(); private static final AwsSdkSpanNameExtractor spanName = new AwsSdkSpanNameExtractor();
static Instrumenter<Request<?>, Response<?>> requestInstrumenter( private final OpenTelemetry openTelemetry;
OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes) { private final List<String> capturedHeaders;
private final boolean captureExperimentalSpanAttributes;
private final boolean messagingReceiveInstrumentationEnabled;
AwsSdkInstrumenterFactory(
OpenTelemetry openTelemetry,
List<String> capturedHeaders,
boolean captureExperimentalSpanAttributes,
boolean messagingReceiveInstrumentationEnabled) {
this.openTelemetry = openTelemetry;
this.capturedHeaders = capturedHeaders;
this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
this.messagingReceiveInstrumentationEnabled = messagingReceiveInstrumentationEnabled;
}
Instrumenter<Request<?>, Response<?>> requestInstrumenter() {
return createInstrumenter( return createInstrumenter(
openTelemetry, openTelemetry,
captureExperimentalSpanAttributes,
spanName, spanName,
SpanKindExtractor.alwaysClient(), SpanKindExtractor.alwaysClient(),
attributesExtractors(),
emptyList(), emptyList(),
true); true);
} }
static Instrumenter<Request<?>, Response<?>> consumerReceiveInstrumenter( private List<AttributesExtractor<Request<?>, Response<?>>> attributesExtractors() {
OpenTelemetry openTelemetry, return captureExperimentalSpanAttributes
boolean captureExperimentalSpanAttributes, ? extendedAttributesExtractors
boolean messagingReceiveInstrumentationEnabled) { : defaultAttributesExtractors;
return sqsInstrumenter( }
private <REQUEST, RESPONSE> AttributesExtractor<REQUEST, RESPONSE> messagingAttributesExtractor(
MessagingAttributesGetter<REQUEST, RESPONSE> getter, MessageOperation operation) {
return MessagingAttributesExtractor.builder(getter, operation)
.setCapturedHeaders(capturedHeaders)
.build();
}
Instrumenter<SqsReceiveRequest, Response<?>> consumerReceiveInstrumenter() {
MessageOperation operation = MessageOperation.RECEIVE;
SqsReceiveRequestAttributesGetter getter = SqsReceiveRequestAttributesGetter.INSTANCE;
AttributesExtractor<SqsReceiveRequest, Response<?>> messagingAttributeExtractor =
messagingAttributesExtractor(getter, operation);
return createInstrumenter(
openTelemetry, openTelemetry,
MessageOperation.RECEIVE, MessagingSpanNameExtractor.create(getter, operation),
captureExperimentalSpanAttributes, SpanKindExtractor.alwaysConsumer(),
toSqsRequestExtractors(attributesExtractors(), Function.identity()),
singletonList(messagingAttributeExtractor),
messagingReceiveInstrumentationEnabled); messagingReceiveInstrumentationEnabled);
} }
static Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter( Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter() {
OpenTelemetry openTelemetry,
boolean captureExperimentalSpanAttributes,
boolean messagingReceiveInstrumentationEnabled) {
MessageOperation operation = MessageOperation.PROCESS; MessageOperation operation = MessageOperation.PROCESS;
SqsProcessRequestAttributesGetter getter = SqsProcessRequestAttributesGetter.INSTANCE; SqsProcessRequestAttributesGetter getter = SqsProcessRequestAttributesGetter.INSTANCE;
AttributesExtractor<SqsProcessRequest, Void> messagingAttributeExtractor =
messagingAttributesExtractor(getter, operation);
InstrumenterBuilder<SqsProcessRequest, Void> builder = InstrumenterBuilder<SqsProcessRequest, Void> builder =
Instrumenter.<SqsProcessRequest, Void>builder( Instrumenter.<SqsProcessRequest, Void>builder(
openTelemetry, openTelemetry,
INSTRUMENTATION_NAME, INSTRUMENTATION_NAME,
MessagingSpanNameExtractor.create(getter, operation)) MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractors( .addAttributesExtractors(toSqsRequestExtractors(attributesExtractors(), unused -> null))
toProcessRequestExtractors( .addAttributesExtractor(messagingAttributeExtractor);
captureExperimentalSpanAttributes
? extendedAttributesExtractors
: defaultAttributesExtractors))
.addAttributesExtractor(
MessagingAttributesExtractor.builder(getter, operation).build());
if (messagingReceiveInstrumentationEnabled) { if (messagingReceiveInstrumentationEnabled) {
builder.addSpanLinksExtractor( builder.addSpanLinksExtractor(
@ -101,77 +128,68 @@ final class AwsSdkInstrumenterFactory {
return builder.buildInstrumenter(SpanKindExtractor.alwaysConsumer()); return builder.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
} }
private static List<AttributesExtractor<SqsProcessRequest, Void>> toProcessRequestExtractors( private static <RESPONSE>
List<AttributesExtractor<Request<?>, Response<?>>> extractors) { List<AttributesExtractor<AbstractSqsRequest, RESPONSE>> toSqsRequestExtractors(
List<AttributesExtractor<SqsProcessRequest, Void>> result = new ArrayList<>(); List<AttributesExtractor<Request<?>, Response<?>>> extractors,
Function<RESPONSE, Response<?>> responseConverter) {
List<AttributesExtractor<AbstractSqsRequest, RESPONSE>> result = new ArrayList<>();
for (AttributesExtractor<Request<?>, Response<?>> extractor : extractors) { for (AttributesExtractor<Request<?>, Response<?>> extractor : extractors) {
result.add( result.add(
new AttributesExtractor<SqsProcessRequest, Void>() { new AttributesExtractor<AbstractSqsRequest, RESPONSE>() {
@Override @Override
public void onStart( public void onStart(
AttributesBuilder attributes, AttributesBuilder attributes,
Context parentContext, Context parentContext,
SqsProcessRequest sqsProcessRequest) { AbstractSqsRequest sqsRequest) {
extractor.onStart(attributes, parentContext, sqsProcessRequest.getRequest()); extractor.onStart(attributes, parentContext, sqsRequest.getRequest());
} }
@Override @Override
public void onEnd( public void onEnd(
AttributesBuilder attributes, AttributesBuilder attributes,
Context context, Context context,
SqsProcessRequest sqsProcessRequest, AbstractSqsRequest sqsRequest,
@Nullable Void unused, @Nullable RESPONSE response,
@Nullable Throwable error) { @Nullable Throwable error) {
extractor.onEnd(attributes, context, sqsProcessRequest.getRequest(), null, error); extractor.onEnd(
attributes,
context,
sqsRequest.getRequest(),
responseConverter.apply(response),
error);
} }
}); });
} }
return result; return result;
} }
static Instrumenter<Request<?>, Response<?>> producerInstrumenter( Instrumenter<Request<?>, Response<?>> producerInstrumenter() {
OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes) { MessageOperation operation = MessageOperation.PUBLISH;
return sqsInstrumenter(
openTelemetry, MessageOperation.PUBLISH, captureExperimentalSpanAttributes, true);
}
private static Instrumenter<Request<?>, Response<?>> sqsInstrumenter(
OpenTelemetry openTelemetry,
MessageOperation operation,
boolean captureExperimentalSpanAttributes,
boolean enabled) {
SqsAttributesGetter getter = SqsAttributesGetter.INSTANCE; SqsAttributesGetter getter = SqsAttributesGetter.INSTANCE;
AttributesExtractor<Request<?>, Response<?>> messagingAttributeExtractor = AttributesExtractor<Request<?>, Response<?>> messagingAttributeExtractor =
MessagingAttributesExtractor.builder(getter, operation).build(); messagingAttributesExtractor(getter, operation);
return createInstrumenter( return createInstrumenter(
openTelemetry, openTelemetry,
captureExperimentalSpanAttributes,
MessagingSpanNameExtractor.create(getter, operation), MessagingSpanNameExtractor.create(getter, operation),
operation == MessageOperation.PUBLISH SpanKindExtractor.alwaysProducer(),
? SpanKindExtractor.alwaysProducer() attributesExtractors(),
: SpanKindExtractor.alwaysConsumer(),
singletonList(messagingAttributeExtractor), singletonList(messagingAttributeExtractor),
enabled); true);
} }
private static Instrumenter<Request<?>, Response<?>> createInstrumenter( private static <REQUEST, RESPONSE> Instrumenter<REQUEST, RESPONSE> createInstrumenter(
OpenTelemetry openTelemetry, OpenTelemetry openTelemetry,
boolean captureExperimentalSpanAttributes, SpanNameExtractor<REQUEST> spanNameExtractor,
SpanNameExtractor<Request<?>> spanNameExtractor, SpanKindExtractor<REQUEST> spanKindExtractor,
SpanKindExtractor<Request<?>> spanKindExtractor, List<? extends AttributesExtractor<? super REQUEST, ? super RESPONSE>> attributeExtractors,
List<AttributesExtractor<Request<?>, Response<?>>> additionalAttributeExtractors, List<AttributesExtractor<REQUEST, RESPONSE>> additionalAttributeExtractors,
boolean enabled) { boolean enabled) {
return Instrumenter.<Request<?>, Response<?>>builder( return Instrumenter.<REQUEST, RESPONSE>builder(
openTelemetry, INSTRUMENTATION_NAME, spanNameExtractor) openTelemetry, INSTRUMENTATION_NAME, spanNameExtractor)
.addAttributesExtractors( .addAttributesExtractors(attributeExtractors)
captureExperimentalSpanAttributes
? extendedAttributesExtractors
: defaultAttributesExtractors)
.addAttributesExtractors(additionalAttributeExtractors) .addAttributesExtractors(additionalAttributeExtractors)
.setEnabled(enabled) .setEnabled(enabled)
.buildInstrumenter(spanKindExtractor); .buildInstrumenter(spanKindExtractor);
} }
private AwsSdkInstrumenterFactory() {}
} }

View File

@ -11,6 +11,7 @@ import com.amazonaws.handlers.RequestHandler2;
import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.context.Context; import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import java.util.List;
/** /**
* Entrypoint for instrumenting AWS SDK v1 clients. * Entrypoint for instrumenting AWS SDK v1 clients.
@ -45,30 +46,25 @@ public class AwsSdkTelemetry {
} }
private final Instrumenter<Request<?>, Response<?>> requestInstrumenter; private final Instrumenter<Request<?>, Response<?>> requestInstrumenter;
private final Instrumenter<Request<?>, Response<?>> consumerReceiveInstrumenter; private final Instrumenter<SqsReceiveRequest, Response<?>> consumerReceiveInstrumenter;
private final Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter; private final Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter;
private final Instrumenter<Request<?>, Response<?>> producerInstrumenter; private final Instrumenter<Request<?>, Response<?>> producerInstrumenter;
AwsSdkTelemetry( AwsSdkTelemetry(
OpenTelemetry openTelemetry, OpenTelemetry openTelemetry,
List<String> capturedHeaders,
boolean captureExperimentalSpanAttributes, boolean captureExperimentalSpanAttributes,
boolean messagingReceiveInstrumentationEnabled) { boolean messagingReceiveInstrumentationEnabled) {
requestInstrumenter = AwsSdkInstrumenterFactory instrumenterFactory =
AwsSdkInstrumenterFactory.requestInstrumenter( new AwsSdkInstrumenterFactory(
openTelemetry, captureExperimentalSpanAttributes);
consumerReceiveInstrumenter =
AwsSdkInstrumenterFactory.consumerReceiveInstrumenter(
openTelemetry, openTelemetry,
capturedHeaders,
captureExperimentalSpanAttributes, captureExperimentalSpanAttributes,
messagingReceiveInstrumentationEnabled); messagingReceiveInstrumentationEnabled);
consumerProcessInstrumenter = requestInstrumenter = instrumenterFactory.requestInstrumenter();
AwsSdkInstrumenterFactory.consumerProcessInstrumenter( consumerReceiveInstrumenter = instrumenterFactory.consumerReceiveInstrumenter();
openTelemetry, consumerProcessInstrumenter = instrumenterFactory.consumerProcessInstrumenter();
captureExperimentalSpanAttributes, producerInstrumenter = instrumenterFactory.producerInstrumenter();
messagingReceiveInstrumentationEnabled);
producerInstrumenter =
AwsSdkInstrumenterFactory.producerInstrumenter(
openTelemetry, captureExperimentalSpanAttributes);
} }
/** /**

View File

@ -5,14 +5,18 @@
package io.opentelemetry.instrumentation.awssdk.v1_11; package io.opentelemetry.instrumentation.awssdk.v1_11;
import static java.util.Collections.emptyList;
import com.google.errorprone.annotations.CanIgnoreReturnValue; import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.OpenTelemetry;
import java.util.List;
/** A builder of {@link AwsSdkTelemetry}. */ /** A builder of {@link AwsSdkTelemetry}. */
public class AwsSdkTelemetryBuilder { public class AwsSdkTelemetryBuilder {
private final OpenTelemetry openTelemetry; private final OpenTelemetry openTelemetry;
private List<String> capturedHeaders = emptyList();
private boolean captureExperimentalSpanAttributes; private boolean captureExperimentalSpanAttributes;
private boolean messagingReceiveInstrumentationEnabled; private boolean messagingReceiveInstrumentationEnabled;
@ -20,6 +24,17 @@ public class AwsSdkTelemetryBuilder {
this.openTelemetry = openTelemetry; this.openTelemetry = openTelemetry;
} }
/**
* Configures the messaging headers that will be captured as span attributes.
*
* @param capturedHeaders A list of messaging header names.
*/
@CanIgnoreReturnValue
public AwsSdkTelemetryBuilder setCapturedHeaders(List<String> capturedHeaders) {
this.capturedHeaders = capturedHeaders;
return this;
}
/** /**
* Sets whether experimental attributes should be set to spans. These attributes may be changed or * Sets whether experimental attributes should be set to spans. These attributes may be changed or
* removed in the future, so only enable this if you know you do not require attributes filled by * removed in the future, so only enable this if you know you do not require attributes filled by
@ -50,6 +65,9 @@ public class AwsSdkTelemetryBuilder {
*/ */
public AwsSdkTelemetry build() { public AwsSdkTelemetry build() {
return new AwsSdkTelemetry( return new AwsSdkTelemetry(
openTelemetry, captureExperimentalSpanAttributes, messagingReceiveInstrumentationEnabled); openTelemetry,
capturedHeaders,
captureExperimentalSpanAttributes,
messagingReceiveInstrumentationEnabled);
} }
} }

View File

@ -11,8 +11,6 @@ import com.amazonaws.Response;
import io.opentelemetry.context.Context; import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.internal.Timer; import io.opentelemetry.instrumentation.api.internal.Timer;
import io.opentelemetry.javaagent.tooling.muzzle.NoMuzzle; import io.opentelemetry.javaagent.tooling.muzzle.NoMuzzle;
import java.util.Collections;
import java.util.Map;
final class SqsAccess { final class SqsAccess {
private SqsAccess() {} private SqsAccess() {}
@ -36,7 +34,7 @@ final class SqsAccess {
} }
@NoMuzzle @NoMuzzle
static Map<String, String> getMessageAttributes(Request<?> request) { static String getMessageAttribute(Request<?> request, String name) {
return enabled ? SqsImpl.getMessageAttributes(request) : Collections.emptyMap(); return enabled ? SqsImpl.getMessageAttribute(request, name) : null;
} }
} }

View File

@ -59,7 +59,7 @@ enum SqsAttributesGetter implements MessagingAttributesGetter<Request<?>, Respon
@Override @Override
public List<String> getMessageHeader(Request<?> request, String name) { public List<String> getMessageHeader(Request<?> request, String name) {
String value = SqsAccess.getMessageAttributes(request).get(name); String value = SqsAccess.getMessageAttribute(request, name);
return value != null ? Collections.singletonList(value) : Collections.emptyList(); return value != null ? Collections.singletonList(value) : Collections.emptyList();
} }
} }

View File

@ -18,8 +18,6 @@ import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil; import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil;
import io.opentelemetry.instrumentation.api.internal.Timer; import io.opentelemetry.instrumentation.api.internal.Timer;
import java.lang.reflect.Field; import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
final class SqsImpl { final class SqsImpl {
@ -56,18 +54,20 @@ final class SqsImpl {
return; return;
} }
Instrumenter<Request<?>, Response<?>> consumerReceiveInstrumenter = Instrumenter<SqsReceiveRequest, Response<?>> consumerReceiveInstrumenter =
requestHandler.getConsumerReceiveInstrumenter(); requestHandler.getConsumerReceiveInstrumenter();
Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter = Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter =
requestHandler.getConsumerProcessInstrumenter(); requestHandler.getConsumerProcessInstrumenter();
Context receiveContext = null; Context receiveContext = null;
if (timer != null && consumerReceiveInstrumenter.shouldStart(parentContext, request)) { SqsReceiveRequest receiveRequest =
SqsReceiveRequest.create(request, SqsMessageImpl.wrap(receiveMessageResult.getMessages()));
if (timer != null && consumerReceiveInstrumenter.shouldStart(parentContext, receiveRequest)) {
receiveContext = receiveContext =
InstrumenterUtil.startAndEnd( InstrumenterUtil.startAndEnd(
consumerReceiveInstrumenter, consumerReceiveInstrumenter,
parentContext, parentContext,
request, receiveRequest,
response, response,
null, null,
timer.startTime(), timer.startTime(),
@ -123,18 +123,15 @@ final class SqsImpl {
return false; return false;
} }
static Map<String, String> getMessageAttributes(Request<?> request) { static String getMessageAttribute(Request<?> request, String name) {
if (request instanceof SendMessageRequest) { if (request.getOriginalRequest() instanceof SendMessageRequest) {
Map<String, MessageAttributeValue> map = Map<String, MessageAttributeValue> map =
((SendMessageRequest) request).getMessageAttributes(); ((SendMessageRequest) request.getOriginalRequest()).getMessageAttributes();
if (!map.isEmpty()) { MessageAttributeValue value = map.get(name);
Map<String, String> result = new HashMap<>(); if (value != null) {
for (Map.Entry<String, MessageAttributeValue> entry : map.entrySet()) { return value.getStringValue();
result.put(entry.getKey(), entry.getValue().getStringValue());
}
return result;
} }
} }
return Collections.emptyMap(); return null;
} }
} }

View File

@ -14,4 +14,6 @@ import java.util.Map;
interface SqsMessage { interface SqsMessage {
Map<String, String> getAttributes(); Map<String, String> getAttributes();
String getMessageAttribute(String name);
} }

View File

@ -6,6 +6,9 @@
package io.opentelemetry.instrumentation.awssdk.v1_11; package io.opentelemetry.instrumentation.awssdk.v1_11;
import com.amazonaws.services.sqs.model.Message; import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import java.util.ArrayList;
import java.util.List;
import java.util.Map; import java.util.Map;
final class SqsMessageImpl implements SqsMessage { final class SqsMessageImpl implements SqsMessage {
@ -20,8 +23,22 @@ final class SqsMessageImpl implements SqsMessage {
return new SqsMessageImpl(message); return new SqsMessageImpl(message);
} }
static List<SqsMessage> wrap(List<Message> messages) {
List<SqsMessage> result = new ArrayList<>();
for (Message message : messages) {
result.add(wrap(message));
}
return result;
}
@Override @Override
public Map<String, String> getAttributes() { public Map<String, String> getAttributes() {
return message.getAttributes(); return message.getAttributes();
} }
@Override
public String getMessageAttribute(String name) {
MessageAttributeValue value = message.getMessageAttributes().get(name);
return value != null ? value.getStringValue() : null;
}
} }

View File

@ -7,7 +7,7 @@ package io.opentelemetry.instrumentation.awssdk.v1_11;
import com.amazonaws.Request; import com.amazonaws.Request;
final class SqsProcessRequest { final class SqsProcessRequest extends AbstractSqsRequest {
private final Request<?> request; private final Request<?> request;
private final SqsMessage message; private final SqsMessage message;
@ -20,6 +20,7 @@ final class SqsProcessRequest {
return new SqsProcessRequest(request, message); return new SqsProcessRequest(request, message);
} }
@Override
public Request<?> getRequest() { public Request<?> getRequest() {
return request; return request;
} }

View File

@ -58,7 +58,7 @@ enum SqsProcessRequestAttributesGetter
@Override @Override
public List<String> getMessageHeader(SqsProcessRequest request, String name) { public List<String> getMessageHeader(SqsProcessRequest request, String name) {
String value = SqsAccess.getMessageAttributes(request.getRequest()).get(name); String value = request.getMessage().getMessageAttribute(name);
return value != null ? Collections.singletonList(value) : Collections.emptyList(); return value != null ? Collections.singletonList(value) : Collections.emptyList();
} }
} }

View File

@ -0,0 +1,32 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v1_11;
import com.amazonaws.Request;
import java.util.List;
final class SqsReceiveRequest extends AbstractSqsRequest {
private final Request<?> request;
private final List<SqsMessage> messages;
private SqsReceiveRequest(Request<?> request, List<SqsMessage> messages) {
this.request = request;
this.messages = messages;
}
public static SqsReceiveRequest create(Request<?> request, List<SqsMessage> messages) {
return new SqsReceiveRequest(request, messages);
}
@Override
public Request<?> getRequest() {
return request;
}
public List<SqsMessage> getMessages() {
return messages;
}
}

View File

@ -0,0 +1,68 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v1_11;
import com.amazonaws.Response;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
enum SqsReceiveRequestAttributesGetter
implements MessagingAttributesGetter<SqsReceiveRequest, Response<?>> {
INSTANCE;
@Override
public String getSystem(SqsReceiveRequest request) {
return "AmazonSQS";
}
@Override
public String getDestination(SqsReceiveRequest request) {
Object originalRequest = request.getRequest().getOriginalRequest();
String queueUrl = RequestAccess.getQueueUrl(originalRequest);
int i = queueUrl.lastIndexOf('/');
return i > 0 ? queueUrl.substring(i + 1) : null;
}
@Override
public boolean isTemporaryDestination(SqsReceiveRequest request) {
return false;
}
@Override
@Nullable
public String getConversationId(SqsReceiveRequest request) {
return null;
}
@Override
@Nullable
public Long getMessagePayloadSize(SqsReceiveRequest request) {
return null;
}
@Override
@Nullable
public Long getMessagePayloadCompressedSize(SqsReceiveRequest request) {
return null;
}
@Override
@Nullable
public String getMessageId(SqsReceiveRequest request, @Nullable Response<?> response) {
return null;
}
@Override
public List<String> getMessageHeader(SqsReceiveRequest request, String name) {
return StreamSupport.stream(request.getMessages().spliterator(), false)
.map(message -> message.getMessageAttribute(name))
.filter(value -> value != null)
.collect(Collectors.toList());
}
}

View File

@ -33,13 +33,13 @@ final class TracingRequestHandler extends RequestHandler2 {
ContextKey.named(TracingRequestHandler.class.getName() + ".RequestSpanSuppressed"); ContextKey.named(TracingRequestHandler.class.getName() + ".RequestSpanSuppressed");
private final Instrumenter<Request<?>, Response<?>> requestInstrumenter; private final Instrumenter<Request<?>, Response<?>> requestInstrumenter;
private final Instrumenter<Request<?>, Response<?>> consumerReceiveInstrumenter; private final Instrumenter<SqsReceiveRequest, Response<?>> consumerReceiveInstrumenter;
private final Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter; private final Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter;
private final Instrumenter<Request<?>, Response<?>> producerInstrumenter; private final Instrumenter<Request<?>, Response<?>> producerInstrumenter;
TracingRequestHandler( TracingRequestHandler(
Instrumenter<Request<?>, Response<?>> requestInstrumenter, Instrumenter<Request<?>, Response<?>> requestInstrumenter,
Instrumenter<Request<?>, Response<?>> consumerReceiveInstrumenter, Instrumenter<SqsReceiveRequest, Response<?>> consumerReceiveInstrumenter,
Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter, Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter,
Instrumenter<Request<?>, Response<?>> producerInstrumenter) { Instrumenter<Request<?>, Response<?>> producerInstrumenter) {
this.requestInstrumenter = requestInstrumenter; this.requestInstrumenter = requestInstrumenter;
@ -99,7 +99,7 @@ final class TracingRequestHandler extends RequestHandler2 {
return request; return request;
} }
Instrumenter<Request<?>, Response<?>> getConsumerReceiveInstrumenter() { Instrumenter<SqsReceiveRequest, Response<?>> getConsumerReceiveInstrumenter() {
return consumerReceiveInstrumenter; return consumerReceiveInstrumenter;
} }

View File

@ -8,6 +8,8 @@ package io.opentelemetry.instrumentation.awssdk.v1_11
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder
import io.opentelemetry.instrumentation.test.LibraryTestTrait import io.opentelemetry.instrumentation.test.LibraryTestTrait
import static java.util.Collections.singletonList
class SqsTracingTest extends AbstractSqsTracingTest implements LibraryTestTrait { class SqsTracingTest extends AbstractSqsTracingTest implements LibraryTestTrait {
@Override @Override
AmazonSQSAsyncClientBuilder configureClient(AmazonSQSAsyncClientBuilder client) { AmazonSQSAsyncClientBuilder configureClient(AmazonSQSAsyncClientBuilder client) {
@ -15,6 +17,7 @@ class SqsTracingTest extends AbstractSqsTracingTest implements LibraryTestTrait
AwsSdkTelemetry.builder(getOpenTelemetry()) AwsSdkTelemetry.builder(getOpenTelemetry())
.setCaptureExperimentalSpanAttributes(true) .setCaptureExperimentalSpanAttributes(true)
.setMessagingReceiveInstrumentationEnabled(true) .setMessagingReceiveInstrumentationEnabled(true)
.setCapturedHeaders(singletonList("test-message-header"))
.build() .build()
.newRequestHandler()) .newRequestHandler())
} }

View File

@ -10,6 +10,7 @@ import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.client.builder.AwsClientBuilder import com.amazonaws.client.builder.AwsClientBuilder
import com.amazonaws.services.sqs.AmazonSQSAsyncClient import com.amazonaws.services.sqs.AmazonSQSAsyncClient
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder
import com.amazonaws.services.sqs.model.MessageAttributeValue
import com.amazonaws.services.sqs.model.ReceiveMessageRequest import com.amazonaws.services.sqs.model.ReceiveMessageRequest
import com.amazonaws.services.sqs.model.SendMessageRequest import com.amazonaws.services.sqs.model.SendMessageRequest
import io.opentelemetry.instrumentation.test.InstrumentationSpecification import io.opentelemetry.instrumentation.test.InstrumentationSpecification
@ -51,14 +52,21 @@ abstract class AbstractSqsTracingTest extends InstrumentationSpecification {
} }
} }
def "simple sqs producer-consumer services"() { def "simple sqs producer-consumer services #testCaptureHeaders"() {
setup: setup:
client.createQueue("testSdkSqs") client.createQueue("testSdkSqs")
when: when:
SendMessageRequest send = new SendMessageRequest("http://localhost:$sqsPort/000000000000/testSdkSqs", "{\"type\": \"hello\"}") SendMessageRequest sendMessageRequest = new SendMessageRequest("http://localhost:$sqsPort/000000000000/testSdkSqs", "{\"type\": \"hello\"}")
client.sendMessage(send) if (testCaptureHeaders) {
def receiveMessageResult = client.receiveMessage("http://localhost:$sqsPort/000000000000/testSdkSqs") sendMessageRequest.addMessageAttributesEntry("test-message-header", new MessageAttributeValue().withDataType("String").withStringValue("test"))
}
client.sendMessage(sendMessageRequest)
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest("http://localhost:$sqsPort/000000000000/testSdkSqs")
if (testCaptureHeaders) {
receiveMessageRequest.withMessageAttributeNames("test-message-header")
}
def receiveMessageResult = client.receiveMessage(receiveMessageRequest)
receiveMessageResult.messages.each {message -> receiveMessageResult.messages.each {message ->
runWithSpan("process child") {} runWithSpan("process child") {}
} }
@ -113,6 +121,9 @@ abstract class AbstractSqsTracingTest extends InstrumentationSpecification {
"$SemanticAttributes.NET_PROTOCOL_NAME" "http" "$SemanticAttributes.NET_PROTOCOL_NAME" "http"
"$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1" "$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1"
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long
if (testCaptureHeaders) {
"messaging.header.test_message_header" { it == ["test"] }
}
} }
} }
publishSpan = span(0) publishSpan = span(0)
@ -140,6 +151,9 @@ abstract class AbstractSqsTracingTest extends InstrumentationSpecification {
"$SemanticAttributes.NET_PROTOCOL_NAME" "http" "$SemanticAttributes.NET_PROTOCOL_NAME" "http"
"$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1" "$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1"
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long
if (testCaptureHeaders) {
"messaging.header.test_message_header" { it == ["test"] }
}
} }
} }
span(1) { span(1) {
@ -161,6 +175,9 @@ abstract class AbstractSqsTracingTest extends InstrumentationSpecification {
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS" "$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs" "$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
"$SemanticAttributes.MESSAGING_OPERATION" "process" "$SemanticAttributes.MESSAGING_OPERATION" "process"
if (testCaptureHeaders) {
"messaging.header.test_message_header" { it == ["test"] }
}
} }
} }
span(2) { span(2) {
@ -171,6 +188,9 @@ abstract class AbstractSqsTracingTest extends InstrumentationSpecification {
} }
} }
} }
where:
testCaptureHeaders << [false, true]
} }
def "simple sqs producer-consumer services with parent span"() { def "simple sqs producer-consumer services with parent span"() {