Add messaging conventions to sqs spans (#9712)

This commit is contained in:
Lauri Tulmin 2023-10-23 17:56:58 +03:00 committed by GitHub
parent e9f07d3195
commit 6719ba358e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
25 changed files with 514 additions and 161 deletions

View File

@ -192,7 +192,7 @@ class S3TracingTest extends AgentInstrumentationSpecification {
}
}
span(1) {
name "SQS.ReceiveMessage"
name "s3ToSqsTestQueue receive"
kind CONSUMER
childOf span(0)
attributes {
@ -210,6 +210,9 @@ class S3TracingTest extends AgentInstrumentationSpecification {
"$SemanticAttributes.NET_PROTOCOL_NAME" "http"
"$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1"
"net.peer.port" { it == null || Number }
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "s3ToSqsTestQueue"
"$SemanticAttributes.MESSAGING_OPERATION" "receive"
"$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long }
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long }
}
@ -556,7 +559,7 @@ class S3TracingTest extends AgentInstrumentationSpecification {
}
trace(9, 1) {
span(0) {
name "SQS.ReceiveMessage"
name "s3ToSnsToSqsTestQueue receive"
kind CONSUMER
hasNoParent()
attributes {
@ -574,6 +577,9 @@ class S3TracingTest extends AgentInstrumentationSpecification {
"$SemanticAttributes.NET_PROTOCOL_NAME" "http"
"$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1"
"net.peer.port" { it == null || Number }
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "s3ToSnsToSqsTestQueue"
"$SemanticAttributes.MESSAGING_OPERATION" "receive"
"$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long }
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long }
}

View File

@ -176,7 +176,7 @@ class SnsTracingTest extends AgentInstrumentationSpecification {
}
}
span(1) {
name "SQS.ReceiveMessage"
name "snsToSqsTestQueue receive"
kind CONSUMER
childOf span(0)
attributes {
@ -194,6 +194,9 @@ class SnsTracingTest extends AgentInstrumentationSpecification {
"$SemanticAttributes.NET_PROTOCOL_NAME" "http"
"$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1"
"net.peer.port" { it == null || Number }
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "snsToSqsTestQueue"
"$SemanticAttributes.MESSAGING_OPERATION" "receive"
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long
}
}

View File

@ -5,13 +5,20 @@
package io.opentelemetry.instrumentation.awssdk.v1_11;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
import com.amazonaws.Request;
import com.amazonaws.Response;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.rpc.RpcClientAttributesExtractor;
import java.util.Arrays;
import java.util.List;
@ -25,7 +32,6 @@ final class AwsSdkInstrumenterFactory {
RpcClientAttributesExtractor.create(AwsSdkRpcAttributesGetter.INSTANCE);
private static final AwsSdkExperimentalAttributesExtractor experimentalAttributesExtractor =
new AwsSdkExperimentalAttributesExtractor();
private static final AwsSdkSpanKindExtractor spanKindExtractor = new AwsSdkSpanKindExtractor();
private static final List<AttributesExtractor<Request<?>, Response<?>>>
defaultAttributesExtractors = Arrays.asList(httpAttributesExtractor, rpcAttributesExtractor);
@ -41,27 +47,55 @@ final class AwsSdkInstrumenterFactory {
return createInstrumenter(
openTelemetry,
captureExperimentalSpanAttributes,
AwsSdkInstrumenterFactory.spanKindExtractor);
spanName,
SpanKindExtractor.alwaysClient(),
emptyList());
}
static Instrumenter<Request<?>, Response<?>> consumerInstrumenter(
OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes) {
return sqsInstrumenter(
openTelemetry, MessageOperation.RECEIVE, captureExperimentalSpanAttributes);
}
static Instrumenter<Request<?>, Response<?>> producerInstrumenter(
OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes) {
return sqsInstrumenter(
openTelemetry, MessageOperation.PUBLISH, captureExperimentalSpanAttributes);
}
private static Instrumenter<Request<?>, Response<?>> sqsInstrumenter(
OpenTelemetry openTelemetry,
MessageOperation operation,
boolean captureExperimentalSpanAttributes) {
SqsAttributesGetter getter = SqsAttributesGetter.INSTANCE;
AttributesExtractor<Request<?>, Response<?>> messagingAttributeExtractor =
MessagingAttributesExtractor.builder(getter, operation).build();
return createInstrumenter(
openTelemetry, captureExperimentalSpanAttributes, SpanKindExtractor.alwaysConsumer());
openTelemetry,
captureExperimentalSpanAttributes,
MessagingSpanNameExtractor.create(getter, operation),
operation == MessageOperation.PUBLISH
? SpanKindExtractor.alwaysProducer()
: SpanKindExtractor.alwaysConsumer(),
singletonList(messagingAttributeExtractor));
}
private static Instrumenter<Request<?>, Response<?>> createInstrumenter(
OpenTelemetry openTelemetry,
boolean captureExperimentalSpanAttributes,
SpanKindExtractor<Request<?>> kindExtractor) {
SpanNameExtractor<Request<?>> spanNameExtractor,
SpanKindExtractor<Request<?>> spanKindExtractor,
List<AttributesExtractor<Request<?>, Response<?>>> additionalAttributeExtractors) {
return Instrumenter.<Request<?>, Response<?>>builder(
openTelemetry, INSTRUMENTATION_NAME, spanName)
openTelemetry, INSTRUMENTATION_NAME, spanNameExtractor)
.addAttributesExtractors(
captureExperimentalSpanAttributes
? extendedAttributesExtractors
: defaultAttributesExtractors)
.buildInstrumenter(kindExtractor);
.addAttributesExtractors(additionalAttributeExtractors)
.buildInstrumenter(spanKindExtractor);
}
private AwsSdkInstrumenterFactory() {}

View File

@ -1,26 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v1_11;
import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.Request;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
class AwsSdkSpanKindExtractor implements SpanKindExtractor<Request<?>> {
@Override
public SpanKind extract(Request<?> request) {
AmazonWebServiceRequest originalRequest = request.getOriginalRequest();
return (isSqsProducer(originalRequest) ? SpanKind.PRODUCER : SpanKind.CLIENT);
}
private static boolean isSqsProducer(AmazonWebServiceRequest request) {
return request
.getClass()
.getName()
.equals("com.amazonaws.services.sqs.model.SendMessageRequest");
}
}

View File

@ -46,6 +46,7 @@ public class AwsSdkTelemetry {
private final Instrumenter<Request<?>, Response<?>> requestInstrumenter;
private final Instrumenter<Request<?>, Response<?>> consumerInstrumenter;
private final Instrumenter<Request<?>, Response<?>> producerInstrumenter;
AwsSdkTelemetry(OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes) {
requestInstrumenter =
@ -54,6 +55,9 @@ public class AwsSdkTelemetry {
consumerInstrumenter =
AwsSdkInstrumenterFactory.consumerInstrumenter(
openTelemetry, captureExperimentalSpanAttributes);
producerInstrumenter =
AwsSdkInstrumenterFactory.producerInstrumenter(
openTelemetry, captureExperimentalSpanAttributes);
}
/**
@ -61,6 +65,7 @@ public class AwsSdkTelemetry {
* withRequestHandlers}.
*/
public RequestHandler2 newRequestHandler() {
return new TracingRequestHandler(requestInstrumenter, consumerInstrumenter);
return new TracingRequestHandler(
requestInstrumenter, consumerInstrumenter, producerInstrumenter);
}
}

View File

@ -10,6 +10,8 @@ import com.amazonaws.Request;
import com.amazonaws.Response;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.javaagent.tooling.muzzle.NoMuzzle;
import java.util.Collections;
import java.util.Map;
final class SqsAccess {
private SqsAccess() {}
@ -28,4 +30,9 @@ final class SqsAccess {
static boolean beforeMarshalling(AmazonWebServiceRequest request) {
return enabled && SqsImpl.beforeMarshalling(request);
}
@NoMuzzle
static Map<String, String> getMessageAttributes(Request<?> request) {
return enabled ? SqsImpl.getMessageAttributes(request) : Collections.emptyMap();
}
}

View File

@ -0,0 +1,65 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v1_11;
import com.amazonaws.Request;
import com.amazonaws.Response;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
enum SqsAttributesGetter implements MessagingAttributesGetter<Request<?>, Response<?>> {
INSTANCE;
@Override
public String getSystem(Request<?> request) {
return "AmazonSQS";
}
@Override
public String getDestination(Request<?> request) {
Object originalRequest = request.getOriginalRequest();
String queueUrl = RequestAccess.getQueueUrl(originalRequest);
int i = queueUrl.lastIndexOf('/');
return i > 0 ? queueUrl.substring(i + 1) : null;
}
@Override
public boolean isTemporaryDestination(Request<?> request) {
return false;
}
@Override
@Nullable
public String getConversationId(Request<?> request) {
return null;
}
@Override
@Nullable
public Long getMessagePayloadSize(Request<?> request) {
return null;
}
@Override
@Nullable
public Long getMessagePayloadCompressedSize(Request<?> request) {
return null;
}
@Override
@Nullable
public String getMessageId(Request<?> request, @Nullable Response<?> response) {
return null;
}
@Override
public List<String> getMessageHeader(Request<?> request, String name) {
String value = SqsAccess.getMessageAttributes(request).get(name);
return value != null ? Collections.singletonList(value) : Collections.emptyList();
}
}

View File

@ -10,10 +10,15 @@ import com.amazonaws.Request;
import com.amazonaws.Response;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
final class SqsImpl {
static {
@ -67,4 +72,19 @@ final class SqsImpl {
}
return false;
}
static Map<String, String> getMessageAttributes(Request<?> request) {
if (request instanceof SendMessageRequest) {
Map<String, MessageAttributeValue> map =
((SendMessageRequest) request).getMessageAttributes();
if (!map.isEmpty()) {
Map<String, String> result = new HashMap<>();
for (Map.Entry<String, MessageAttributeValue> entry : map.entrySet()) {
result.put(entry.getKey(), entry.getValue().getStringValue());
}
return result;
}
}
return Collections.emptyMap();
}
}

View File

@ -31,12 +31,15 @@ final class TracingRequestHandler extends RequestHandler2 {
private final Instrumenter<Request<?>, Response<?>> requestInstrumenter;
private final Instrumenter<Request<?>, Response<?>> consumerInstrumenter;
private final Instrumenter<Request<?>, Response<?>> producerInstrumenter;
TracingRequestHandler(
Instrumenter<Request<?>, Response<?>> requestInstrumenter,
Instrumenter<Request<?>, Response<?>> consumerInstrumenter) {
Instrumenter<Request<?>, Response<?>> consumerInstrumenter,
Instrumenter<Request<?>, Response<?>> producerInstrumenter) {
this.requestInstrumenter = requestInstrumenter;
this.consumerInstrumenter = consumerInstrumenter;
this.producerInstrumenter = producerInstrumenter;
}
@Override
@ -50,8 +53,10 @@ final class TracingRequestHandler extends RequestHandler2 {
return;
}
Instrumenter<Request<?>, Response<?>> instrumenter = getInstrumenter(request);
Context parentContext = Context.current();
if (!requestInstrumenter.shouldStart(parentContext, request)) {
if (!instrumenter.shouldStart(parentContext, request)) {
return;
}
@ -62,14 +67,14 @@ final class TracingRequestHandler extends RequestHandler2 {
if (Context.root() == parentContext
&& "com.amazonaws.services.sqs.model.ReceiveMessageRequest"
.equals(request.getOriginalRequest().getClass().getName())) {
Context context = InstrumenterUtil.suppressSpan(requestInstrumenter, parentContext, request);
Context context = InstrumenterUtil.suppressSpan(instrumenter, parentContext, request);
context = context.with(REQUEST_START_KEY, Instant.now());
context = context.with(PARENT_CONTEXT_KEY, parentContext);
request.addHandlerContext(CONTEXT, context);
return;
}
Context context = requestInstrumenter.start(parentContext, request);
Context context = instrumenter.start(parentContext, request);
AwsXrayPropagator.getInstance().inject(context, request, HeaderSetter.INSTANCE);
@ -104,24 +109,28 @@ final class TracingRequestHandler extends RequestHandler2 {
}
request.addHandlerContext(CONTEXT, null);
Instrumenter<Request<?>, Response<?>> instrumenter = getInstrumenter(request);
// see beforeRequest, requestStart is only set when we skip creating request span for sqs
// AmazonSQSClient.receiveMessage calls
Instant requestStart = context.get(REQUEST_START_KEY);
if (requestStart != null) {
Context parentContext = context.get(PARENT_CONTEXT_KEY);
// create request span if there was an error
if (error != null) {
if (error != null && requestInstrumenter.shouldStart(parentContext, request)) {
InstrumenterUtil.startAndEnd(
requestInstrumenter,
context.get(PARENT_CONTEXT_KEY),
request,
response,
error,
requestStart,
Instant.now());
instrumenter, parentContext, request, response, error, requestStart, Instant.now());
}
return;
}
requestInstrumenter.end(context, request, response, error);
instrumenter.end(context, request, response, error);
}
private Instrumenter<Request<?>, Response<?>> getInstrumenter(Request<?> request) {
boolean isSqsProducer =
"com.amazonaws.services.sqs.model.SendMessageRequest"
.equals(request.getOriginalRequest().getClass().getName());
return isSqsProducer ? producerInstrumenter : requestInstrumenter;
}
}

View File

@ -87,7 +87,7 @@ abstract class AbstractSqsTracingTest extends InstrumentationSpecification {
}
trace(1, 2) {
span(0) {
name "SQS.SendMessage"
name "testSdkSqs publish"
kind PRODUCER
hasNoParent()
attributes {
@ -102,13 +102,15 @@ abstract class AbstractSqsTracingTest extends InstrumentationSpecification {
"http.url" "http://localhost:$sqsPort"
"net.peer.name" "localhost"
"net.peer.port" sqsPort
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
"$SemanticAttributes.NET_PROTOCOL_NAME" "http"
"$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1"
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long
}
}
span(1) {
name "SQS.ReceiveMessage"
name "testSdkSqs receive"
kind CONSUMER
childOf span(0)
attributes {
@ -124,6 +126,9 @@ abstract class AbstractSqsTracingTest extends InstrumentationSpecification {
"$SemanticAttributes.USER_AGENT_ORIGINAL" String
"net.peer.name" "localhost"
"net.peer.port" sqsPort
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
"$SemanticAttributes.MESSAGING_OPERATION" "receive"
"$SemanticAttributes.NET_PROTOCOL_NAME" "http"
"$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1"
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long
@ -172,7 +177,7 @@ abstract class AbstractSqsTracingTest extends InstrumentationSpecification {
}
trace(1, 2) {
span(0) {
name "SQS.SendMessage"
name "testSdkSqs publish"
kind PRODUCER
hasNoParent()
attributes {
@ -187,13 +192,15 @@ abstract class AbstractSqsTracingTest extends InstrumentationSpecification {
"http.url" "http://localhost:$sqsPort"
"net.peer.name" "localhost"
"net.peer.port" sqsPort
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
"$SemanticAttributes.NET_PROTOCOL_NAME" "http"
"$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1"
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long
}
}
span(1) {
name "SQS.ReceiveMessage"
name "testSdkSqs receive"
kind CONSUMER
childOf span(0)
attributes {
@ -209,6 +216,9 @@ abstract class AbstractSqsTracingTest extends InstrumentationSpecification {
"$SemanticAttributes.USER_AGENT_ORIGINAL" String
"net.peer.name" "localhost"
"net.peer.port" sqsPort
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
"$SemanticAttributes.MESSAGING_OPERATION" "receive"
"$SemanticAttributes.NET_PROTOCOL_NAME" "http"
"$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1"
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long

View File

@ -11,10 +11,9 @@ import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import javax.annotation.Nullable;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.http.SdkHttpResponse;
class AwsSdkExperimentalAttributesExtractor
implements AttributesExtractor<ExecutionAttributes, SdkHttpResponse> {
implements AttributesExtractor<ExecutionAttributes, Response> {
private static final String COMPONENT_NAME = "java-aws-sdk";
private static final AttributeKey<String> AWS_AGENT = AttributeKey.stringKey("aws.agent");
@ -32,6 +31,6 @@ class AwsSdkExperimentalAttributesExtractor
AttributesBuilder attributes,
Context context,
ExecutionAttributes executionAttributes,
@Nullable SdkHttpResponse sdkHttpResponse,
@Nullable Response response,
@Nullable Throwable error) {}
}

View File

@ -12,10 +12,9 @@ import java.util.List;
import javax.annotation.Nullable;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.http.SdkHttpRequest;
import software.amazon.awssdk.http.SdkHttpResponse;
class AwsSdkHttpAttributesGetter
implements HttpClientAttributesGetter<ExecutionAttributes, SdkHttpResponse> {
implements HttpClientAttributesGetter<ExecutionAttributes, Response> {
@Override
public String getUrlFull(ExecutionAttributes request) {
@ -41,14 +40,14 @@ class AwsSdkHttpAttributesGetter
@Override
public Integer getHttpResponseStatusCode(
ExecutionAttributes request, SdkHttpResponse response, @Nullable Throwable error) {
return response.statusCode();
ExecutionAttributes request, Response response, @Nullable Throwable error) {
return response.getSdkHttpResponse().statusCode();
}
@Override
public List<String> getHttpResponseHeader(
ExecutionAttributes request, SdkHttpResponse response, String name) {
List<String> value = response.headers().get(name);
ExecutionAttributes request, Response response, String name) {
List<String> value = response.getSdkHttpResponse().headers().get(name);
return value == null ? emptyList() : value;
}

View File

@ -13,7 +13,6 @@ import io.opentelemetry.instrumentation.api.internal.SpanKey;
import io.opentelemetry.instrumentation.api.internal.SpanKeyProvider;
import javax.annotation.Nullable;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.http.SdkHttpResponse;
/**
* An attribute extractor that reports implementing HTTP client semantic conventions. Adding this
@ -21,7 +20,7 @@ import software.amazon.awssdk.http.SdkHttpResponse;
* HttpClientAttributesExtractor} would.
*/
class AwsSdkHttpClientSuppressionAttributesExtractor
implements AttributesExtractor<ExecutionAttributes, SdkHttpResponse>, SpanKeyProvider {
implements AttributesExtractor<ExecutionAttributes, Response>, SpanKeyProvider {
@Override
public void onStart(
@ -34,7 +33,7 @@ class AwsSdkHttpClientSuppressionAttributesExtractor
AttributesBuilder attributes,
Context context,
ExecutionAttributes executionAttributes,
@Nullable SdkHttpResponse sdkHttpResponse,
@Nullable Response response,
@Nullable Throwable error) {}
@Nullable

View File

@ -9,53 +9,55 @@ import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.rpc.RpcClientAttributesExtractor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.interceptor.SdkExecutionAttribute;
import software.amazon.awssdk.http.SdkHttpResponse;
final class AwsSdkInstrumenterFactory {
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.aws-sdk-2.2";
static final AttributesExtractor<ExecutionAttributes, SdkHttpResponse> rpcAttributesExtractor =
private static final AttributesExtractor<ExecutionAttributes, Response> rpcAttributesExtractor =
RpcClientAttributesExtractor.create(AwsSdkRpcAttributesGetter.INSTANCE);
private static final AwsSdkExperimentalAttributesExtractor experimentalAttributesExtractor =
new AwsSdkExperimentalAttributesExtractor();
static final AwsSdkHttpAttributesGetter httpAttributesGetter = new AwsSdkHttpAttributesGetter();
static final AttributesExtractor<ExecutionAttributes, SdkHttpResponse> httpAttributesExtractor =
static final AttributesExtractor<ExecutionAttributes, Response> httpAttributesExtractor =
HttpClientAttributesExtractor.create(httpAttributesGetter);
private static final AttributesExtractor<ExecutionAttributes, SdkHttpResponse>
private static final AttributesExtractor<ExecutionAttributes, Response>
httpClientSuppressionAttributesExtractor =
new AwsSdkHttpClientSuppressionAttributesExtractor();
private static final AwsSdkSpanKindExtractor spanKindExtractor = new AwsSdkSpanKindExtractor();
private static final List<AttributesExtractor<ExecutionAttributes, SdkHttpResponse>>
private static final List<AttributesExtractor<ExecutionAttributes, Response>>
defaultAttributesExtractors =
Arrays.asList(rpcAttributesExtractor, httpClientSuppressionAttributesExtractor);
private static final List<AttributesExtractor<ExecutionAttributes, SdkHttpResponse>>
private static final List<AttributesExtractor<ExecutionAttributes, Response>>
extendedAttributesExtractors =
Arrays.asList(
rpcAttributesExtractor,
experimentalAttributesExtractor,
httpClientSuppressionAttributesExtractor);
private static final List<AttributesExtractor<ExecutionAttributes, SdkHttpResponse>>
private static final List<AttributesExtractor<ExecutionAttributes, Response>>
defaultConsumerAttributesExtractors =
Arrays.asList(rpcAttributesExtractor, httpAttributesExtractor);
private static final List<AttributesExtractor<ExecutionAttributes, SdkHttpResponse>>
private static final List<AttributesExtractor<ExecutionAttributes, Response>>
extendedConsumerAttributesExtractors =
Arrays.asList(
rpcAttributesExtractor, httpAttributesExtractor, experimentalAttributesExtractor);
static Instrumenter<ExecutionAttributes, SdkHttpResponse> requestInstrumenter(
static Instrumenter<ExecutionAttributes, Response> requestInstrumenter(
OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes) {
return createInstrumenter(
@ -63,27 +65,60 @@ final class AwsSdkInstrumenterFactory {
captureExperimentalSpanAttributes
? extendedAttributesExtractors
: defaultAttributesExtractors,
AwsSdkInstrumenterFactory.spanKindExtractor);
AwsSdkInstrumenterFactory::spanName,
SpanKindExtractor.alwaysClient());
}
static Instrumenter<ExecutionAttributes, SdkHttpResponse> consumerInstrumenter(
static Instrumenter<ExecutionAttributes, Response> consumerInstrumenter(
OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes) {
return sqsInstrumenter(
openTelemetry,
MessageOperation.RECEIVE,
captureExperimentalSpanAttributes
? extendedConsumerAttributesExtractors
: defaultConsumerAttributesExtractors);
}
static Instrumenter<ExecutionAttributes, Response> producerInstrumenter(
OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes) {
return sqsInstrumenter(
openTelemetry,
MessageOperation.PUBLISH,
captureExperimentalSpanAttributes
? extendedAttributesExtractors
: defaultAttributesExtractors);
}
private static Instrumenter<ExecutionAttributes, Response> sqsInstrumenter(
OpenTelemetry openTelemetry,
MessageOperation operation,
List<AttributesExtractor<ExecutionAttributes, Response>> extractors) {
SqsAttributesGetter getter = SqsAttributesGetter.INSTANCE;
AttributesExtractor<ExecutionAttributes, Response> messagingAttributeExtractor =
MessagingAttributesExtractor.builder(getter, operation).build();
List<AttributesExtractor<ExecutionAttributes, Response>> newExtractors =
new ArrayList<>(extractors);
newExtractors.add(messagingAttributeExtractor);
return createInstrumenter(
openTelemetry,
captureExperimentalSpanAttributes
? extendedConsumerAttributesExtractors
: defaultConsumerAttributesExtractors,
SpanKindExtractor.alwaysConsumer());
newExtractors,
MessagingSpanNameExtractor.create(getter, operation),
operation == MessageOperation.PUBLISH
? SpanKindExtractor.alwaysProducer()
: SpanKindExtractor.alwaysConsumer());
}
private static Instrumenter<ExecutionAttributes, SdkHttpResponse> createInstrumenter(
private static Instrumenter<ExecutionAttributes, Response> createInstrumenter(
OpenTelemetry openTelemetry,
List<AttributesExtractor<ExecutionAttributes, SdkHttpResponse>> extractors,
List<AttributesExtractor<ExecutionAttributes, Response>> extractors,
SpanNameExtractor<ExecutionAttributes> spanNameExtractor,
SpanKindExtractor<ExecutionAttributes> spanKindExtractor) {
return Instrumenter.<ExecutionAttributes, SdkHttpResponse>builder(
openTelemetry, INSTRUMENTATION_NAME, AwsSdkInstrumenterFactory::spanName)
return Instrumenter.<ExecutionAttributes, Response>builder(
openTelemetry,
INSTRUMENTATION_NAME,
spanNameExtractor) // AwsSdkInstrumenterFactory::spanName
.addAttributesExtractors(extractors)
.buildInstrumenter(spanKindExtractor);
}

View File

@ -1,27 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v2_2;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import software.amazon.awssdk.core.SdkRequest;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
class AwsSdkSpanKindExtractor implements SpanKindExtractor<ExecutionAttributes> {
@Override
public SpanKind extract(ExecutionAttributes request) {
return isSqsProducer(request) ? SpanKind.PRODUCER : SpanKind.CLIENT;
}
private static boolean isSqsProducer(ExecutionAttributes executionAttributes) {
SdkRequest request =
executionAttributes.getAttribute(TracingExecutionInterceptor.SDK_REQUEST_ATTRIBUTE);
return request
.getClass()
.getName()
.equals("software.amazon.awssdk.services.sqs.model.SendMessageRequest");
}
}

View File

@ -12,7 +12,6 @@ import javax.annotation.Nullable;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
import software.amazon.awssdk.core.interceptor.ExecutionInterceptor;
import software.amazon.awssdk.http.SdkHttpResponse;
/**
* Entrypoint to OpenTelemetry instrumentation of the AWS SDK. Register the {@link
@ -41,8 +40,9 @@ public class AwsSdkTelemetry {
return new AwsSdkTelemetryBuilder(openTelemetry);
}
private final Instrumenter<ExecutionAttributes, SdkHttpResponse> requestInstrumenter;
private final Instrumenter<ExecutionAttributes, SdkHttpResponse> consumerInstrumenter;
private final Instrumenter<ExecutionAttributes, Response> requestInstrumenter;
private final Instrumenter<ExecutionAttributes, Response> consumerInstrumenter;
private final Instrumenter<ExecutionAttributes, Response> producerInstrumenter;
private final boolean captureExperimentalSpanAttributes;
@Nullable private final TextMapPropagator messagingPropagator;
private final boolean useXrayPropagator;
@ -61,6 +61,9 @@ public class AwsSdkTelemetry {
this.consumerInstrumenter =
AwsSdkInstrumenterFactory.consumerInstrumenter(
openTelemetry, captureExperimentalSpanAttributes);
this.producerInstrumenter =
AwsSdkInstrumenterFactory.producerInstrumenter(
openTelemetry, captureExperimentalSpanAttributes);
this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
this.messagingPropagator =
useMessagingPropagator ? openTelemetry.getPropagators().getTextMapPropagator() : null;
@ -75,6 +78,7 @@ public class AwsSdkTelemetry {
return new TracingExecutionInterceptor(
requestInstrumenter,
consumerInstrumenter,
producerInstrumenter,
captureExperimentalSpanAttributes,
messagingPropagator,
useXrayPropagator,

View File

@ -0,0 +1,31 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v2_2;
import software.amazon.awssdk.core.SdkResponse;
import software.amazon.awssdk.http.SdkHttpResponse;
final class Response {
private final SdkHttpResponse sdkHttpResponse;
private final SdkResponse sdkResponse;
Response(SdkHttpResponse sdkHttpResponse) {
this(sdkHttpResponse, null);
}
Response(SdkHttpResponse sdkHttpResponse, SdkResponse sdkResponse) {
this.sdkHttpResponse = sdkHttpResponse;
this.sdkResponse = sdkResponse;
}
public SdkHttpResponse getSdkHttpResponse() {
return sdkHttpResponse;
}
public SdkResponse getSdkResponse() {
return sdkResponse;
}
}

View File

@ -9,6 +9,7 @@ import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.javaagent.tooling.muzzle.NoMuzzle;
import javax.annotation.Nullable;
import software.amazon.awssdk.core.SdkRequest;
import software.amazon.awssdk.core.SdkResponse;
import software.amazon.awssdk.core.interceptor.Context;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
@ -42,4 +43,19 @@ final class SqsAccess {
? SqsImpl.modifyRequest(request, otelContext, useXrayPropagator, messagingPropagator)
: null;
}
@NoMuzzle
static boolean isSqsProducerRequest(SdkRequest request) {
return enabled && SqsImpl.isSqsProducerRequest(request);
}
@NoMuzzle
static String getQueueUrl(SdkRequest request) {
return enabled ? SqsImpl.getQueueUrl(request) : null;
}
@NoMuzzle
static String getMessageId(SdkResponse response) {
return enabled ? SqsImpl.getMessageId(response) : null;
}
}

View File

@ -0,0 +1,75 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v2_2;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
import software.amazon.awssdk.core.SdkRequest;
import software.amazon.awssdk.core.SdkResponse;
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
enum SqsAttributesGetter implements MessagingAttributesGetter<ExecutionAttributes, Response> {
INSTANCE;
@Override
public String getSystem(ExecutionAttributes request) {
return "AmazonSQS";
}
@Override
public String getDestination(ExecutionAttributes request) {
SdkRequest sdkRequest = request.getAttribute(TracingExecutionInterceptor.SDK_REQUEST_ATTRIBUTE);
String queueUrl = SqsAccess.getQueueUrl(sdkRequest);
if (queueUrl != null) {
int i = queueUrl.lastIndexOf('/');
if (i > 0) {
return queueUrl.substring(i + 1);
}
}
return null;
}
@Override
public boolean isTemporaryDestination(ExecutionAttributes request) {
return false;
}
@Override
@Nullable
public String getConversationId(ExecutionAttributes request) {
return null;
}
@Override
@Nullable
public Long getMessagePayloadSize(ExecutionAttributes request) {
return null;
}
@Override
@Nullable
public Long getMessagePayloadCompressedSize(ExecutionAttributes request) {
return null;
}
@Override
@Nullable
public String getMessageId(ExecutionAttributes request, @Nullable Response response) {
if (response != null && response.getSdkResponse() != null) {
SdkResponse sdkResponse = response.getSdkResponse();
return SqsAccess.getMessageId(sdkResponse);
}
return null;
}
@Override
public List<String> getMessageHeader(ExecutionAttributes request, String name) {
// TODO: not implemented
return Collections.emptyList();
}
}

View File

@ -25,6 +25,7 @@ import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageResponse;
// this class is only used from SqsAccess from method with @NoMuzzle annotation
final class SqsImpl {
@ -75,7 +76,7 @@ final class SqsImpl {
parentContext = SqsParentContext.ofSystemAttributes(message.attributesAsStrings());
}
Instrumenter<ExecutionAttributes, SdkHttpResponse> consumerInstrumenter =
Instrumenter<ExecutionAttributes, Response> consumerInstrumenter =
config.getConsumerInstrumenter();
if (consumerInstrumenter.shouldStart(parentContext, executionAttributes)) {
io.opentelemetry.context.Context context =
@ -85,7 +86,7 @@ final class SqsImpl {
// per-message?
// TODO: Should we really create root spans if we can't extract anything, or should we attach
// to the current context?
consumerInstrumenter.end(context, executionAttributes, httpResponse, null);
consumerInstrumenter.end(context, executionAttributes, new Response(httpResponse), null);
}
}
@ -200,4 +201,26 @@ final class SqsImpl {
}
return builder.build();
}
static boolean isSqsProducerRequest(SdkRequest request) {
return request instanceof SendMessageRequest || request instanceof SendMessageBatchRequest;
}
static String getQueueUrl(SdkRequest request) {
if (request instanceof SendMessageRequest) {
return ((SendMessageRequest) request).queueUrl();
} else if (request instanceof SendMessageBatchRequest) {
return ((SendMessageBatchRequest) request).queueUrl();
} else if (request instanceof ReceiveMessageRequest) {
return ((ReceiveMessageRequest) request).queueUrl();
}
return null;
}
static String getMessageId(SdkResponse response) {
if (response instanceof SendMessageResponse) {
return ((SendMessageResponse) response).messageId();
}
return null;
}
}

View File

@ -58,15 +58,16 @@ final class TracingExecutionInterceptor implements ExecutionInterceptor {
private static final ExecutionAttribute<RequestSpanFinisher> REQUEST_FINISHER_ATTRIBUTE =
new ExecutionAttribute<>(TracingExecutionInterceptor.class.getName() + ".RequestFinisher");
private final Instrumenter<ExecutionAttributes, SdkHttpResponse> requestInstrumenter;
private final Instrumenter<ExecutionAttributes, SdkHttpResponse> consumerInstrumenter;
private final Instrumenter<ExecutionAttributes, Response> requestInstrumenter;
private final Instrumenter<ExecutionAttributes, Response> consumerInstrumenter;
private final Instrumenter<ExecutionAttributes, Response> producerInstrumenter;
private final boolean captureExperimentalSpanAttributes;
static final AttributeKey<String> HTTP_ERROR_MSG =
AttributeKey.stringKey("aws.http.error_message");
static final String HTTP_FAILURE_EVENT = "HTTP request failure";
Instrumenter<ExecutionAttributes, SdkHttpResponse> getConsumerInstrumenter() {
Instrumenter<ExecutionAttributes, Response> getConsumerInstrumenter() {
return consumerInstrumenter;
}
@ -85,14 +86,16 @@ final class TracingExecutionInterceptor implements ExecutionInterceptor {
private final FieldMapper fieldMapper;
TracingExecutionInterceptor(
Instrumenter<ExecutionAttributes, SdkHttpResponse> requestInstrumenter,
Instrumenter<ExecutionAttributes, SdkHttpResponse> consumerInstrumenter,
Instrumenter<ExecutionAttributes, Response> requestInstrumenter,
Instrumenter<ExecutionAttributes, Response> consumerInstrumenter,
Instrumenter<ExecutionAttributes, Response> producerInstrumenter,
boolean captureExperimentalSpanAttributes,
TextMapPropagator messagingPropagator,
boolean useXrayPropagator,
boolean recordIndividualHttpError) {
this.requestInstrumenter = requestInstrumenter;
this.consumerInstrumenter = consumerInstrumenter;
this.producerInstrumenter = producerInstrumenter;
this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
this.messagingPropagator = messagingPropagator;
this.useXrayPropagator = useXrayPropagator;
@ -118,8 +121,9 @@ final class TracingExecutionInterceptor implements ExecutionInterceptor {
}
executionAttributes.putAttribute(SDK_REQUEST_ATTRIBUTE, request);
Instrumenter<ExecutionAttributes, Response> instrumenter = getInstrumenter(request);
if (!requestInstrumenter.shouldStart(parentOtelContext, executionAttributes)) {
if (!instrumenter.shouldStart(parentOtelContext, executionAttributes)) {
// NB: We also skip injection in case we don't start.
return request;
}
@ -135,16 +139,16 @@ final class TracingExecutionInterceptor implements ExecutionInterceptor {
&& "software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest"
.equals(request.getClass().getName())) {
otelContext =
InstrumenterUtil.suppressSpan(
requestInstrumenter, parentOtelContext, executionAttributes);
InstrumenterUtil.suppressSpan(instrumenter, parentOtelContext, executionAttributes);
requestFinisher =
(otelContext12, executionAttributes12, response, exception) -> {
(finisherOtelContext, finisherExecutionAttributes, response, exception) -> {
// generate request span when there was an error
if (exception != null) {
if (exception != null
&& instrumenter.shouldStart(finisherOtelContext, finisherExecutionAttributes)) {
InstrumenterUtil.startAndEnd(
requestInstrumenter,
parentOtelContext,
executionAttributes12,
instrumenter,
finisherOtelContext,
finisherExecutionAttributes,
response,
exception,
requestStart,
@ -152,8 +156,8 @@ final class TracingExecutionInterceptor implements ExecutionInterceptor {
}
};
} else {
otelContext = requestInstrumenter.start(parentOtelContext, executionAttributes);
requestFinisher = requestInstrumenter::end;
otelContext = instrumenter.start(parentOtelContext, executionAttributes);
requestFinisher = instrumenter::end;
}
executionAttributes.putAttribute(CONTEXT_ATTRIBUTE, otelContext);
@ -237,7 +241,7 @@ final class TracingExecutionInterceptor implements ExecutionInterceptor {
// For the httpAttributesExtractor dance, see afterMarshalling
AttributesBuilder builder = Attributes.builder(); // NB: UnsafeAttributes are package-private
AwsSdkInstrumenterFactory.httpAttributesExtractor.onEnd(
builder, otelContext, executionAttributes, httpResponse, null);
builder, otelContext, executionAttributes, new Response(httpResponse), null);
span.setAllAttributes(builder.build());
}
@ -326,7 +330,8 @@ final class TracingExecutionInterceptor implements ExecutionInterceptor {
onHttpResponseAvailable(
executionAttributes, otelContext, Span.fromContext(otelContext), httpResponse);
RequestSpanFinisher finisher = executionAttributes.getAttribute(REQUEST_FINISHER_ATTRIBUTE);
finisher.finish(otelContext, executionAttributes, httpResponse, null);
finisher.finish(
otelContext, executionAttributes, new Response(httpResponse, context.response()), null);
}
clearAttributes(executionAttributes);
}
@ -414,11 +419,15 @@ final class TracingExecutionInterceptor implements ExecutionInterceptor {
return attributes.getAttribute(CONTEXT_ATTRIBUTE);
}
private Instrumenter<ExecutionAttributes, Response> getInstrumenter(SdkRequest request) {
return SqsAccess.isSqsProducerRequest(request) ? producerInstrumenter : requestInstrumenter;
}
private interface RequestSpanFinisher {
void finish(
io.opentelemetry.context.Context otelContext,
ExecutionAttributes executionAttributes,
SdkHttpResponse response,
Response response,
Throwable exception);
}
}

View File

@ -44,6 +44,8 @@ import static io.opentelemetry.api.trace.StatusCode.ERROR
@Unroll
abstract class AbstractAws2ClientTest extends AbstractAws2ClientCoreTest {
static final String QUEUE_URL = "http://xxx/somequeue"
void assumeSupportedConfig(service, operation) {
Assumptions.assumeFalse(
service == "Sqs"
@ -81,7 +83,7 @@ abstract class AbstractAws2ClientTest extends AbstractAws2ClientCoreTest {
assertTraces(1) {
trace(0, 1) {
span(0) {
name "$service.$operation"
name operation != "SendMessage" ? "$service.$operation" : "somequeue publish"
kind operation != "SendMessage" ? CLIENT : PRODUCER
hasNoParent()
attributes {
@ -112,7 +114,10 @@ abstract class AbstractAws2ClientTest extends AbstractAws2ClientCoreTest {
} else if (service == "Sqs" && operation == "CreateQueue") {
"aws.queue.name" "somequeue"
} else if (service == "Sqs" && operation == "SendMessage") {
"aws.queue.url" "someurl"
"aws.queue.url" QUEUE_URL
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "somequeue"
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
} else if (service == "Kinesis") {
"aws.stream.name" "somestream"
}
@ -135,7 +140,7 @@ abstract class AbstractAws2ClientTest extends AbstractAws2ClientCoreTest {
<ResponseMetadata><RequestId>7a62c49f-347e-4fc4-9331-6e8e7a96aa73</RequestId></ResponseMetadata>
</CreateQueueResponse>
"""
"Sqs" | "SendMessage" | "POST" | "27daac76-34dd-47df-bd01-1f6e873584a0" | SqsClient.builder() | { c -> c.sendMessage(SendMessageRequest.builder().queueUrl("someurl").messageBody("").build()) } | """
"Sqs" | "SendMessage" | "POST" | "27daac76-34dd-47df-bd01-1f6e873584a0" | SqsClient.builder() | { c -> c.sendMessage(SendMessageRequest.builder().queueUrl(QUEUE_URL).messageBody("").build()) } | """
<SendMessageResponse>
<SendMessageResult>
<MD5OfMessageBody>d41d8cd98f00b204e9800998ecf8427e</MD5OfMessageBody>
@ -181,7 +186,7 @@ abstract class AbstractAws2ClientTest extends AbstractAws2ClientCoreTest {
assertTraces(1) {
trace(0, 1) {
span(0) {
name "$service.$operation"
name operation != "SendMessage" ? "$service.$operation" : "somequeue publish"
kind operation != "SendMessage" ? CLIENT : PRODUCER
hasNoParent()
attributes {
@ -212,7 +217,10 @@ abstract class AbstractAws2ClientTest extends AbstractAws2ClientCoreTest {
} else if (service == "Sqs" && operation == "CreateQueue") {
"aws.queue.name" "somequeue"
} else if (service == "Sqs" && operation == "SendMessage") {
"aws.queue.url" "someurl"
"aws.queue.url" QUEUE_URL
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "somequeue"
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
} else if (service == "Kinesis") {
"aws.stream.name" "somestream"
}
@ -247,7 +255,7 @@ abstract class AbstractAws2ClientTest extends AbstractAws2ClientCoreTest {
<ResponseMetadata><RequestId>7a62c49f-347e-4fc4-9331-6e8e7a96aa73</RequestId></ResponseMetadata>
</CreateQueueResponse>
"""
"Sqs" | "SendMessage" | "POST" | "27daac76-34dd-47df-bd01-1f6e873584a0" | SqsAsyncClient.builder() | { c -> c.sendMessage(SendMessageRequest.builder().queueUrl("someurl").messageBody("").build()) } | """
"Sqs" | "SendMessage" | "POST" | "27daac76-34dd-47df-bd01-1f6e873584a0" | SqsAsyncClient.builder() | { c -> c.sendMessage(SendMessageRequest.builder().queueUrl(QUEUE_URL).messageBody("").build()) } | """
<SendMessageResponse>
<SendMessageResult>
<MD5OfMessageBody>d41d8cd98f00b204e9800998ecf8427e</MD5OfMessageBody>

View File

@ -138,7 +138,7 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification {
}
trace(1, 2) {
span(0) {
name "Sqs.SendMessage"
name "testSdkSqs publish"
kind PRODUCER
hasNoParent()
attributes {
@ -154,12 +154,15 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification {
"$SemanticAttributes.USER_AGENT_ORIGINAL" String
"net.peer.name" "localhost"
"net.peer.port" sqsPort
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
"$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long }
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long }
}
}
span(1) {
name "Sqs.ReceiveMessage"
name "testSdkSqs receive"
kind CONSUMER
childOf span(0)
hasNoLinks() // TODO: Link to receive operation?
@ -174,6 +177,9 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification {
"$SemanticAttributes.USER_AGENT_ORIGINAL" String
"net.peer.name" "localhost"
"net.peer.port" sqsPort
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
"$SemanticAttributes.MESSAGING_OPERATION" "receive"
"$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long }
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long }
}
@ -302,8 +308,8 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification {
}
trace(1, xrayInjectionEnabled ? 4 : 3) {
span(0) {
name "Sqs.SendMessageBatch"
kind CLIENT // TODO: Probably this should be producer, but that would be a breaking change
name "testSdkSqs publish"
kind PRODUCER
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
@ -318,13 +324,15 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification {
"$SemanticAttributes.USER_AGENT_ORIGINAL" String
"net.peer.name" "localhost"
"net.peer.port" sqsPort
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
"$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long }
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long }
}
}
for (int i: 1..(xrayInjectionEnabled ? 3 : 2)) {
span(i) {
name "Sqs.ReceiveMessage"
name "testSdkSqs receive"
kind CONSUMER
childOf span(0)
hasNoLinks() // TODO: Link to receive operation?
@ -340,6 +348,9 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification {
"$SemanticAttributes.USER_AGENT_ORIGINAL" String
"net.peer.name" "localhost"
"net.peer.port" sqsPort
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
"$SemanticAttributes.MESSAGING_OPERATION" "receive"
"$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long }
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long }
}
@ -349,7 +360,7 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification {
if (!xrayInjectionEnabled) {
trace(2, 1) {
span(0) {
name "Sqs.ReceiveMessage"
name "testSdkSqs receive"
kind CONSUMER
// TODO This is not nice at all, and can also happen if producer is not instrumented
@ -367,6 +378,9 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification {
"net.peer.name" "localhost"
"$SemanticAttributes.USER_AGENT_ORIGINAL" String
"net.peer.port" sqsPort
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
"$SemanticAttributes.MESSAGING_OPERATION" "receive"
"$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long }
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long }
}

View File

@ -13,8 +13,12 @@ import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satis
import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.sdk.testing.assertj.AttributeAssertion;
import io.opentelemetry.sdk.testing.assertj.SpanDataAssert;
import io.opentelemetry.semconv.SemanticAttributes;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
class AwsSpanAssertions {
@ -37,9 +41,20 @@ class AwsSpanAssertions {
static SpanDataAssert sqs(
SpanDataAssert span, String spanName, String queueUrl, String queueName, SpanKind spanKind) {
return span.hasName(spanName)
.hasKind(spanKind)
.hasAttributesSatisfyingExactly(
String rpcMethod;
if (spanName.startsWith("SQS.")) {
rpcMethod = spanName.substring(4);
} else if (spanName.endsWith("receive")) {
rpcMethod = "ReceiveMessage";
} else if (spanName.endsWith("publish")) {
rpcMethod = "SendMessage";
} else {
throw new IllegalStateException("can't get rpc method from span name " + spanName);
}
List<AttributeAssertion> attributeAssertions = new ArrayList<>();
attributeAssertions.addAll(
Arrays.asList(
equalTo(stringKey("aws.agent"), "java-aws-sdk"),
satisfies(stringKey("aws.endpoint"), val -> val.isInstanceOf(String.class)),
equalTo(SemanticAttributes.HTTP_STATUS_CODE, 200),
@ -83,10 +98,22 @@ class AwsSpanAssertions {
equalTo(SemanticAttributes.NET_PROTOCOL_NAME, "http"),
equalTo(SemanticAttributes.NET_PROTOCOL_VERSION, "1.1"),
equalTo(stringKey("rpc.system"), "aws-api"),
satisfies(
stringKey("rpc.method"),
stringAssert -> stringAssert.isEqualTo(spanName.substring(4))),
equalTo(stringKey("rpc.service"), "AmazonSQS"));
satisfies(stringKey("rpc.method"), stringAssert -> stringAssert.isEqualTo(rpcMethod)),
equalTo(stringKey("rpc.service"), "AmazonSQS")));
if (spanName.endsWith("receive") || spanName.endsWith("publish")) {
attributeAssertions.addAll(
Arrays.asList(
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, queueName),
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "AmazonSQS")));
if (spanName.endsWith("receive")) {
attributeAssertions.add(equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"));
}
}
return span.hasName(spanName)
.hasKind(spanKind)
.hasAttributesSatisfyingExactly(attributeAssertions);
}
@SuppressWarnings("deprecation") // until old http semconv are dropped in 2.0

View File

@ -56,11 +56,11 @@ class SqsCamelTest {
span -> CamelSpanAssertions.sqsProduce(span, queueName).hasParent(trace.getSpan(0)),
span ->
AwsSpanAssertions.sqs(
span, "SQS.SendMessage", queueUrl, null, SpanKind.PRODUCER)
span, "sqsCamelTest publish", queueUrl, queueName, SpanKind.PRODUCER)
.hasParent(trace.getSpan(1)),
span ->
AwsSpanAssertions.sqs(
span, "SQS.ReceiveMessage", queueUrl, null, SpanKind.CONSUMER)
span, "sqsCamelTest receive", queueUrl, queueName, SpanKind.CONSUMER)
.hasParent(trace.getSpan(2)),
span ->
CamelSpanAssertions.sqsConsume(span, queueName).hasParent(trace.getSpan(2))),
@ -91,11 +91,11 @@ class SqsCamelTest {
trace.hasSpansSatisfyingExactly(
span ->
AwsSpanAssertions.sqs(
span, "SQS.SendMessage", queueUrl, null, SpanKind.PRODUCER)
span, "sqsCamelTest publish", queueUrl, queueName, SpanKind.PRODUCER)
.hasNoParent(),
span ->
AwsSpanAssertions.sqs(
span, "SQS.ReceiveMessage", queueUrl, null, SpanKind.CONSUMER)
span, "sqsCamelTest receive", queueUrl, queueName, SpanKind.CONSUMER)
.hasParent(trace.getSpan(0)),
span ->
CamelSpanAssertions.sqsConsume(span, queueName).hasParent(trace.getSpan(0))),
@ -128,11 +128,19 @@ class SqsCamelTest {
span -> CamelSpanAssertions.sqsProduce(span, queueName).hasParent(trace.getSpan(0)),
span ->
AwsSpanAssertions.sqs(
span, "SQS.SendMessage", queueUrl, null, SpanKind.PRODUCER)
span,
"sqsCamelTestSdkConsumer publish",
queueUrl,
queueName,
SpanKind.PRODUCER)
.hasParent(trace.getSpan(1)),
span ->
AwsSpanAssertions.sqs(
span, "SQS.ReceiveMessage", queueUrl, null, SpanKind.CONSUMER)
span,
"sqsCamelTestSdkConsumer receive",
queueUrl,
queueName,
SpanKind.CONSUMER)
.hasParent(trace.getSpan(2))));
camelApp.stop();
}