aws-sdk: Use compile time reference+@NoMuzzle instead of reflection. (#8603)

Co-authored-by: Lauri Tulmin <ltulmin@splunk.com>
This commit is contained in:
Christian Neumüller 2023-06-16 22:11:04 +02:00 committed by GitHub
parent 2b58df1627
commit 5b4e05a31e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 355 additions and 475 deletions

View File

@ -10,6 +10,22 @@ muzzle {
// Used by all SDK services, the only case it isn't is an SDK extension such as a custom HTTP
// client, which is not target of instrumentation anyways.
extraDependency("software.amazon.awssdk:protocol-core")
excludeInstrumentationName("aws-sdk-2.2-sqs")
// several software.amazon.awssdk artifacts are missing for this version
skip("2.17.200")
}
}
muzzle {
pass {
group.set("software.amazon.awssdk")
module.set("sqs")
versions.set("[2.2.0,)")
// Used by all SDK services, the only case it isn't is an SDK extension such as a custom HTTP
// client, which is not target of instrumentation anyways.
extraDependency("software.amazon.awssdk:protocol-core")
// several software.amazon.awssdk artifacts are missing for this version
skip("2.17.200")
}
@ -17,8 +33,10 @@ muzzle {
dependencies {
implementation(project(":instrumentation:aws-sdk:aws-sdk-2.2:library-autoconfigure"))
implementation(project(":instrumentation:aws-sdk:aws-sdk-2.2:library"))
library("software.amazon.awssdk:aws-core:2.2.0")
library("software.amazon.awssdk:sqs:2.2.0")
testImplementation(project(":instrumentation:aws-sdk:aws-sdk-2.2:testing"))
// Make sure these don't add HTTP headers
@ -37,8 +55,11 @@ dependencies {
tasks.withType<Test>().configureEach {
systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean)
// TODO run tests both with and without experimental span attributes
jvmArgs("-Dotel.instrumentation.aws-sdk.experimental-span-attributes=true")
// TODO run tests both with and without experimental span attributes, with & without extra propagation
systemProperties(mapOf(
"otel.instrumentation.aws-sdk.experimental-span-attributes" to "true",
"otel.instrumentation.aws-sdk.experimental-use-propagator-for-messaging" to "true",
))
}
tasks.withType<com.github.jengelman.gradle.plugins.shadow.tasks.ShadowJar>().configureEach {

View File

@ -0,0 +1,20 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.awssdk.v2_2;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
abstract class AbstractAwsSdkInstrumentationModule extends InstrumentationModule {
protected AbstractAwsSdkInstrumentationModule(String additionalInstrumentationName) {
super("aws-sdk", "aws-sdk-2.2", additionalInstrumentationName);
}
@Override
public boolean isHelperClass(String className) {
return className.startsWith("io.opentelemetry.contrib.awsxray.");
}
}

View File

@ -20,14 +20,9 @@ import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
@AutoService(InstrumentationModule.class)
public class AwsSdkInstrumentationModule extends InstrumentationModule {
public class AwsSdkInstrumentationModule extends AbstractAwsSdkInstrumentationModule {
public AwsSdkInstrumentationModule() {
super("aws-sdk", "aws-sdk-2.2");
}
@Override
public boolean isHelperClass(String className) {
return className.startsWith("io.opentelemetry.contrib.awsxray.");
super("aws-sdk-2.2-core");
}
/**

View File

@ -0,0 +1,56 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.awssdk.v2_2;
import static java.util.Collections.singletonList;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.named;
import com.google.auto.service.AutoService;
import io.opentelemetry.instrumentation.awssdk.v2_2.SqsAdviceBridge;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.List;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
@AutoService(InstrumentationModule.class)
public class SqsInstrumentationModule extends AbstractAwsSdkInstrumentationModule {
public SqsInstrumentationModule() {
super("aws-sdk-2.2-sqs");
}
@Override
public List<TypeInstrumentation> typeInstrumentations() {
return singletonList(new DefaultSqsClientTypeInstrumentation());
}
public static class DefaultSqsClientTypeInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("software.amazon.awssdk.services.sqs.DefaultSqsClient");
}
@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isConstructor(), SqsInstrumentationModule.class.getName() + "$RegisterAdvice");
}
}
@SuppressWarnings("unused")
public static class RegisterAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit() {
// (indirectly) using SqsImpl class here to make sure it is available from SqsAccess
// (injected into app classloader) and checked by Muzzle
SqsAdviceBridge.init();
}
}
}

View File

@ -26,5 +26,6 @@ tasks {
test {
systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean)
systemProperty("otel.instrumentation.aws-sdk.experimental-span-attributes", true)
systemProperty("otel.instrumentation.aws-sdk.experimental-use-propagator-for-messaging", true)
}
}

View File

@ -6,7 +6,9 @@ dependencies {
implementation("io.opentelemetry.contrib:opentelemetry-aws-xray-propagator")
library("software.amazon.awssdk:aws-core:2.2.0")
library("software.amazon.awssdk:sqs:2.2.0")
library("software.amazon.awssdk:aws-json-protocol:2.2.0")
compileOnly(project(":muzzle")) // For @NoMuzzle
testImplementation(project(":instrumentation:aws-sdk:aws-sdk-2.2:testing"))

View File

@ -0,0 +1,56 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v2_2;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.javaagent.tooling.muzzle.NoMuzzle;
import software.amazon.awssdk.core.SdkRequest;
import software.amazon.awssdk.core.interceptor.Context;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
// helper class for calling methods that use sqs types in SqsImpl
// if SqsImpl is not present these methods are no op
final class SqsAccess {
private static final boolean enabled = isSqsImplPresent();
private static boolean isSqsImplPresent() {
try {
// for library instrumentation SqsImpl is always available
// for javaagent instrumentation SqsImpl is available only when SqsInstrumentationModule was
// successfully applied (muzzle passed)
// using package name here because library instrumentation classes are relocated when embedded
// in the agent
Class.forName(SqsAccess.class.getName().replace(".SqsAccess", ".SqsImpl"));
return true;
} catch (ClassNotFoundException e) {
return false;
}
}
@NoMuzzle
static SdkRequest injectIntoSqsSendMessageRequest(
TextMapPropagator messagingPropagator,
SdkRequest rawRequest,
io.opentelemetry.context.Context otelContext) {
if (!enabled) {
return rawRequest;
}
return SqsImpl.injectIntoSqsSendMessageRequest(messagingPropagator, rawRequest, otelContext);
}
@NoMuzzle
static void afterReceiveMessageExecution(
TracingExecutionInterceptor config,
Context.AfterExecution context,
ExecutionAttributes executionAttributes) {
if (!enabled) {
return;
}
SqsImpl.afterConsumerResponse(config, executionAttributes, context);
}
private SqsAccess() {}
}

View File

@ -0,0 +1,15 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v2_2;
public final class SqsAdviceBridge {
private SqsAdviceBridge() {}
public static void init() {
// called from advice
SqsImpl.init(); // Reference the actual, package-private, implementation class for Muzzle
}
}

View File

@ -0,0 +1,94 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v2_2;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import java.util.HashMap;
import java.util.Map;
import software.amazon.awssdk.core.SdkRequest;
import software.amazon.awssdk.core.interceptor.Context;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.http.SdkHttpResponse;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
// this class is only used from SqsAccess from method with @NoMuzzle annotation
final class SqsImpl {
private SqsImpl() {}
public static void init() {
// called from advice
}
static SdkRequest injectIntoSqsSendMessageRequest(
TextMapPropagator messagingPropagator,
SdkRequest rawRequest,
io.opentelemetry.context.Context otelContext) {
SendMessageRequest request = (SendMessageRequest) rawRequest;
Map<String, MessageAttributeValue> messageAttributes =
new HashMap<>(request.messageAttributes());
messagingPropagator.inject(
otelContext,
messageAttributes,
(carrier, k, v) -> {
carrier.put(k, MessageAttributeValue.builder().stringValue(v).dataType("String").build());
});
if (messageAttributes.size() > 10) { // Too many attributes, we don't want to break the call.
return request;
}
return request.toBuilder().messageAttributes(messageAttributes).build();
}
/** Create and close CONSUMER span for each message consumed. */
static void afterConsumerResponse(
TracingExecutionInterceptor config,
ExecutionAttributes executionAttributes,
Context.AfterExecution context) {
ReceiveMessageResponse response = (ReceiveMessageResponse) context.response();
SdkHttpResponse httpResponse = context.httpResponse();
for (Message message : response.messages()) {
createConsumerSpan(config, message, executionAttributes, httpResponse);
}
}
private static void createConsumerSpan(
TracingExecutionInterceptor config,
Message message,
ExecutionAttributes executionAttributes,
SdkHttpResponse httpResponse) {
io.opentelemetry.context.Context parentContext = io.opentelemetry.context.Context.root();
TextMapPropagator messagingPropagator = config.getMessagingPropagator();
if (messagingPropagator != null) {
parentContext =
SqsParentContext.ofMessageAttributes(message.messageAttributes(), messagingPropagator);
}
if (config.shouldUseXrayPropagator()
&& parentContext == io.opentelemetry.context.Context.root()) {
parentContext = SqsParentContext.ofSystemAttributes(message.attributesAsStrings());
}
Instrumenter<ExecutionAttributes, SdkHttpResponse> consumerInstrumenter =
config.getConsumerInstrumenter();
if (consumerInstrumenter.shouldStart(parentContext, executionAttributes)) {
io.opentelemetry.context.Context context =
consumerInstrumenter.start(parentContext, executionAttributes);
// TODO: Even if we keep HTTP attributes (see afterMarshalling), does it make sense here
// per-message?
// TODO: Should we really create root spans if we can't extract anything, or should we attach
// to the current context?
consumerInstrumenter.end(context, executionAttributes, httpResponse, null);
}
}
}

View File

@ -1,95 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v2_2;
import static java.lang.invoke.MethodType.methodType;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.util.Collections;
import java.util.Map;
import javax.annotation.Nullable;
import software.amazon.awssdk.core.SdkPojo;
/**
* Reflective access to aws-sdk-java-sqs class Message.
*
* <p>We currently don't have a good pattern of instrumenting a core library with various plugins
* that need plugin-specific instrumentation - if we accessed the class directly, Muzzle would
* prevent the entire instrumentation from loading when the plugin isn't available. We need to
* carefully check this class has all reflection errors result in no-op, and in the future we will
* hopefully come up with a better pattern.
*
* @see <a
* href="https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/sqs/model/Message.html">SDK
* Javadoc</a>
* @see <a
* href="https://github.com/aws/aws-sdk-java-v2/blob/2.2.0/services/sqs/src/main/resources/codegen-resources/service-2.json#L821-L856">Definition
* JSON</a>
*/
final class SqsMessageAccess {
@Nullable private static final MethodHandle GET_ATTRIBUTES;
@Nullable private static final MethodHandle GET_MESSAGE_ATTRIBUTES;
static {
Class<?> messageClass = null;
try {
messageClass = Class.forName("software.amazon.awssdk.services.sqs.model.Message");
} catch (Throwable t) {
// Ignore.
}
if (messageClass != null) {
MethodHandles.Lookup lookup = MethodHandles.publicLookup();
MethodHandle getAttributes = null;
try {
getAttributes =
lookup.findVirtual(messageClass, "attributesAsStrings", methodType(Map.class));
} catch (NoSuchMethodException | IllegalAccessException e) {
// Ignore
}
GET_ATTRIBUTES = getAttributes;
MethodHandle getMessageAttributes = null;
try {
getMessageAttributes =
lookup.findVirtual(messageClass, "messageAttributes", methodType(Map.class));
} catch (NoSuchMethodException | IllegalAccessException e) {
// Ignore
}
GET_MESSAGE_ATTRIBUTES = getMessageAttributes;
} else {
GET_ATTRIBUTES = null;
GET_MESSAGE_ATTRIBUTES = null;
}
}
@SuppressWarnings("unchecked")
static Map<String, String> getAttributes(Object message) {
if (GET_ATTRIBUTES == null) {
return Collections.emptyMap();
}
try {
return (Map<String, String>) GET_ATTRIBUTES.invoke(message);
} catch (Throwable t) {
return Collections.emptyMap();
}
}
private SqsMessageAccess() {}
@SuppressWarnings("unchecked")
public static Map<String, SdkPojo> getMessageAttributes(Object message) {
if (GET_MESSAGE_ATTRIBUTES == null) {
return Collections.emptyMap();
}
try {
return (Map<String, SdkPojo>) GET_MESSAGE_ATTRIBUTES.invoke(message);
} catch (Throwable t) {
return Collections.emptyMap();
}
}
}

View File

@ -1,161 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v2_2;
import static java.lang.invoke.MethodType.methodType;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import javax.annotation.Nullable;
import software.amazon.awssdk.core.SdkPojo;
import software.amazon.awssdk.utils.builder.SdkBuilder;
/**
* Reflective access to aws-sdk-java-sqs class Message.
*
* <p>We currently don't have a good pattern of instrumenting a core library with various plugins
* that need plugin-specific instrumentation - if we accessed the class directly, Muzzle would
* prevent the entire instrumentation from loading when the plugin isn't available. We need to
* carefully check this class has all reflection errors result in no-op, and in the future we will
* hopefully come up with a better pattern.
*
* @see <a
* href="https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/sqs/model/MessageAttributeValue.html">SDK
* Javadoc</a>
* @see <a
* href="https://github.com/aws/aws-sdk-java-v2/blob/2.2.0/services/sqs/src/main/resources/codegen-resources/service-2.json#L866-L896">Definition
* JSON</a>
*/
final class SqsMessageAttributeValueAccess {
@Nullable private static final MethodHandle GET_STRING_VALUE;
@Nullable private static final MethodHandle STRING_VALUE;
@Nullable private static final MethodHandle DATA_TYPE;
@Nullable private static final MethodHandle BUILDER;
static {
Class<?> messageAttributeValueClass = null;
try {
messageAttributeValueClass =
Class.forName("software.amazon.awssdk.services.sqs.model.MessageAttributeValue");
} catch (Throwable t) {
// Ignore.
}
if (messageAttributeValueClass != null) {
MethodHandles.Lookup lookup = MethodHandles.publicLookup();
MethodHandle getStringValue = null;
try {
getStringValue =
lookup.findVirtual(messageAttributeValueClass, "stringValue", methodType(String.class));
} catch (NoSuchMethodException | IllegalAccessException e) {
// Ignore
}
GET_STRING_VALUE = getStringValue;
} else {
GET_STRING_VALUE = null;
}
Class<?> builderClass = null;
if (messageAttributeValueClass != null) {
try {
builderClass =
Class.forName(
"software.amazon.awssdk.services.sqs.model.MessageAttributeValue$Builder");
} catch (Throwable t) {
// Ignore.
}
}
if (builderClass != null) {
MethodHandles.Lookup lookup = MethodHandles.publicLookup();
MethodHandle stringValue = null;
try {
stringValue =
lookup.findVirtual(builderClass, "stringValue", methodType(builderClass, String.class));
} catch (NoSuchMethodException | IllegalAccessException e) {
// Ignore
}
STRING_VALUE = stringValue;
MethodHandle dataType = null;
try {
dataType =
lookup.findVirtual(builderClass, "dataType", methodType(builderClass, String.class));
} catch (NoSuchMethodException | IllegalAccessException e) {
// Ignore
}
DATA_TYPE = dataType;
MethodHandle builder = null;
try {
builder =
lookup.findStatic(messageAttributeValueClass, "builder", methodType(builderClass));
} catch (NoSuchMethodException | IllegalAccessException e) {
// Ignore
}
BUILDER = builder;
} else {
STRING_VALUE = null;
DATA_TYPE = null;
BUILDER = null;
}
}
@SuppressWarnings({"rawtypes"})
static String getStringValue(SdkPojo messageAttributeValue) {
if (GET_STRING_VALUE == null) {
return null;
}
try {
return (String) GET_STRING_VALUE.invoke(messageAttributeValue);
} catch (Throwable t) {
return null;
}
}
/**
* Note that this does not set the (required) dataType automatically, see {@link
* #dataType(SdkBuilder, String)} *
*/
@SuppressWarnings({"rawtypes", "unchecked"})
static SdkBuilder stringValue(SdkBuilder builder, String value) {
if (STRING_VALUE == null) {
return null;
}
try {
return (SdkBuilder) STRING_VALUE.invoke(builder, value);
} catch (Throwable t) {
return null;
}
}
@SuppressWarnings({"rawtypes", "unchecked"})
static SdkBuilder dataType(SdkBuilder builder, String dataType) {
if (DATA_TYPE == null) {
return null;
}
try {
return (SdkBuilder) DATA_TYPE.invoke(builder, dataType);
} catch (Throwable t) {
return null;
}
}
private SqsMessageAttributeValueAccess() {}
@SuppressWarnings({"rawtypes"})
public static SdkBuilder builder() {
if (BUILDER == null) {
return null;
}
try {
return (SdkBuilder) BUILDER.invoke();
} catch (Throwable e) {
return null;
}
}
}

View File

@ -9,9 +9,10 @@ import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapGetter;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.contrib.awsxray.propagator.AwsXrayPropagator;
import io.opentelemetry.javaagent.tooling.muzzle.NoMuzzle;
import java.util.Collections;
import java.util.Map;
import software.amazon.awssdk.core.SdkPojo;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
final class SqsParentContext {
@ -29,31 +30,32 @@ final class SqsParentContext {
}
}
enum MessageAttributeValueMapGetter implements TextMapGetter<Map<String, SdkPojo>> {
enum MessageAttributeValueMapGetter implements TextMapGetter<Map<String, MessageAttributeValue>> {
INSTANCE;
@Override
public Iterable<String> keys(Map<String, SdkPojo> map) {
public Iterable<String> keys(Map<String, MessageAttributeValue> map) {
return map.keySet();
}
@Override
public String get(Map<String, SdkPojo> map, String s) {
@NoMuzzle
public String get(Map<String, MessageAttributeValue> map, String s) {
if (map == null) {
return null;
}
SdkPojo value = map.get(s);
MessageAttributeValue value = map.get(s);
if (value == null) {
return null;
}
return SqsMessageAttributeValueAccess.getStringValue(value);
return value.stringValue();
}
}
static final String AWS_TRACE_SYSTEM_ATTRIBUTE = "AWSTraceHeader";
static Context ofMessageAttributes(
Map<String, SdkPojo> messageAttributes, TextMapPropagator propagator) {
Map<String, MessageAttributeValue> messageAttributes, TextMapPropagator propagator) {
return propagator.extract(
Context.root(), messageAttributes, MessageAttributeValueMapGetter.INSTANCE);
}

View File

@ -5,63 +5,13 @@
package io.opentelemetry.instrumentation.awssdk.v2_2;
import static java.lang.invoke.MethodType.methodType;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import software.amazon.awssdk.core.SdkPojo;
import software.amazon.awssdk.core.SdkRequest;
/**
* Reflective access to aws-sdk-java-sqs class ReceiveMessageRequest.
*
* <p>We currently don't have a good pattern of instrumenting a core library with various plugins
* that need plugin-specific instrumentation - if we accessed the class directly, Muzzle would
* prevent the entire instrumentation from loading when the plugin isn't available. We need to
* carefully check this class has all reflection errors result in no-op, and in the future we will
* hopefully come up with a better pattern.
*
* @see <a
* href="https://sdk.amazonaws.com/java/api/latest/software/amazon/awssdk/services/sqs/model/SendMessageRequest.html">SDK
* Javadoc</a>
* @see <a
* href="https://github.com/aws/aws-sdk-java-v2/blob/2.2.0/services/sqs/src/main/resources/codegen-resources/service-2.json#L1257-L1291">Definition
* JSON</a>
* Reflective access to aws-sdk-java-sqs class ReceiveMessageRequest for points where we are not
* sure whether SQS is on the classpath.
*/
final class SqsSendMessageRequestAccess {
@Nullable private static final MethodHandle MESSAGE_ATTRIBUTES;
static {
Class<?> sendMessageRequestClass = null;
try {
sendMessageRequestClass =
Class.forName("software.amazon.awssdk.services.sqs.model.SendMessageRequest$Builder");
} catch (Throwable t) {
// Ignore.
}
if (sendMessageRequestClass != null) {
MethodHandles.Lookup lookup = MethodHandles.publicLookup();
MethodHandle messageAttributes = null;
try {
messageAttributes =
lookup.findVirtual(
sendMessageRequestClass,
"messageAttributes",
methodType(sendMessageRequestClass, Map.class));
} catch (NoSuchMethodException | IllegalAccessException e) {
// Ignore
}
MESSAGE_ATTRIBUTES = messageAttributes;
} else {
MESSAGE_ATTRIBUTES = null;
}
}
static boolean isInstance(SdkRequest request) {
return request
.getClass()
@ -69,23 +19,5 @@ final class SqsSendMessageRequestAccess {
.equals("software.amazon.awssdk.services.sqs.model.SendMessageRequest");
}
static void messageAttributes(
SdkRequest.Builder builder, Map<String, SdkPojo> messageAttributes) {
if (MESSAGE_ATTRIBUTES == null) {
return;
}
try {
MESSAGE_ATTRIBUTES.invoke(builder, messageAttributes);
} catch (Throwable throwable) {
// Ignore
}
}
@SuppressWarnings({"unchecked", "rawtypes"})
static Map<String, SdkPojo> messageAttributes(SdkRequest request) {
Optional<Map> optional = request.getValueForField("AttributeNames", Map.class);
return optional.isPresent() ? (Map<String, SdkPojo>) optional.get() : Collections.emptyMap();
}
private SqsSendMessageRequestAccess() {}
}

View File

@ -16,15 +16,10 @@ import io.opentelemetry.contrib.awsxray.propagator.AwsXrayPropagator;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import software.amazon.awssdk.awscore.AwsResponse;
import software.amazon.awssdk.core.ClientType;
import software.amazon.awssdk.core.SdkPojo;
import software.amazon.awssdk.core.SdkRequest;
import software.amazon.awssdk.core.SdkResponse;
import software.amazon.awssdk.core.interceptor.Context;
@ -34,7 +29,6 @@ import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
import software.amazon.awssdk.core.interceptor.SdkExecutionAttribute;
import software.amazon.awssdk.http.SdkHttpRequest;
import software.amazon.awssdk.http.SdkHttpResponse;
import software.amazon.awssdk.utils.builder.SdkBuilder;
/** AWS request execution interceptor. */
final class TracingExecutionInterceptor implements ExecutionInterceptor {
@ -55,6 +49,20 @@ final class TracingExecutionInterceptor implements ExecutionInterceptor {
private final Instrumenter<ExecutionAttributes, SdkHttpResponse> requestInstrumenter;
private final Instrumenter<ExecutionAttributes, SdkHttpResponse> consumerInstrumenter;
private final boolean captureExperimentalSpanAttributes;
Instrumenter<ExecutionAttributes, SdkHttpResponse> getConsumerInstrumenter() {
return consumerInstrumenter;
}
@Nullable
TextMapPropagator getMessagingPropagator() {
return messagingPropagator;
}
boolean shouldUseXrayPropagator() {
return useXrayPropagator;
}
@Nullable private final TextMapPropagator messagingPropagator;
private final boolean useXrayPropagator;
private final FieldMapper fieldMapper;
@ -118,44 +126,13 @@ final class TracingExecutionInterceptor implements ExecutionInterceptor {
return modifySqsReceiveMessageRequest(request);
} else if (messagingPropagator != null) {
if (SqsSendMessageRequestAccess.isInstance(request)) {
return injectIntoSqsSendMessageRequest(request, otelContext);
return SqsAccess.injectIntoSqsSendMessageRequest(messagingPropagator, request, otelContext);
}
// TODO: Support SendMessageBatchRequest (and thus SendMessageBatchRequestEntry)
}
return request;
}
private SdkRequest injectIntoSqsSendMessageRequest(
SdkRequest request, io.opentelemetry.context.Context otelContext) {
Map<String, SdkPojo> messageAttributes =
new HashMap<>(SqsSendMessageRequestAccess.messageAttributes(request));
messagingPropagator.inject(
otelContext,
messageAttributes,
(carrier, k, v) -> {
@SuppressWarnings("rawtypes")
SdkBuilder builder = SqsMessageAttributeValueAccess.builder();
if (builder == null) {
return;
}
builder = SqsMessageAttributeValueAccess.stringValue(builder, v);
if (builder == null) {
return;
}
builder = SqsMessageAttributeValueAccess.dataType(builder, "String");
if (builder == null) {
return;
}
carrier.put(k, (SdkPojo) builder.build());
});
if (messageAttributes.size() > 10) { // Too many attributes, we don't want to break the call.
return request;
}
SdkRequest.Builder builder = request.toBuilder();
SqsSendMessageRequestAccess.messageAttributes(builder, messageAttributes);
return builder.build();
}
private SdkRequest modifySqsReceiveMessageRequest(SdkRequest request) {
boolean hasXrayAttribute = true;
List<String> existingAttributeNames = null;
@ -289,7 +266,7 @@ final class TracingExecutionInterceptor implements ExecutionInterceptor {
public void afterExecution(
Context.AfterExecution context, ExecutionAttributes executionAttributes) {
if (SqsReceiveMessageRequestAccess.isInstance(context.request())) {
afterConsumerResponse(executionAttributes, context.response(), context.httpResponse());
SqsAccess.afterReceiveMessageExecution(this, context, executionAttributes);
}
io.opentelemetry.context.Context otelContext = getContext(executionAttributes);
@ -310,41 +287,6 @@ final class TracingExecutionInterceptor implements ExecutionInterceptor {
clearAttributes(executionAttributes);
}
/** Create and close CONSUMER span for each message consumed. */
private void afterConsumerResponse(
ExecutionAttributes executionAttributes, SdkResponse response, SdkHttpResponse httpResponse) {
List<Object> messages = getMessages(response);
for (Object message : messages) {
createConsumerSpan(message, executionAttributes, httpResponse);
}
}
private void createConsumerSpan(
Object message, ExecutionAttributes executionAttributes, SdkHttpResponse httpResponse) {
io.opentelemetry.context.Context parentContext = io.opentelemetry.context.Context.root();
if (messagingPropagator != null) {
parentContext =
SqsParentContext.ofMessageAttributes(
SqsMessageAccess.getMessageAttributes(message), messagingPropagator);
}
if (useXrayPropagator && parentContext == io.opentelemetry.context.Context.root()) {
parentContext = SqsParentContext.ofSystemAttributes(SqsMessageAccess.getAttributes(message));
}
if (consumerInstrumenter.shouldStart(parentContext, executionAttributes)) {
io.opentelemetry.context.Context context =
consumerInstrumenter.start(parentContext, executionAttributes);
// TODO: Even if we keep HTTP attributes (see afterMarshalling), does it make sense here
// per-message?
// TODO: Should we really create root spans if we can't extract anything, or should we attach
// to the current context?
consumerInstrumenter.end(context, executionAttributes, httpResponse, null);
}
}
// Certain headers in the request like User-Agent are only available after execution.
private static void onUserAgentHeaderAvailable(Span span, ExecutionAttributes request) {
List<String> userAgent =
@ -394,10 +336,4 @@ final class TracingExecutionInterceptor implements ExecutionInterceptor {
static io.opentelemetry.context.Context getContext(ExecutionAttributes attributes) {
return attributes.getAttribute(CONTEXT_ATTRIBUTE);
}
@SuppressWarnings({"rawtypes", "unchecked"})
static List<Object> getMessages(SdkResponse response) {
Optional<List> optional = response.getValueForField("Messages", List.class);
return optional.isPresent() ? optional.get() : Collections.emptyList();
}
}

View File

@ -15,6 +15,7 @@ class Aws2ClientTest extends AbstractAws2ClientTest implements LibraryTestTrait
.addExecutionInterceptor(
AwsSdkTelemetry.builder(getOpenTelemetry())
.setCaptureExperimentalSpanAttributes(true)
.setUseConfiguredPropagatorForMessaging(true) // Default on in tests to cover more code
.build()
.newExecutionInterceptor())
}

View File

@ -48,7 +48,6 @@ import software.amazon.awssdk.services.s3.model.GetObjectRequest
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.SqsClient
import software.amazon.awssdk.services.sqs.model.CreateQueueRequest
import software.amazon.awssdk.services.sqs.model.SendMessageRequest
import spock.lang.Shared
import spock.lang.Unroll
@ -385,16 +384,6 @@ abstract class AbstractAws2ClientTest extends InstrumentationSpecification {
<ResponseMetadata><RequestId>7a62c49f-347e-4fc4-9331-6e8e7a96aa73</RequestId></ResponseMetadata>
</CreateQueueResponse>
"""
"Sqs" | "SendMessage" | "POST" | "" | "27daac76-34dd-47df-bd01-1f6e873584a0" | SqsClient.builder() | { c -> c.sendMessage(SendMessageRequest.builder().queueUrl("someurl").messageBody("").build()) } | """
<SendMessageResponse>
<SendMessageResult>
<MD5OfMessageBody>d41d8cd98f00b204e9800998ecf8427e</MD5OfMessageBody>
<MD5OfMessageAttributes>3ae8f24a165a8cedc005670c81a27295</MD5OfMessageAttributes>
<MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
</SendMessageResult>
<ResponseMetadata><RequestId>27daac76-34dd-47df-bd01-1f6e873584a0</RequestId></ResponseMetadata>
</SendMessageResponse>
"""
"Ec2" | "AllocateAddress" | "POST" | "" | "59dbff89-35bd-4eac-99ed-be587EXAMPLE" | Ec2Client.builder() | { c -> c.allocateAddress() } | """
<AllocateAddressResponse xmlns="http://ec2.amazonaws.com/doc/2016-11-15/">
<requestId>59dbff89-35bd-4eac-99ed-be587EXAMPLE</requestId>
@ -476,16 +465,6 @@ abstract class AbstractAws2ClientTest extends InstrumentationSpecification {
<ResponseMetadata><RequestId>7a62c49f-347e-4fc4-9331-6e8e7a96aa73</RequestId></ResponseMetadata>
</CreateQueueResponse>
"""
"Sqs" | "SendMessage" | "POST" | "" | "27daac76-34dd-47df-bd01-1f6e873584a0" | SqsAsyncClient.builder() | { c -> c.sendMessage(SendMessageRequest.builder().queueUrl("someurl").messageBody("").build()) } | """
<SendMessageResponse>
<SendMessageResult>
<MD5OfMessageBody>d41d8cd98f00b204e9800998ecf8427e</MD5OfMessageBody>
<MD5OfMessageAttributes>3ae8f24a165a8cedc005670c81a27295</MD5OfMessageAttributes>
<MessageId>5fea7756-0ea4-451a-a703-a558b933e274</MessageId>
</SendMessageResult>
<ResponseMetadata><RequestId>27daac76-34dd-47df-bd01-1f6e873584a0</RequestId></ResponseMetadata>
</SendMessageResponse>
"""
"Ec2" | "AllocateAddress" | "POST" | "" | "59dbff89-35bd-4eac-99ed-be587EXAMPLE" | Ec2AsyncClient.builder() | { c -> c.allocateAddress() } | """
<AllocateAddressResponse xmlns="http://ec2.amazonaws.com/doc/2016-11-15/">
<requestId>59dbff89-35bd-4eac-99ed-be587EXAMPLE</requestId>

View File

@ -11,9 +11,10 @@ import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import org.elasticmq.rest.sqs.SQSRestServerBuilder
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
import software.amazon.awssdk.core.client.builder.SdkClientBuilder
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.SqsBaseClientBuilder
import software.amazon.awssdk.services.sqs.SqsClient
import software.amazon.awssdk.services.sqs.model.CreateQueueRequest
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest
@ -31,30 +32,39 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification {
@Shared
def sqs
@Shared
SqsClient client
@Shared
int sqsPort
void configureSdkClient(SdkClientBuilder builder) {
builder.overrideConfiguration(createOverrideConfigurationBuilder().build())
ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder()
.queueUrl("http://localhost:$sqsPort/000000000000/testSdkSqs")
.build()
CreateQueueRequest createQueueRequest = CreateQueueRequest.builder()
.queueName("testSdkSqs")
.build()
SendMessageRequest sendMessageRequest = SendMessageRequest.builder()
.queueUrl("http://localhost:$sqsPort/000000000000/testSdkSqs")
.messageBody("{\"type\": \"hello\"}")
.build()
void configureSdkClient(SqsBaseClientBuilder builder) {
builder
.overrideConfiguration(createOverrideConfigurationBuilder().build())
.endpointOverride(new URI("http://localhost:" + sqsPort))
builder
.region(Region.AP_NORTHEAST_1)
.credentialsProvider(CREDENTIALS_PROVIDER)
}
abstract ClientOverrideConfiguration.Builder createOverrideConfigurationBuilder()
def setupSpec() {
sqsPort = PortUtils.findOpenPort()
sqs = SQSRestServerBuilder.withPort(sqsPort).withInterface("localhost").start()
println getClass().name + " SQS server started at: localhost:$sqsPort/"
def builder = SqsClient.builder()
configureSdkClient(builder)
client = builder
.endpointOverride(new URI("http://localhost:" + sqsPort))
.region(Region.AP_NORTHEAST_1)
.credentialsProvider(CREDENTIALS_PROVIDER)
.build()
}
def cleanupSpec() {
@ -63,25 +73,7 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification {
}
}
def "simple sqs producer-consumer services"() {
setup:
CreateQueueRequest createQueueRequest = CreateQueueRequest.builder()
.queueName("testSdkSqs")
.build()
client.createQueue(createQueueRequest)
when:
SendMessageRequest sendMessageRequest = SendMessageRequest.builder()
.queueUrl("http://localhost:$sqsPort/000000000000/testSdkSqs")
.messageBody("{\"type\": \"hello\"}")
.build()
client.sendMessage(sendMessageRequest)
ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder()
.queueUrl("http://localhost:$sqsPort/000000000000/testSdkSqs")
.build()
client.receiveMessage(receiveMessageRequest)
then:
void assertSqsTraces() {
assertTraces(3) {
trace(0, 1) {
@ -177,4 +169,38 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification {
}
}
}
def "simple sqs producer-consumer services: sync"() {
setup:
def builder = SqsClient.builder()
configureSdkClient(builder)
def client = builder.build()
client.createQueue(createQueueRequest)
when:
client.sendMessage(sendMessageRequest)
client.receiveMessage(receiveMessageRequest)
then:
assertSqsTraces()
}
def "simple sqs producer-consumer services: async"() {
setup:
def builder = SqsAsyncClient.builder()
configureSdkClient(builder)
def client = builder.build()
client.createQueue(createQueueRequest).get()
when:
client.sendMessage(sendMessageRequest).get()
client.receiveMessage(receiveMessageRequest).get()
then:
assertSqsTraces()
}
}