diff --git a/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/internal/ConfigPropertiesUtil.java b/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/internal/ConfigPropertiesUtil.java index 5c315810ff..aa102108af 100644 --- a/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/internal/ConfigPropertiesUtil.java +++ b/instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/internal/ConfigPropertiesUtil.java @@ -5,7 +5,10 @@ package io.opentelemetry.instrumentation.api.internal; +import java.util.Arrays; +import java.util.List; import java.util.Locale; +import java.util.stream.Collectors; import javax.annotation.Nullable; /** @@ -40,6 +43,21 @@ public final class ConfigPropertiesUtil { return System.getenv(toEnvVarName(propertyName)); } + public static List getList(String propertyName, List defaultValue) { + String value = getString(propertyName); + if (value == null) { + return defaultValue; + } + return filterBlanksAndNulls(value.split(",")); + } + + private static List filterBlanksAndNulls(String[] values) { + return Arrays.stream(values) + .map(String::trim) + .filter(s -> !s.isEmpty()) + .collect(Collectors.toList()); + } + private static String toEnvVarName(String propertyName) { return propertyName.toUpperCase(Locale.ROOT).replace('-', '_').replace('.', '_'); } diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awssdk/v1_11/TracingRequestHandler.java b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awssdk/v1_11/TracingRequestHandler.java index 8891b90721..dee44d73f0 100644 --- a/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awssdk/v1_11/TracingRequestHandler.java +++ b/instrumentation/aws-sdk/aws-sdk-1.11/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/awssdk/v1_11/TracingRequestHandler.java @@ -40,6 +40,7 @@ public class TracingRequestHandler extends RequestHandler2 { .getBoolean("otel.instrumentation.aws-sdk.experimental-span-attributes", false)) .setMessagingReceiveInstrumentationEnabled( ExperimentalConfig.get().messagingReceiveInstrumentationEnabled()) + .setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders()) .build() .newRequestHandler(); diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/library-autoconfigure/build.gradle.kts b/instrumentation/aws-sdk/aws-sdk-1.11/library-autoconfigure/build.gradle.kts index 6482abfc66..f12de2b7f8 100644 --- a/instrumentation/aws-sdk/aws-sdk-1.11/library-autoconfigure/build.gradle.kts +++ b/instrumentation/aws-sdk/aws-sdk-1.11/library-autoconfigure/build.gradle.kts @@ -26,6 +26,7 @@ dependencies { tasks { withType().configureEach { systemProperty("otel.instrumentation.aws-sdk.experimental-span-attributes", "true") + systemProperty("otel.instrumentation.messaging.experimental.capture-headers", "test-message-header") } val testReceiveSpansDisabled by registering(Test::class) { diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/library-autoconfigure/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/autoconfigure/TracingRequestHandler.java b/instrumentation/aws-sdk/aws-sdk-1.11/library-autoconfigure/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/autoconfigure/TracingRequestHandler.java index 904fefe562..2d92c041c0 100644 --- a/instrumentation/aws-sdk/aws-sdk-1.11/library-autoconfigure/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/autoconfigure/TracingRequestHandler.java +++ b/instrumentation/aws-sdk/aws-sdk-1.11/library-autoconfigure/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/autoconfigure/TracingRequestHandler.java @@ -5,6 +5,8 @@ package io.opentelemetry.instrumentation.awssdk.v1_11.autoconfigure; +import static java.util.Collections.emptyList; + import com.amazonaws.AmazonWebServiceRequest; import com.amazonaws.Request; import com.amazonaws.Response; @@ -26,6 +28,9 @@ public class TracingRequestHandler extends RequestHandler2 { .setMessagingReceiveInstrumentationEnabled( ConfigPropertiesUtil.getBoolean( "otel.instrumentation.messaging.experimental.receive-telemetry.enabled", false)) + .setCapturedHeaders( + ConfigPropertiesUtil.getList( + "otel.instrumentation.messaging.experimental.capture-headers", emptyList())) .build() .newRequestHandler(); diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/AbstractSqsRequest.java b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/AbstractSqsRequest.java new file mode 100644 index 0000000000..b96720f66b --- /dev/null +++ b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/AbstractSqsRequest.java @@ -0,0 +1,13 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.awssdk.v1_11; + +import com.amazonaws.Request; + +abstract class AbstractSqsRequest { + + public abstract Request getRequest(); +} diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/AwsSdkInstrumenterFactory.java b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/AwsSdkInstrumenterFactory.java index bf12e58a77..93a26064db 100644 --- a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/AwsSdkInstrumenterFactory.java +++ b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/AwsSdkInstrumenterFactory.java @@ -22,11 +22,13 @@ import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor; import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientAttributesExtractor; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor; import io.opentelemetry.instrumentation.api.instrumenter.rpc.RpcClientAttributesExtractor; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.function.Function; import javax.annotation.Nullable; final class AwsSdkInstrumenterFactory { @@ -47,48 +49,73 @@ final class AwsSdkInstrumenterFactory { httpAttributesExtractor, rpcAttributesExtractor, experimentalAttributesExtractor); private static final AwsSdkSpanNameExtractor spanName = new AwsSdkSpanNameExtractor(); - static Instrumenter, Response> requestInstrumenter( - OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes) { + private final OpenTelemetry openTelemetry; + private final List capturedHeaders; + private final boolean captureExperimentalSpanAttributes; + private final boolean messagingReceiveInstrumentationEnabled; + AwsSdkInstrumenterFactory( + OpenTelemetry openTelemetry, + List capturedHeaders, + boolean captureExperimentalSpanAttributes, + boolean messagingReceiveInstrumentationEnabled) { + this.openTelemetry = openTelemetry; + this.capturedHeaders = capturedHeaders; + this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; + this.messagingReceiveInstrumentationEnabled = messagingReceiveInstrumentationEnabled; + } + + Instrumenter, Response> requestInstrumenter() { return createInstrumenter( openTelemetry, - captureExperimentalSpanAttributes, spanName, SpanKindExtractor.alwaysClient(), + attributesExtractors(), emptyList(), true); } - static Instrumenter, Response> consumerReceiveInstrumenter( - OpenTelemetry openTelemetry, - boolean captureExperimentalSpanAttributes, - boolean messagingReceiveInstrumentationEnabled) { - return sqsInstrumenter( + private List, Response>> attributesExtractors() { + return captureExperimentalSpanAttributes + ? extendedAttributesExtractors + : defaultAttributesExtractors; + } + + private AttributesExtractor messagingAttributesExtractor( + MessagingAttributesGetter getter, MessageOperation operation) { + return MessagingAttributesExtractor.builder(getter, operation) + .setCapturedHeaders(capturedHeaders) + .build(); + } + + Instrumenter> consumerReceiveInstrumenter() { + MessageOperation operation = MessageOperation.RECEIVE; + SqsReceiveRequestAttributesGetter getter = SqsReceiveRequestAttributesGetter.INSTANCE; + AttributesExtractor> messagingAttributeExtractor = + messagingAttributesExtractor(getter, operation); + + return createInstrumenter( openTelemetry, - MessageOperation.RECEIVE, - captureExperimentalSpanAttributes, + MessagingSpanNameExtractor.create(getter, operation), + SpanKindExtractor.alwaysConsumer(), + toSqsRequestExtractors(attributesExtractors(), Function.identity()), + singletonList(messagingAttributeExtractor), messagingReceiveInstrumentationEnabled); } - static Instrumenter consumerProcessInstrumenter( - OpenTelemetry openTelemetry, - boolean captureExperimentalSpanAttributes, - boolean messagingReceiveInstrumentationEnabled) { + Instrumenter consumerProcessInstrumenter() { MessageOperation operation = MessageOperation.PROCESS; SqsProcessRequestAttributesGetter getter = SqsProcessRequestAttributesGetter.INSTANCE; + AttributesExtractor messagingAttributeExtractor = + messagingAttributesExtractor(getter, operation); InstrumenterBuilder builder = Instrumenter.builder( openTelemetry, INSTRUMENTATION_NAME, MessagingSpanNameExtractor.create(getter, operation)) - .addAttributesExtractors( - toProcessRequestExtractors( - captureExperimentalSpanAttributes - ? extendedAttributesExtractors - : defaultAttributesExtractors)) - .addAttributesExtractor( - MessagingAttributesExtractor.builder(getter, operation).build()); + .addAttributesExtractors(toSqsRequestExtractors(attributesExtractors(), unused -> null)) + .addAttributesExtractor(messagingAttributeExtractor); if (messagingReceiveInstrumentationEnabled) { builder.addSpanLinksExtractor( @@ -101,77 +128,68 @@ final class AwsSdkInstrumenterFactory { return builder.buildInstrumenter(SpanKindExtractor.alwaysConsumer()); } - private static List> toProcessRequestExtractors( - List, Response>> extractors) { - List> result = new ArrayList<>(); + private static + List> toSqsRequestExtractors( + List, Response>> extractors, + Function> responseConverter) { + List> result = new ArrayList<>(); for (AttributesExtractor, Response> extractor : extractors) { result.add( - new AttributesExtractor() { + new AttributesExtractor() { @Override public void onStart( AttributesBuilder attributes, Context parentContext, - SqsProcessRequest sqsProcessRequest) { - extractor.onStart(attributes, parentContext, sqsProcessRequest.getRequest()); + AbstractSqsRequest sqsRequest) { + extractor.onStart(attributes, parentContext, sqsRequest.getRequest()); } @Override public void onEnd( AttributesBuilder attributes, Context context, - SqsProcessRequest sqsProcessRequest, - @Nullable Void unused, + AbstractSqsRequest sqsRequest, + @Nullable RESPONSE response, @Nullable Throwable error) { - extractor.onEnd(attributes, context, sqsProcessRequest.getRequest(), null, error); + extractor.onEnd( + attributes, + context, + sqsRequest.getRequest(), + responseConverter.apply(response), + error); } }); } return result; } - static Instrumenter, Response> producerInstrumenter( - OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes) { - return sqsInstrumenter( - openTelemetry, MessageOperation.PUBLISH, captureExperimentalSpanAttributes, true); - } - - private static Instrumenter, Response> sqsInstrumenter( - OpenTelemetry openTelemetry, - MessageOperation operation, - boolean captureExperimentalSpanAttributes, - boolean enabled) { + Instrumenter, Response> producerInstrumenter() { + MessageOperation operation = MessageOperation.PUBLISH; SqsAttributesGetter getter = SqsAttributesGetter.INSTANCE; AttributesExtractor, Response> messagingAttributeExtractor = - MessagingAttributesExtractor.builder(getter, operation).build(); + messagingAttributesExtractor(getter, operation); return createInstrumenter( openTelemetry, - captureExperimentalSpanAttributes, MessagingSpanNameExtractor.create(getter, operation), - operation == MessageOperation.PUBLISH - ? SpanKindExtractor.alwaysProducer() - : SpanKindExtractor.alwaysConsumer(), + SpanKindExtractor.alwaysProducer(), + attributesExtractors(), singletonList(messagingAttributeExtractor), - enabled); + true); } - private static Instrumenter, Response> createInstrumenter( + private static Instrumenter createInstrumenter( OpenTelemetry openTelemetry, - boolean captureExperimentalSpanAttributes, - SpanNameExtractor> spanNameExtractor, - SpanKindExtractor> spanKindExtractor, - List, Response>> additionalAttributeExtractors, + SpanNameExtractor spanNameExtractor, + SpanKindExtractor spanKindExtractor, + List> attributeExtractors, + List> additionalAttributeExtractors, boolean enabled) { - return Instrumenter., Response>builder( + return Instrumenter.builder( openTelemetry, INSTRUMENTATION_NAME, spanNameExtractor) - .addAttributesExtractors( - captureExperimentalSpanAttributes - ? extendedAttributesExtractors - : defaultAttributesExtractors) + .addAttributesExtractors(attributeExtractors) .addAttributesExtractors(additionalAttributeExtractors) .setEnabled(enabled) .buildInstrumenter(spanKindExtractor); } - - private AwsSdkInstrumenterFactory() {} } diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/AwsSdkTelemetry.java b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/AwsSdkTelemetry.java index b41ad3147f..8cf95e73a9 100644 --- a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/AwsSdkTelemetry.java +++ b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/AwsSdkTelemetry.java @@ -11,6 +11,7 @@ import com.amazonaws.handlers.RequestHandler2; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import java.util.List; /** * Entrypoint for instrumenting AWS SDK v1 clients. @@ -45,30 +46,25 @@ public class AwsSdkTelemetry { } private final Instrumenter, Response> requestInstrumenter; - private final Instrumenter, Response> consumerReceiveInstrumenter; + private final Instrumenter> consumerReceiveInstrumenter; private final Instrumenter consumerProcessInstrumenter; private final Instrumenter, Response> producerInstrumenter; AwsSdkTelemetry( OpenTelemetry openTelemetry, + List capturedHeaders, boolean captureExperimentalSpanAttributes, boolean messagingReceiveInstrumentationEnabled) { - requestInstrumenter = - AwsSdkInstrumenterFactory.requestInstrumenter( - openTelemetry, captureExperimentalSpanAttributes); - consumerReceiveInstrumenter = - AwsSdkInstrumenterFactory.consumerReceiveInstrumenter( + AwsSdkInstrumenterFactory instrumenterFactory = + new AwsSdkInstrumenterFactory( openTelemetry, + capturedHeaders, captureExperimentalSpanAttributes, messagingReceiveInstrumentationEnabled); - consumerProcessInstrumenter = - AwsSdkInstrumenterFactory.consumerProcessInstrumenter( - openTelemetry, - captureExperimentalSpanAttributes, - messagingReceiveInstrumentationEnabled); - producerInstrumenter = - AwsSdkInstrumenterFactory.producerInstrumenter( - openTelemetry, captureExperimentalSpanAttributes); + requestInstrumenter = instrumenterFactory.requestInstrumenter(); + consumerReceiveInstrumenter = instrumenterFactory.consumerReceiveInstrumenter(); + consumerProcessInstrumenter = instrumenterFactory.consumerProcessInstrumenter(); + producerInstrumenter = instrumenterFactory.producerInstrumenter(); } /** diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/AwsSdkTelemetryBuilder.java b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/AwsSdkTelemetryBuilder.java index daadf8e97d..b36104f531 100644 --- a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/AwsSdkTelemetryBuilder.java +++ b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/AwsSdkTelemetryBuilder.java @@ -5,14 +5,18 @@ package io.opentelemetry.instrumentation.awssdk.v1_11; +import static java.util.Collections.emptyList; + import com.google.errorprone.annotations.CanIgnoreReturnValue; import io.opentelemetry.api.OpenTelemetry; +import java.util.List; /** A builder of {@link AwsSdkTelemetry}. */ public class AwsSdkTelemetryBuilder { private final OpenTelemetry openTelemetry; + private List capturedHeaders = emptyList(); private boolean captureExperimentalSpanAttributes; private boolean messagingReceiveInstrumentationEnabled; @@ -20,6 +24,17 @@ public class AwsSdkTelemetryBuilder { this.openTelemetry = openTelemetry; } + /** + * Configures the messaging headers that will be captured as span attributes. + * + * @param capturedHeaders A list of messaging header names. + */ + @CanIgnoreReturnValue + public AwsSdkTelemetryBuilder setCapturedHeaders(List capturedHeaders) { + this.capturedHeaders = capturedHeaders; + return this; + } + /** * Sets whether experimental attributes should be set to spans. These attributes may be changed or * removed in the future, so only enable this if you know you do not require attributes filled by @@ -50,6 +65,9 @@ public class AwsSdkTelemetryBuilder { */ public AwsSdkTelemetry build() { return new AwsSdkTelemetry( - openTelemetry, captureExperimentalSpanAttributes, messagingReceiveInstrumentationEnabled); + openTelemetry, + capturedHeaders, + captureExperimentalSpanAttributes, + messagingReceiveInstrumentationEnabled); } } diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsAccess.java b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsAccess.java index 0bfef4012a..c077dbd8c6 100644 --- a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsAccess.java +++ b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsAccess.java @@ -11,8 +11,6 @@ import com.amazonaws.Response; import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.api.internal.Timer; import io.opentelemetry.javaagent.tooling.muzzle.NoMuzzle; -import java.util.Collections; -import java.util.Map; final class SqsAccess { private SqsAccess() {} @@ -36,7 +34,7 @@ final class SqsAccess { } @NoMuzzle - static Map getMessageAttributes(Request request) { - return enabled ? SqsImpl.getMessageAttributes(request) : Collections.emptyMap(); + static String getMessageAttribute(Request request, String name) { + return enabled ? SqsImpl.getMessageAttribute(request, name) : null; } } diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsAttributesGetter.java b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsAttributesGetter.java index 9aff54a412..012fa8d0e7 100644 --- a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsAttributesGetter.java +++ b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsAttributesGetter.java @@ -59,7 +59,7 @@ enum SqsAttributesGetter implements MessagingAttributesGetter, Respon @Override public List getMessageHeader(Request request, String name) { - String value = SqsAccess.getMessageAttributes(request).get(name); + String value = SqsAccess.getMessageAttribute(request, name); return value != null ? Collections.singletonList(value) : Collections.emptyList(); } } diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsImpl.java b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsImpl.java index e0b4b93ee4..fdecfbe215 100644 --- a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsImpl.java +++ b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsImpl.java @@ -18,8 +18,6 @@ import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil; import io.opentelemetry.instrumentation.api.internal.Timer; import java.lang.reflect.Field; -import java.util.Collections; -import java.util.HashMap; import java.util.Map; final class SqsImpl { @@ -56,18 +54,20 @@ final class SqsImpl { return; } - Instrumenter, Response> consumerReceiveInstrumenter = + Instrumenter> consumerReceiveInstrumenter = requestHandler.getConsumerReceiveInstrumenter(); Instrumenter consumerProcessInstrumenter = requestHandler.getConsumerProcessInstrumenter(); Context receiveContext = null; - if (timer != null && consumerReceiveInstrumenter.shouldStart(parentContext, request)) { + SqsReceiveRequest receiveRequest = + SqsReceiveRequest.create(request, SqsMessageImpl.wrap(receiveMessageResult.getMessages())); + if (timer != null && consumerReceiveInstrumenter.shouldStart(parentContext, receiveRequest)) { receiveContext = InstrumenterUtil.startAndEnd( consumerReceiveInstrumenter, parentContext, - request, + receiveRequest, response, null, timer.startTime(), @@ -123,18 +123,15 @@ final class SqsImpl { return false; } - static Map getMessageAttributes(Request request) { - if (request instanceof SendMessageRequest) { + static String getMessageAttribute(Request request, String name) { + if (request.getOriginalRequest() instanceof SendMessageRequest) { Map map = - ((SendMessageRequest) request).getMessageAttributes(); - if (!map.isEmpty()) { - Map result = new HashMap<>(); - for (Map.Entry entry : map.entrySet()) { - result.put(entry.getKey(), entry.getValue().getStringValue()); - } - return result; + ((SendMessageRequest) request.getOriginalRequest()).getMessageAttributes(); + MessageAttributeValue value = map.get(name); + if (value != null) { + return value.getStringValue(); } } - return Collections.emptyMap(); + return null; } } diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsMessage.java b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsMessage.java index 9083f7293d..8da1924f88 100644 --- a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsMessage.java +++ b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsMessage.java @@ -14,4 +14,6 @@ import java.util.Map; interface SqsMessage { Map getAttributes(); + + String getMessageAttribute(String name); } diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsMessageImpl.java b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsMessageImpl.java index 83ef26fc09..5b9c993f5b 100644 --- a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsMessageImpl.java +++ b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsMessageImpl.java @@ -6,6 +6,9 @@ package io.opentelemetry.instrumentation.awssdk.v1_11; import com.amazonaws.services.sqs.model.Message; +import com.amazonaws.services.sqs.model.MessageAttributeValue; +import java.util.ArrayList; +import java.util.List; import java.util.Map; final class SqsMessageImpl implements SqsMessage { @@ -20,8 +23,22 @@ final class SqsMessageImpl implements SqsMessage { return new SqsMessageImpl(message); } + static List wrap(List messages) { + List result = new ArrayList<>(); + for (Message message : messages) { + result.add(wrap(message)); + } + return result; + } + @Override public Map getAttributes() { return message.getAttributes(); } + + @Override + public String getMessageAttribute(String name) { + MessageAttributeValue value = message.getMessageAttributes().get(name); + return value != null ? value.getStringValue() : null; + } } diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsProcessRequest.java b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsProcessRequest.java index f537c9cf3b..9a13347f98 100644 --- a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsProcessRequest.java +++ b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsProcessRequest.java @@ -7,7 +7,7 @@ package io.opentelemetry.instrumentation.awssdk.v1_11; import com.amazonaws.Request; -final class SqsProcessRequest { +final class SqsProcessRequest extends AbstractSqsRequest { private final Request request; private final SqsMessage message; @@ -20,6 +20,7 @@ final class SqsProcessRequest { return new SqsProcessRequest(request, message); } + @Override public Request getRequest() { return request; } diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsProcessRequestAttributesGetter.java b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsProcessRequestAttributesGetter.java index 564110e6a2..24bfc3c345 100644 --- a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsProcessRequestAttributesGetter.java +++ b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsProcessRequestAttributesGetter.java @@ -58,7 +58,7 @@ enum SqsProcessRequestAttributesGetter @Override public List getMessageHeader(SqsProcessRequest request, String name) { - String value = SqsAccess.getMessageAttributes(request.getRequest()).get(name); + String value = request.getMessage().getMessageAttribute(name); return value != null ? Collections.singletonList(value) : Collections.emptyList(); } } diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsReceiveRequest.java b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsReceiveRequest.java new file mode 100644 index 0000000000..947f756243 --- /dev/null +++ b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsReceiveRequest.java @@ -0,0 +1,32 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.awssdk.v1_11; + +import com.amazonaws.Request; +import java.util.List; + +final class SqsReceiveRequest extends AbstractSqsRequest { + private final Request request; + private final List messages; + + private SqsReceiveRequest(Request request, List messages) { + this.request = request; + this.messages = messages; + } + + public static SqsReceiveRequest create(Request request, List messages) { + return new SqsReceiveRequest(request, messages); + } + + @Override + public Request getRequest() { + return request; + } + + public List getMessages() { + return messages; + } +} diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsReceiveRequestAttributesGetter.java b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsReceiveRequestAttributesGetter.java new file mode 100644 index 0000000000..6247789672 --- /dev/null +++ b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/SqsReceiveRequestAttributesGetter.java @@ -0,0 +1,68 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.awssdk.v1_11; + +import com.amazonaws.Response; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; +import javax.annotation.Nullable; + +enum SqsReceiveRequestAttributesGetter + implements MessagingAttributesGetter> { + INSTANCE; + + @Override + public String getSystem(SqsReceiveRequest request) { + return "AmazonSQS"; + } + + @Override + public String getDestination(SqsReceiveRequest request) { + Object originalRequest = request.getRequest().getOriginalRequest(); + String queueUrl = RequestAccess.getQueueUrl(originalRequest); + int i = queueUrl.lastIndexOf('/'); + return i > 0 ? queueUrl.substring(i + 1) : null; + } + + @Override + public boolean isTemporaryDestination(SqsReceiveRequest request) { + return false; + } + + @Override + @Nullable + public String getConversationId(SqsReceiveRequest request) { + return null; + } + + @Override + @Nullable + public Long getMessagePayloadSize(SqsReceiveRequest request) { + return null; + } + + @Override + @Nullable + public Long getMessagePayloadCompressedSize(SqsReceiveRequest request) { + return null; + } + + @Override + @Nullable + public String getMessageId(SqsReceiveRequest request, @Nullable Response response) { + return null; + } + + @Override + public List getMessageHeader(SqsReceiveRequest request, String name) { + return StreamSupport.stream(request.getMessages().spliterator(), false) + .map(message -> message.getMessageAttribute(name)) + .filter(value -> value != null) + .collect(Collectors.toList()); + } +} diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/TracingRequestHandler.java b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/TracingRequestHandler.java index edf2747181..6a43b5ff51 100644 --- a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/TracingRequestHandler.java +++ b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/main/java/io/opentelemetry/instrumentation/awssdk/v1_11/TracingRequestHandler.java @@ -33,13 +33,13 @@ final class TracingRequestHandler extends RequestHandler2 { ContextKey.named(TracingRequestHandler.class.getName() + ".RequestSpanSuppressed"); private final Instrumenter, Response> requestInstrumenter; - private final Instrumenter, Response> consumerReceiveInstrumenter; + private final Instrumenter> consumerReceiveInstrumenter; private final Instrumenter consumerProcessInstrumenter; private final Instrumenter, Response> producerInstrumenter; TracingRequestHandler( Instrumenter, Response> requestInstrumenter, - Instrumenter, Response> consumerReceiveInstrumenter, + Instrumenter> consumerReceiveInstrumenter, Instrumenter consumerProcessInstrumenter, Instrumenter, Response> producerInstrumenter) { this.requestInstrumenter = requestInstrumenter; @@ -99,7 +99,7 @@ final class TracingRequestHandler extends RequestHandler2 { return request; } - Instrumenter, Response> getConsumerReceiveInstrumenter() { + Instrumenter> getConsumerReceiveInstrumenter() { return consumerReceiveInstrumenter; } diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v1_11/SqsTracingTest.groovy b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v1_11/SqsTracingTest.groovy index d069411b15..9862cf4967 100644 --- a/instrumentation/aws-sdk/aws-sdk-1.11/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v1_11/SqsTracingTest.groovy +++ b/instrumentation/aws-sdk/aws-sdk-1.11/library/src/test/groovy/io/opentelemetry/instrumentation/awssdk/v1_11/SqsTracingTest.groovy @@ -8,6 +8,8 @@ package io.opentelemetry.instrumentation.awssdk.v1_11 import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder import io.opentelemetry.instrumentation.test.LibraryTestTrait +import static java.util.Collections.singletonList + class SqsTracingTest extends AbstractSqsTracingTest implements LibraryTestTrait { @Override AmazonSQSAsyncClientBuilder configureClient(AmazonSQSAsyncClientBuilder client) { @@ -15,6 +17,7 @@ class SqsTracingTest extends AbstractSqsTracingTest implements LibraryTestTrait AwsSdkTelemetry.builder(getOpenTelemetry()) .setCaptureExperimentalSpanAttributes(true) .setMessagingReceiveInstrumentationEnabled(true) + .setCapturedHeaders(singletonList("test-message-header")) .build() .newRequestHandler()) } diff --git a/instrumentation/aws-sdk/aws-sdk-1.11/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v1_11/AbstractSqsTracingTest.groovy b/instrumentation/aws-sdk/aws-sdk-1.11/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v1_11/AbstractSqsTracingTest.groovy index da70dd1374..939b470b77 100644 --- a/instrumentation/aws-sdk/aws-sdk-1.11/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v1_11/AbstractSqsTracingTest.groovy +++ b/instrumentation/aws-sdk/aws-sdk-1.11/testing/src/main/groovy/io/opentelemetry/instrumentation/awssdk/v1_11/AbstractSqsTracingTest.groovy @@ -10,6 +10,7 @@ import com.amazonaws.auth.BasicAWSCredentials import com.amazonaws.client.builder.AwsClientBuilder import com.amazonaws.services.sqs.AmazonSQSAsyncClient import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder +import com.amazonaws.services.sqs.model.MessageAttributeValue import com.amazonaws.services.sqs.model.ReceiveMessageRequest import com.amazonaws.services.sqs.model.SendMessageRequest import io.opentelemetry.instrumentation.test.InstrumentationSpecification @@ -51,14 +52,21 @@ abstract class AbstractSqsTracingTest extends InstrumentationSpecification { } } - def "simple sqs producer-consumer services"() { + def "simple sqs producer-consumer services #testCaptureHeaders"() { setup: client.createQueue("testSdkSqs") when: - SendMessageRequest send = new SendMessageRequest("http://localhost:$sqsPort/000000000000/testSdkSqs", "{\"type\": \"hello\"}") - client.sendMessage(send) - def receiveMessageResult = client.receiveMessage("http://localhost:$sqsPort/000000000000/testSdkSqs") + SendMessageRequest sendMessageRequest = new SendMessageRequest("http://localhost:$sqsPort/000000000000/testSdkSqs", "{\"type\": \"hello\"}") + if (testCaptureHeaders) { + sendMessageRequest.addMessageAttributesEntry("test-message-header", new MessageAttributeValue().withDataType("String").withStringValue("test")) + } + client.sendMessage(sendMessageRequest) + ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest("http://localhost:$sqsPort/000000000000/testSdkSqs") + if (testCaptureHeaders) { + receiveMessageRequest.withMessageAttributeNames("test-message-header") + } + def receiveMessageResult = client.receiveMessage(receiveMessageRequest) receiveMessageResult.messages.each {message -> runWithSpan("process child") {} } @@ -113,6 +121,9 @@ abstract class AbstractSqsTracingTest extends InstrumentationSpecification { "$SemanticAttributes.NET_PROTOCOL_NAME" "http" "$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1" "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long + if (testCaptureHeaders) { + "messaging.header.test_message_header" { it == ["test"] } + } } } publishSpan = span(0) @@ -140,6 +151,9 @@ abstract class AbstractSqsTracingTest extends InstrumentationSpecification { "$SemanticAttributes.NET_PROTOCOL_NAME" "http" "$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1" "$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long + if (testCaptureHeaders) { + "messaging.header.test_message_header" { it == ["test"] } + } } } span(1) { @@ -161,6 +175,9 @@ abstract class AbstractSqsTracingTest extends InstrumentationSpecification { "$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS" "$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs" "$SemanticAttributes.MESSAGING_OPERATION" "process" + if (testCaptureHeaders) { + "messaging.header.test_message_header" { it == ["test"] } + } } } span(2) { @@ -171,6 +188,9 @@ abstract class AbstractSqsTracingTest extends InstrumentationSpecification { } } } + + where: + testCaptureHeaders << [false, true] } def "simple sqs producer-consumer services with parent span"() {