Add process spans to aws-1 sqs instrumentation (#9796)

This commit is contained in:
Lauri Tulmin 2023-11-07 18:25:50 +02:00 committed by GitHub
parent 33275b1522
commit 23a6a3e2c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 1099 additions and 123 deletions

View File

@ -96,6 +96,22 @@ testing {
implementation("com.amazonaws:aws-java-sdk-sqs:1.11.106")
}
targets {
all {
testTask.configure {
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
}
}
}
}
val testSqsNoReceiveTelemetry by registering(JvmTestSuite::class) {
dependencies {
implementation(project(":instrumentation:aws-sdk:aws-sdk-1.11:testing"))
implementation("com.amazonaws:aws-java-sdk-sqs:1.11.106")
}
}
}
}
@ -105,14 +121,19 @@ tasks {
check {
dependsOn(testing.suites)
}
} else {
check {
dependsOn(testing.suites.named("testSqs"), testing.suites.named("testSqsNoReceiveTelemetry"))
}
}
test {
systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean)
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
}
withType<Test>().configureEach {
// TODO run tests both with and without experimental span attributes
jvmArgs("-Dotel.instrumentation.aws-sdk.experimental-span-attributes=true")
systemProperty("testLatestDeps", findProperty("testLatestDeps") as Boolean)
}
}

View File

@ -14,6 +14,7 @@ import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.awssdk.v1_11.AwsSdkTelemetry;
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
import io.opentelemetry.javaagent.bootstrap.internal.InstrumentationConfig;
/**
@ -37,6 +38,8 @@ public class TracingRequestHandler extends RequestHandler2 {
.setCaptureExperimentalSpanAttributes(
InstrumentationConfig.get()
.getBoolean("otel.instrumentation.aws-sdk.experimental-span-attributes", false))
.setMessagingReceiveInstrumentationEnabled(
ExperimentalConfig.get().messagingReceiveInstrumentationEnabled())
.build()
.newRequestHandler();

View File

@ -36,7 +36,11 @@ class S3TracingTest extends AgentInstrumentationSpecification {
awsConnector.receiveMessage(queueUrl)
awsConnector.putSampleData(bucketName)
// traced message
awsConnector.receiveMessage(queueUrl)
def receiveMessageResult = awsConnector.receiveMessage(queueUrl)
receiveMessageResult.messages.each {message ->
runWithSpan("process child") {}
}
// cleanup
awsConnector.deleteBucket(bucketName)
awsConnector.purgeQueue(queueUrl)
@ -168,7 +172,7 @@ class S3TracingTest extends AgentInstrumentationSpecification {
}
}
}
trace(5, 2) {
trace(5, 3) {
span(0) {
name "S3.PutObject"
kind CLIENT
@ -192,7 +196,7 @@ class S3TracingTest extends AgentInstrumentationSpecification {
}
}
span(1) {
name "s3ToSqsTestQueue receive"
name "s3ToSqsTestQueue process"
kind CONSUMER
childOf span(0)
attributes {
@ -203,17 +207,18 @@ class S3TracingTest extends AgentInstrumentationSpecification {
"rpc.system" "aws-api"
"rpc.service" "AmazonSQS"
"http.method" "POST"
"http.status_code" 200
"http.url" String
"net.peer.name" String
"$SemanticAttributes.NET_PROTOCOL_NAME" "http"
"$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1"
"net.peer.port" { it == null || Number }
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "s3ToSqsTestQueue"
"$SemanticAttributes.MESSAGING_OPERATION" "receive"
"$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long }
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long }
"$SemanticAttributes.MESSAGING_OPERATION" "process"
}
}
span(2) {
name "process child"
childOf span(1)
attributes {
}
}
}
@ -336,7 +341,10 @@ class S3TracingTest extends AgentInstrumentationSpecification {
awsConnector.receiveMessage(queueUrl)
awsConnector.putSampleData(bucketName)
// traced message
awsConnector.receiveMessage(queueUrl)
def receiveMessageResult = awsConnector.receiveMessage(queueUrl)
receiveMessageResult.messages.each {message ->
runWithSpan("process child") {}
}
// cleanup
awsConnector.deleteBucket(bucketName)
awsConnector.purgeQueue(queueUrl)
@ -556,9 +564,9 @@ class S3TracingTest extends AgentInstrumentationSpecification {
}
}
}
trace(9, 1) {
trace(9, 2) {
span(0) {
name "s3ToSnsToSqsTestQueue receive"
name "s3ToSnsToSqsTestQueue process"
kind CONSUMER
hasNoParent()
attributes {
@ -569,19 +577,22 @@ class S3TracingTest extends AgentInstrumentationSpecification {
"rpc.system" "aws-api"
"rpc.service" "AmazonSQS"
"http.method" "POST"
"http.status_code" 200
"http.url" String
"net.peer.name" String
"$SemanticAttributes.NET_PROTOCOL_NAME" "http"
"$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1"
"net.peer.port" { it == null || Number }
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "s3ToSnsToSqsTestQueue"
"$SemanticAttributes.MESSAGING_OPERATION" "receive"
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long }
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long }
}
}
span(1) {
name "process child"
childOf span(0)
attributes {
}
}
}
trace(10, 1) {
span(0) {

View File

@ -32,7 +32,10 @@ class SnsTracingTest extends AgentInstrumentationSpecification {
when:
awsConnector.publishSampleNotification(topicArn)
awsConnector.receiveMessage(queueUrl)
def receiveMessageResult = awsConnector.receiveMessage(queueUrl)
receiveMessageResult.messages.each {message ->
runWithSpan("process child") {}
}
then:
assertTraces(6) {
@ -154,7 +157,7 @@ class SnsTracingTest extends AgentInstrumentationSpecification {
}
}
}
trace(5, 2) {
trace(5, 3) {
span(0) {
name "SNS.Publish"
kind CLIENT
@ -176,7 +179,7 @@ class SnsTracingTest extends AgentInstrumentationSpecification {
}
}
span(1) {
name "snsToSqsTestQueue receive"
name "snsToSqsTestQueue process"
kind CONSUMER
childOf span(0)
attributes {
@ -187,16 +190,18 @@ class SnsTracingTest extends AgentInstrumentationSpecification {
"rpc.service" "AmazonSQS"
"rpc.method" "ReceiveMessage"
"http.method" "POST"
"http.status_code" 200
"http.url" String
"net.peer.name" String
"$SemanticAttributes.NET_PROTOCOL_NAME" "http"
"$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1"
"net.peer.port" { it == null || Number }
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "snsToSqsTestQueue"
"$SemanticAttributes.MESSAGING_OPERATION" "receive"
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long
"$SemanticAttributes.MESSAGING_OPERATION" "process"
}
}
span(2) {
name "process child"
childOf span(1)
attributes {
}
}
}

View File

@ -0,0 +1,17 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.awssdk.v1_11
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder
import io.opentelemetry.instrumentation.awssdk.v1_11.AbstractSqsSuppressReceiveSpansTest
import io.opentelemetry.instrumentation.test.AgentTestTrait
class SqsSuppressReceiveSpansTest extends AbstractSqsSuppressReceiveSpansTest implements AgentTestTrait {
@Override
AmazonSQSAsyncClientBuilder configureClient(AmazonSQSAsyncClientBuilder client) {
return client
}
}

View File

@ -20,6 +20,26 @@ dependencies {
testLibrary("com.amazonaws:aws-java-sdk-sqs:1.11.106")
}
tasks.test {
systemProperty("otel.instrumentation.aws-sdk.experimental-span-attributes", "true")
tasks {
withType<Test>().configureEach {
systemProperty("otel.instrumentation.aws-sdk.experimental-span-attributes", "true")
}
val testReceiveSpansDisabled by registering(Test::class) {
filter {
includeTestsMatching("SqsSuppressReceiveSpansTest")
}
include("**/SqsSuppressReceiveSpansTest.*")
}
test {
filter {
excludeTestsMatching("SqsSuppressReceiveSpansTest")
}
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
}
check {
dependsOn(testReceiveSpansDisabled)
}
}

View File

@ -23,6 +23,9 @@ public class TracingRequestHandler extends RequestHandler2 {
.setCaptureExperimentalSpanAttributes(
ConfigPropertiesUtil.getBoolean(
"otel.instrumentation.aws-sdk.experimental-span-attributes", false))
.setMessagingReceiveInstrumentationEnabled(
ConfigPropertiesUtil.getBoolean(
"otel.instrumentation.messaging.experimental.receive-telemetry.enabled", false))
.build()
.newRequestHandler();

View File

@ -0,0 +1,17 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v1_11.instrumentor
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder
import io.opentelemetry.instrumentation.awssdk.v1_11.AbstractSqsSuppressReceiveSpansTest
import io.opentelemetry.instrumentation.test.LibraryTestTrait
class SqsSuppressReceiveSpansTest extends AbstractSqsSuppressReceiveSpansTest implements LibraryTestTrait {
@Override
AmazonSQSAsyncClientBuilder configureClient(AmazonSQSAsyncClientBuilder client) {
return client
}
}

View File

@ -11,8 +11,12 @@ import static java.util.Collections.singletonList;
import com.amazonaws.Request;
import com.amazonaws.Response;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpClientAttributesExtractor;
@ -20,8 +24,10 @@ import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperat
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.rpc.RpcClientAttributesExtractor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import javax.annotation.Nullable;
final class AwsSdkInstrumenterFactory {
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.aws-sdk-1.11";
@ -49,25 +55,91 @@ final class AwsSdkInstrumenterFactory {
captureExperimentalSpanAttributes,
spanName,
SpanKindExtractor.alwaysClient(),
emptyList());
emptyList(),
true);
}
static Instrumenter<Request<?>, Response<?>> consumerInstrumenter(
OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes) {
static Instrumenter<Request<?>, Response<?>> consumerReceiveInstrumenter(
OpenTelemetry openTelemetry,
boolean captureExperimentalSpanAttributes,
boolean messagingReceiveInstrumentationEnabled) {
return sqsInstrumenter(
openTelemetry, MessageOperation.RECEIVE, captureExperimentalSpanAttributes);
openTelemetry,
MessageOperation.RECEIVE,
captureExperimentalSpanAttributes,
messagingReceiveInstrumentationEnabled);
}
static Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter(
OpenTelemetry openTelemetry,
boolean captureExperimentalSpanAttributes,
boolean messagingReceiveInstrumentationEnabled) {
MessageOperation operation = MessageOperation.PROCESS;
SqsProcessRequestAttributesGetter getter = SqsProcessRequestAttributesGetter.INSTANCE;
InstrumenterBuilder<SqsProcessRequest, Void> builder =
Instrumenter.<SqsProcessRequest, Void>builder(
openTelemetry,
INSTRUMENTATION_NAME,
MessagingSpanNameExtractor.create(getter, operation))
.addAttributesExtractors(
toProcessRequestExtractors(
captureExperimentalSpanAttributes
? extendedAttributesExtractors
: defaultAttributesExtractors))
.addAttributesExtractor(
MessagingAttributesExtractor.builder(getter, operation).build());
if (messagingReceiveInstrumentationEnabled) {
builder.addSpanLinksExtractor(
(spanLinks, parentContext, request) -> {
Context extracted =
SqsParentContext.ofSystemAttributes(request.getMessage().getAttributes());
spanLinks.addLink(Span.fromContext(extracted).getSpanContext());
});
}
return builder.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
}
private static List<AttributesExtractor<SqsProcessRequest, Void>> toProcessRequestExtractors(
List<AttributesExtractor<Request<?>, Response<?>>> extractors) {
List<AttributesExtractor<SqsProcessRequest, Void>> result = new ArrayList<>();
for (AttributesExtractor<Request<?>, Response<?>> extractor : extractors) {
result.add(
new AttributesExtractor<SqsProcessRequest, Void>() {
@Override
public void onStart(
AttributesBuilder attributes,
Context parentContext,
SqsProcessRequest sqsProcessRequest) {
extractor.onStart(attributes, parentContext, sqsProcessRequest.getRequest());
}
@Override
public void onEnd(
AttributesBuilder attributes,
Context context,
SqsProcessRequest sqsProcessRequest,
@Nullable Void unused,
@Nullable Throwable error) {
extractor.onEnd(attributes, context, sqsProcessRequest.getRequest(), null, error);
}
});
}
return result;
}
static Instrumenter<Request<?>, Response<?>> producerInstrumenter(
OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes) {
return sqsInstrumenter(
openTelemetry, MessageOperation.PUBLISH, captureExperimentalSpanAttributes);
openTelemetry, MessageOperation.PUBLISH, captureExperimentalSpanAttributes, true);
}
private static Instrumenter<Request<?>, Response<?>> sqsInstrumenter(
OpenTelemetry openTelemetry,
MessageOperation operation,
boolean captureExperimentalSpanAttributes) {
boolean captureExperimentalSpanAttributes,
boolean enabled) {
SqsAttributesGetter getter = SqsAttributesGetter.INSTANCE;
AttributesExtractor<Request<?>, Response<?>> messagingAttributeExtractor =
MessagingAttributesExtractor.builder(getter, operation).build();
@ -79,7 +151,8 @@ final class AwsSdkInstrumenterFactory {
operation == MessageOperation.PUBLISH
? SpanKindExtractor.alwaysProducer()
: SpanKindExtractor.alwaysConsumer(),
singletonList(messagingAttributeExtractor));
singletonList(messagingAttributeExtractor),
enabled);
}
private static Instrumenter<Request<?>, Response<?>> createInstrumenter(
@ -87,7 +160,8 @@ final class AwsSdkInstrumenterFactory {
boolean captureExperimentalSpanAttributes,
SpanNameExtractor<Request<?>> spanNameExtractor,
SpanKindExtractor<Request<?>> spanKindExtractor,
List<AttributesExtractor<Request<?>, Response<?>>> additionalAttributeExtractors) {
List<AttributesExtractor<Request<?>, Response<?>>> additionalAttributeExtractors,
boolean enabled) {
return Instrumenter.<Request<?>, Response<?>>builder(
openTelemetry, INSTRUMENTATION_NAME, spanNameExtractor)
.addAttributesExtractors(
@ -95,6 +169,7 @@ final class AwsSdkInstrumenterFactory {
? extendedAttributesExtractors
: defaultAttributesExtractors)
.addAttributesExtractors(additionalAttributeExtractors)
.setEnabled(enabled)
.buildInstrumenter(spanKindExtractor);
}

View File

@ -45,16 +45,27 @@ public class AwsSdkTelemetry {
}
private final Instrumenter<Request<?>, Response<?>> requestInstrumenter;
private final Instrumenter<Request<?>, Response<?>> consumerInstrumenter;
private final Instrumenter<Request<?>, Response<?>> consumerReceiveInstrumenter;
private final Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter;
private final Instrumenter<Request<?>, Response<?>> producerInstrumenter;
AwsSdkTelemetry(OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes) {
AwsSdkTelemetry(
OpenTelemetry openTelemetry,
boolean captureExperimentalSpanAttributes,
boolean messagingReceiveInstrumentationEnabled) {
requestInstrumenter =
AwsSdkInstrumenterFactory.requestInstrumenter(
openTelemetry, captureExperimentalSpanAttributes);
consumerInstrumenter =
AwsSdkInstrumenterFactory.consumerInstrumenter(
openTelemetry, captureExperimentalSpanAttributes);
consumerReceiveInstrumenter =
AwsSdkInstrumenterFactory.consumerReceiveInstrumenter(
openTelemetry,
captureExperimentalSpanAttributes,
messagingReceiveInstrumentationEnabled);
consumerProcessInstrumenter =
AwsSdkInstrumenterFactory.consumerProcessInstrumenter(
openTelemetry,
captureExperimentalSpanAttributes,
messagingReceiveInstrumentationEnabled);
producerInstrumenter =
AwsSdkInstrumenterFactory.producerInstrumenter(
openTelemetry, captureExperimentalSpanAttributes);
@ -66,6 +77,9 @@ public class AwsSdkTelemetry {
*/
public RequestHandler2 newRequestHandler() {
return new TracingRequestHandler(
requestInstrumenter, consumerInstrumenter, producerInstrumenter);
requestInstrumenter,
consumerReceiveInstrumenter,
consumerProcessInstrumenter,
producerInstrumenter);
}
}

View File

@ -14,6 +14,7 @@ public class AwsSdkTelemetryBuilder {
private final OpenTelemetry openTelemetry;
private boolean captureExperimentalSpanAttributes;
private boolean messagingReceiveInstrumentationEnabled;
AwsSdkTelemetryBuilder(OpenTelemetry openTelemetry) {
this.openTelemetry = openTelemetry;
@ -31,10 +32,24 @@ public class AwsSdkTelemetryBuilder {
return this;
}
/**
* Set whether to capture the consumer message receive telemetry in messaging instrumentation.
*
* <p>Note that this will cause the consumer side to start a new trace, with only a span link
* connecting it to the producer trace.
*/
@CanIgnoreReturnValue
public AwsSdkTelemetryBuilder setMessagingReceiveInstrumentationEnabled(
boolean messagingReceiveInstrumentationEnabled) {
this.messagingReceiveInstrumentationEnabled = messagingReceiveInstrumentationEnabled;
return this;
}
/**
* Returns a new {@link AwsSdkTelemetry} with the settings of this {@link AwsSdkTelemetryBuilder}.
*/
public AwsSdkTelemetry build() {
return new AwsSdkTelemetry(openTelemetry, captureExperimentalSpanAttributes);
return new AwsSdkTelemetry(
openTelemetry, captureExperimentalSpanAttributes, messagingReceiveInstrumentationEnabled);
}
}

View File

@ -8,7 +8,8 @@ package io.opentelemetry.instrumentation.awssdk.v1_11;
import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.Request;
import com.amazonaws.Response;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.internal.Timer;
import io.opentelemetry.javaagent.tooling.muzzle.NoMuzzle;
import java.util.Collections;
import java.util.Map;
@ -22,8 +23,11 @@ final class SqsAccess {
static boolean afterResponse(
Request<?> request,
Response<?> response,
Instrumenter<Request<?>, Response<?>> consumerInstrumenter) {
return enabled && SqsImpl.afterResponse(request, response, consumerInstrumenter);
Timer timer,
Context parentContext,
TracingRequestHandler requestHandler) {
return enabled
&& SqsImpl.afterResponse(request, response, timer, parentContext, requestHandler);
}
@NoMuzzle

View File

@ -9,13 +9,15 @@ import com.amazonaws.AmazonWebServiceRequest;
import com.amazonaws.Request;
import com.amazonaws.Response;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil;
import io.opentelemetry.instrumentation.api.internal.Timer;
import java.lang.reflect.Field;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@ -33,33 +35,81 @@ final class SqsImpl {
static boolean afterResponse(
Request<?> request,
Response<?> response,
Instrumenter<Request<?>, Response<?>> consumerInstrumenter) {
Timer timer,
Context parentContext,
TracingRequestHandler requestHandler) {
if (response.getAwsResponse() instanceof ReceiveMessageResult) {
afterConsumerResponse(request, response, consumerInstrumenter);
afterConsumerResponse(request, response, timer, parentContext, requestHandler);
return true;
}
return false;
}
/** Create and close CONSUMER span for each message consumed. */
private static void afterConsumerResponse(
Request<?> request,
Response<?> response,
Instrumenter<Request<?>, Response<?>> consumerInstrumenter) {
Timer timer,
Context parentContext,
TracingRequestHandler requestHandler) {
ReceiveMessageResult receiveMessageResult = (ReceiveMessageResult) response.getAwsResponse();
for (Message message : receiveMessageResult.getMessages()) {
createConsumerSpan(message, request, response, consumerInstrumenter);
if (receiveMessageResult.getMessages().isEmpty()) {
return;
}
Instrumenter<Request<?>, Response<?>> consumerReceiveInstrumenter =
requestHandler.getConsumerReceiveInstrumenter();
Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter =
requestHandler.getConsumerProcessInstrumenter();
Context receiveContext = null;
if (timer != null && consumerReceiveInstrumenter.shouldStart(parentContext, request)) {
receiveContext =
InstrumenterUtil.startAndEnd(
consumerReceiveInstrumenter,
parentContext,
request,
response,
null,
timer.startTime(),
timer.now());
}
addTracing(receiveMessageResult, request, consumerProcessInstrumenter, receiveContext);
}
private static final Field messagesField = getMessagesField();
private static Field getMessagesField() {
try {
Field field = ReceiveMessageResult.class.getDeclaredField("messages");
field.setAccessible(true);
return field;
} catch (Exception e) {
return null;
}
}
private static void createConsumerSpan(
Message message,
private static void addTracing(
ReceiveMessageResult receiveMessageResult,
Request<?> request,
Response<?> response,
Instrumenter<Request<?>, Response<?>> consumerInstrumenter) {
Context parentContext = SqsParentContext.ofSystemAttributes(message.getAttributes());
Context context = consumerInstrumenter.start(parentContext, request);
consumerInstrumenter.end(context, request, response, null);
Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter,
Context receiveContext) {
if (messagesField == null) {
return;
}
// replace Messages list inside ReceiveMessageResult with a tracing list that creates process
// spans as the list is iterated
try {
messagesField.set(
receiveMessageResult,
TracingList.wrap(
receiveMessageResult.getMessages(),
consumerProcessInstrumenter,
request,
receiveContext));
} catch (IllegalAccessException ignored) {
// should not happen, we call setAccessible on the field
}
}
static boolean beforeMarshalling(AmazonWebServiceRequest rawRequest) {

View File

@ -0,0 +1,17 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v1_11;
import java.util.Map;
/**
* A wrapper interface for {@link com.amazonaws.services.sqs.model.Message}. Using this wrapper
* avoids muzzle failure when sqs classes are not present.
*/
interface SqsMessage {
Map<String, String> getAttributes();
}

View File

@ -0,0 +1,27 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v1_11;
import com.amazonaws.services.sqs.model.Message;
import java.util.Map;
final class SqsMessageImpl implements SqsMessage {
private final Message message;
private SqsMessageImpl(Message message) {
this.message = message;
}
static SqsMessage wrap(Message message) {
return new SqsMessageImpl(message);
}
@Override
public Map<String, String> getAttributes() {
return message.getAttributes();
}
}

View File

@ -0,0 +1,30 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v1_11;
import com.amazonaws.Request;
final class SqsProcessRequest {
private final Request<?> request;
private final SqsMessage message;
private SqsProcessRequest(Request<?> request, SqsMessage message) {
this.request = request;
this.message = message;
}
public static SqsProcessRequest create(Request<?> request, SqsMessage message) {
return new SqsProcessRequest(request, message);
}
public Request<?> getRequest() {
return request;
}
public SqsMessage getMessage() {
return message;
}
}

View File

@ -0,0 +1,64 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v1_11;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
enum SqsProcessRequestAttributesGetter
implements MessagingAttributesGetter<SqsProcessRequest, Void> {
INSTANCE;
@Override
public String getSystem(SqsProcessRequest request) {
return "AmazonSQS";
}
@Override
public String getDestination(SqsProcessRequest request) {
Object originalRequest = request.getRequest().getOriginalRequest();
String queueUrl = RequestAccess.getQueueUrl(originalRequest);
int i = queueUrl.lastIndexOf('/');
return i > 0 ? queueUrl.substring(i + 1) : null;
}
@Override
public boolean isTemporaryDestination(SqsProcessRequest request) {
return false;
}
@Override
@Nullable
public String getConversationId(SqsProcessRequest request) {
return null;
}
@Override
@Nullable
public Long getMessagePayloadSize(SqsProcessRequest request) {
return null;
}
@Override
@Nullable
public Long getMessagePayloadCompressedSize(SqsProcessRequest request) {
return null;
}
@Override
@Nullable
public String getMessageId(SqsProcessRequest request, @Nullable Void response) {
return null;
}
@Override
public List<String> getMessageHeader(SqsProcessRequest request, String name) {
String value = SqsAccess.getMessageAttributes(request.getRequest()).get(name);
return value != null ? Collections.singletonList(value) : Collections.emptyList();
}
}

View File

@ -0,0 +1,94 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v1_11;
import com.amazonaws.Request;
import com.amazonaws.services.sqs.model.Message;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import java.util.Iterator;
import javax.annotation.Nullable;
class TracingIterator implements Iterator<Message> {
private final Iterator<Message> delegateIterator;
private final Instrumenter<SqsProcessRequest, Void> instrumenter;
private final Request<?> request;
private final Context receiveContext;
/*
* Note: this may potentially create problems if this iterator is used from different threads. But
* at the moment we cannot do much about this.
*/
@Nullable private SqsProcessRequest currentRequest;
@Nullable private Context currentContext;
@Nullable private Scope currentScope;
private TracingIterator(
Iterator<Message> delegateIterator,
Instrumenter<SqsProcessRequest, Void> instrumenter,
Request<?> request,
Context receiveContext) {
this.delegateIterator = delegateIterator;
this.instrumenter = instrumenter;
this.request = request;
this.receiveContext = receiveContext;
}
public static Iterator<Message> wrap(
Iterator<Message> delegateIterator,
Instrumenter<SqsProcessRequest, Void> instrumenter,
Request<?> request,
Context receiveContext) {
return new TracingIterator(delegateIterator, instrumenter, request, receiveContext);
}
@Override
public boolean hasNext() {
closeScopeAndEndSpan();
return delegateIterator.hasNext();
}
@Override
public Message next() {
// in case they didn't call hasNext()...
closeScopeAndEndSpan();
// it's important not to suppress consumer span creation here using Instrumenter.shouldStart()
// because this instrumentation can leak the context and so there may be a leaked consumer span
// in the context, in which case it's important to overwrite the leaked span instead of
// suppressing the correct span
// (https://github.com/open-telemetry/opentelemetry-java-instrumentation/issues/1947)
Message next = delegateIterator.next();
if (next != null) {
Context parentContext = receiveContext;
if (parentContext == null) {
parentContext = SqsParentContext.ofSystemAttributes(next.getAttributes());
}
currentRequest = SqsProcessRequest.create(request, SqsMessageImpl.wrap(next));
currentContext = instrumenter.start(parentContext, currentRequest);
currentScope = currentContext.makeCurrent();
}
return next;
}
private void closeScopeAndEndSpan() {
if (currentScope != null) {
currentScope.close();
instrumenter.end(currentContext, currentRequest, null, null);
currentScope = null;
currentRequest = null;
currentContext = null;
}
}
@Override
public void remove() {
delegateIterator.remove();
}
}

View File

@ -0,0 +1,82 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v1_11;
import com.amazonaws.Request;
import com.amazonaws.internal.SdkInternalList;
import com.amazonaws.services.sqs.AmazonSQSClient;
import com.amazonaws.services.sqs.model.Message;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import java.util.Iterator;
import java.util.List;
class TracingList extends SdkInternalList<Message> {
private static final long serialVersionUID = 1L;
private final transient Instrumenter<SqsProcessRequest, Void> instrumenter;
private final transient Request<?> request;
private final transient Context receiveContext;
private boolean firstIterator = true;
private TracingList(
List<Message> list,
Instrumenter<SqsProcessRequest, Void> instrumenter,
Request<?> request,
Context receiveContext) {
super(list);
this.instrumenter = instrumenter;
this.request = request;
this.receiveContext = receiveContext;
}
public static SdkInternalList<Message> wrap(
List<Message> list,
Instrumenter<SqsProcessRequest, Void> instrumenter,
Request<?> request,
Context receiveContext) {
return new TracingList(list, instrumenter, request, receiveContext);
}
@Override
public Iterator<Message> iterator() {
Iterator<Message> it;
// We should only return one iterator with tracing.
// However, this is not thread-safe, but usually the first (hopefully only) traversal of
// List is performed in the same thread that called receiveMessage()
if (firstIterator && !inAwsClient()) {
it = TracingIterator.wrap(super.iterator(), instrumenter, request, receiveContext);
firstIterator = false;
} else {
it = super.iterator();
}
return it;
}
private static boolean inAwsClient() {
for (Class<?> caller : CallerClass.INSTANCE.getClassContext()) {
if (AmazonSQSClient.class == caller) {
return true;
}
}
return false;
}
private Object writeReplace() {
// serialize this object to SdkInternalList
return new SdkInternalList<>(this);
}
private static class CallerClass extends SecurityManager {
public static final CallerClass INSTANCE = new CallerClass();
@Override
public Class<?>[] getClassContext() {
return super.getClassContext();
}
}
}

View File

@ -11,12 +11,13 @@ import com.amazonaws.Response;
import com.amazonaws.handlers.HandlerContextKey;
import com.amazonaws.handlers.RequestHandler2;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.ContextKey;
import io.opentelemetry.contrib.awsxray.propagator.AwsXrayPropagator;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.internal.InstrumenterUtil;
import java.time.Instant;
import io.opentelemetry.instrumentation.api.internal.Timer;
import javax.annotation.Nullable;
/** Tracing Request Handler. */
@ -24,21 +25,26 @@ final class TracingRequestHandler extends RequestHandler2 {
static final HandlerContextKey<Context> CONTEXT =
new HandlerContextKey<>(Context.class.getName());
private static final ContextKey<Instant> REQUEST_START_KEY =
ContextKey.named(TracingRequestHandler.class.getName() + ".RequestStart");
private static final ContextKey<Context> PARENT_CONTEXT_KEY =
ContextKey.named(TracingRequestHandler.class.getName() + ".ParentContext");
private static final ContextKey<Timer> REQUEST_TIMER_KEY =
ContextKey.named(TracingRequestHandler.class.getName() + ".Timer");
private static final ContextKey<Boolean> REQUEST_SPAN_SUPPRESSED_KEY =
ContextKey.named(TracingRequestHandler.class.getName() + ".RequestSpanSuppressed");
private final Instrumenter<Request<?>, Response<?>> requestInstrumenter;
private final Instrumenter<Request<?>, Response<?>> consumerInstrumenter;
private final Instrumenter<Request<?>, Response<?>> consumerReceiveInstrumenter;
private final Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter;
private final Instrumenter<Request<?>, Response<?>> producerInstrumenter;
TracingRequestHandler(
Instrumenter<Request<?>, Response<?>> requestInstrumenter,
Instrumenter<Request<?>, Response<?>> consumerInstrumenter,
Instrumenter<Request<?>, Response<?>> consumerReceiveInstrumenter,
Instrumenter<SqsProcessRequest, Void> consumerProcessInstrumenter,
Instrumenter<Request<?>, Response<?>> producerInstrumenter) {
this.requestInstrumenter = requestInstrumenter;
this.consumerInstrumenter = consumerInstrumenter;
this.consumerReceiveInstrumenter = consumerReceiveInstrumenter;
this.consumerProcessInstrumenter = consumerProcessInstrumenter;
this.producerInstrumenter = producerInstrumenter;
}
@ -64,17 +70,20 @@ final class TracingRequestHandler extends RequestHandler2 {
// also suppress the span from the underlying http client. Request/http client span appears in a
// separate trace from message producer/consumer spans if there is no parent span just having
// a trace with only the request/http client span isn't useful.
if (Context.root() == parentContext
if (Span.fromContextOrNull(parentContext) == null
&& "com.amazonaws.services.sqs.model.ReceiveMessageRequest"
.equals(request.getOriginalRequest().getClass().getName())) {
Context context = InstrumenterUtil.suppressSpan(instrumenter, parentContext, request);
context = context.with(REQUEST_START_KEY, Instant.now());
context = context.with(REQUEST_TIMER_KEY, Timer.start());
context = context.with(PARENT_CONTEXT_KEY, parentContext);
context = context.with(REQUEST_SPAN_SUPPRESSED_KEY, Boolean.TRUE);
request.addHandlerContext(CONTEXT, context);
return;
}
Context context = instrumenter.start(parentContext, request);
context = context.with(REQUEST_TIMER_KEY, Timer.start());
context = context.with(PARENT_CONTEXT_KEY, parentContext);
AwsXrayPropagator.getInstance().inject(context, request, HeaderSetter.INSTANCE);
@ -90,9 +99,26 @@ final class TracingRequestHandler extends RequestHandler2 {
return request;
}
Instrumenter<Request<?>, Response<?>> getConsumerReceiveInstrumenter() {
return consumerReceiveInstrumenter;
}
Instrumenter<SqsProcessRequest, Void> getConsumerProcessInstrumenter() {
return consumerProcessInstrumenter;
}
@Override
public void afterResponse(Request<?> request, Response<?> response) {
SqsAccess.afterResponse(request, response, consumerInstrumenter);
Context context = request.getHandlerContext(CONTEXT);
if (context == null) {
return;
}
Timer timer = context.get(REQUEST_TIMER_KEY);
// javaagent instrumentation activates scope for the request span, we need to use the context
// we stored before creating the request span to avoid making request span the parent of the
// sqs receive span
Context parentContext = context.get(PARENT_CONTEXT_KEY);
SqsAccess.afterResponse(request, response, timer, parentContext, this);
finish(request, response, null);
}
@ -111,15 +137,18 @@ final class TracingRequestHandler extends RequestHandler2 {
Instrumenter<Request<?>, Response<?>> instrumenter = getInstrumenter(request);
// see beforeRequest, requestStart is only set when we skip creating request span for sqs
// see beforeRequest, request suppressed is only set when we skip creating request span for sqs
// AmazonSQSClient.receiveMessage calls
Instant requestStart = context.get(REQUEST_START_KEY);
if (requestStart != null) {
if (Boolean.TRUE.equals(context.get(REQUEST_SPAN_SUPPRESSED_KEY))) {
Context parentContext = context.get(PARENT_CONTEXT_KEY);
Timer timer = context.get(REQUEST_TIMER_KEY);
// create request span if there was an error
if (error != null && requestInstrumenter.shouldStart(parentContext, request)) {
if (error != null
&& parentContext != null
&& timer != null
&& requestInstrumenter.shouldStart(parentContext, request)) {
InstrumenterUtil.startAndEnd(
instrumenter, parentContext, request, response, error, requestStart, Instant.now());
instrumenter, parentContext, request, response, error, timer.startTime(), timer.now());
}
return;
}

View File

@ -0,0 +1,20 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v1_11
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder
import io.opentelemetry.instrumentation.test.LibraryTestTrait
class SqsSuppressReceiveSpansTest extends AbstractSqsSuppressReceiveSpansTest implements LibraryTestTrait {
@Override
AmazonSQSAsyncClientBuilder configureClient(AmazonSQSAsyncClientBuilder client) {
return client.withRequestHandlers(
AwsSdkTelemetry.builder(getOpenTelemetry())
.setCaptureExperimentalSpanAttributes(true)
.build()
.newRequestHandler())
}
}

View File

@ -14,6 +14,7 @@ class SqsTracingTest extends AbstractSqsTracingTest implements LibraryTestTrait
return client.withRequestHandlers(
AwsSdkTelemetry.builder(getOpenTelemetry())
.setCaptureExperimentalSpanAttributes(true)
.setMessagingReceiveInstrumentationEnabled(true)
.build()
.newRequestHandler())
}

View File

@ -0,0 +1,283 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v1_11
import com.amazonaws.auth.AWSStaticCredentialsProvider
import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.client.builder.AwsClientBuilder
import com.amazonaws.services.sqs.AmazonSQSAsyncClient
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder
import com.amazonaws.services.sqs.model.ReceiveMessageRequest
import com.amazonaws.services.sqs.model.SendMessageRequest
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
import io.opentelemetry.instrumentation.test.utils.PortUtils
import io.opentelemetry.semconv.SemanticAttributes
import org.elasticmq.rest.sqs.SQSRestServerBuilder
import spock.lang.Shared
import static io.opentelemetry.api.trace.SpanKind.CLIENT
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
import static io.opentelemetry.api.trace.SpanKind.PRODUCER
abstract class AbstractSqsSuppressReceiveSpansTest extends InstrumentationSpecification {
abstract AmazonSQSAsyncClientBuilder configureClient(AmazonSQSAsyncClientBuilder client)
@Shared
def sqs
@Shared
AmazonSQSAsyncClient client
@Shared
int sqsPort
def setupSpec() {
sqsPort = PortUtils.findOpenPort()
sqs = SQSRestServerBuilder.withPort(sqsPort).withInterface("localhost").start()
println getClass().name + " SQS server started at: localhost:$sqsPort/"
def credentials = new AWSStaticCredentialsProvider(new BasicAWSCredentials("x", "x"))
def endpointConfiguration = new AwsClientBuilder.EndpointConfiguration("http://localhost:" + sqsPort, "elasticmq")
client = configureClient(AmazonSQSAsyncClient.asyncBuilder()).withCredentials(credentials).withEndpointConfiguration(endpointConfiguration).build()
}
def cleanupSpec() {
if (sqs != null) {
sqs.stopAndWait()
}
}
def "simple sqs producer-consumer services"() {
setup:
client.createQueue("testSdkSqs")
when:
SendMessageRequest send = new SendMessageRequest("http://localhost:$sqsPort/000000000000/testSdkSqs", "{\"type\": \"hello\"}")
client.sendMessage(send)
def receiveMessageResult = client.receiveMessage("http://localhost:$sqsPort/000000000000/testSdkSqs")
receiveMessageResult.messages.each {message -> runWithSpan("process child") {}}
then:
assertTraces(2) {
trace(0, 1) {
span(0) {
name "SQS.CreateQueue"
kind CLIENT
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" "http://localhost:$sqsPort"
"aws.queue.name" "testSdkSqs"
"rpc.system" "aws-api"
"rpc.service" "AmazonSQS"
"rpc.method" "CreateQueue"
"http.method" "POST"
"http.status_code" 200
"http.url" "http://localhost:$sqsPort"
"net.peer.name" "localhost"
"net.peer.port" sqsPort
"$SemanticAttributes.NET_PROTOCOL_NAME" "http"
"$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1"
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long
}
}
}
trace(1, 3) {
span(0) {
name "testSdkSqs publish"
kind PRODUCER
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" "http://localhost:$sqsPort"
"aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs"
"rpc.system" "aws-api"
"rpc.method" "SendMessage"
"rpc.service" "AmazonSQS"
"http.method" "POST"
"http.status_code" 200
"http.url" "http://localhost:$sqsPort"
"net.peer.name" "localhost"
"net.peer.port" sqsPort
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
"$SemanticAttributes.MESSAGING_OPERATION" "publish"
"$SemanticAttributes.NET_PROTOCOL_NAME" "http"
"$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1"
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long
}
}
span(1) {
name "testSdkSqs process"
kind CONSUMER
childOf span(0)
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" "http://localhost:$sqsPort"
"rpc.method" "ReceiveMessage"
"aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs"
"rpc.system" "aws-api"
"rpc.service" "AmazonSQS"
"http.method" "POST"
"http.url" "http://localhost:$sqsPort"
"net.peer.name" "localhost"
"net.peer.port" sqsPort
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
"$SemanticAttributes.MESSAGING_OPERATION" "process"
}
}
span(2) {
name "process child"
childOf span(1)
attributes {
}
}
}
}
}
def "simple sqs producer-consumer services with parent span"() {
setup:
client.createQueue("testSdkSqs")
when:
SendMessageRequest send = new SendMessageRequest("http://localhost:$sqsPort/000000000000/testSdkSqs", "{\"type\": \"hello\"}")
client.sendMessage(send)
runWithSpan("parent") {
def receiveMessageResult = client.receiveMessage("http://localhost:$sqsPort/000000000000/testSdkSqs")
receiveMessageResult.messages.each {message -> runWithSpan("process child") {}}
}
then:
assertTraces(3) {
trace(0, 1) {
span(0) {
name "SQS.CreateQueue"
kind CLIENT
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" "http://localhost:$sqsPort"
"aws.queue.name" "testSdkSqs"
"rpc.system" "aws-api"
"rpc.service" "AmazonSQS"
"rpc.method" "CreateQueue"
"http.method" "POST"
"http.status_code" 200
"http.url" "http://localhost:$sqsPort"
"net.peer.name" "localhost"
"net.peer.port" sqsPort
"$SemanticAttributes.NET_PROTOCOL_NAME" "http"
"$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1"
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long
}
}
}
trace(1, 3) {
span(0) {
name "testSdkSqs publish"
kind PRODUCER
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" "http://localhost:$sqsPort"
"aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs"
"rpc.system" "aws-api"
"rpc.method" "SendMessage"
"rpc.service" "AmazonSQS"
"http.method" "POST"
"http.status_code" 200
"http.url" "http://localhost:$sqsPort"
"net.peer.name" "localhost"
"net.peer.port" sqsPort
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
"$SemanticAttributes.MESSAGING_OPERATION" "publish"
"$SemanticAttributes.NET_PROTOCOL_NAME" "http"
"$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1"
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long
}
}
span(1) {
name "testSdkSqs process"
kind CONSUMER
childOf span(0)
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" "http://localhost:$sqsPort"
"rpc.method" "ReceiveMessage"
"aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs"
"rpc.system" "aws-api"
"rpc.service" "AmazonSQS"
"http.method" "POST"
"http.url" "http://localhost:$sqsPort"
"net.peer.name" "localhost"
"net.peer.port" sqsPort
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
"$SemanticAttributes.MESSAGING_OPERATION" "process"
}
}
span(2) {
name "process child"
childOf span(1)
attributes {
}
}
}
/**
* This span represents HTTP "sending of receive message" operation. It's always single, while there can be multiple CONSUMER spans (one per consumed message).
* This one could be suppressed (by IF in TracingRequestHandler#beforeRequest but then HTTP instrumentation span would appear
*/
trace(2, 2) {
span(0) {
name "parent"
hasNoParent()
}
span(1) {
name "SQS.ReceiveMessage"
kind CLIENT
childOf span(0)
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" "http://localhost:$sqsPort"
"rpc.method" "ReceiveMessage"
"aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs"
"rpc.system" "aws-api"
"rpc.service" "AmazonSQS"
"http.method" "POST"
"http.status_code" 200
"http.url" "http://localhost:$sqsPort"
"net.peer.name" "localhost"
"net.peer.port" sqsPort
"$SemanticAttributes.NET_PROTOCOL_NAME" "http"
"$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1"
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long
}
}
}
}
}
def "only adds attribute name once when request reused"() {
setup:
client.createQueue("testSdkSqs2")
when:
SendMessageRequest send = new SendMessageRequest("http://localhost:$sqsPort/000000000000/testSdkSqs2", "{\"type\": \"hello\"}")
client.sendMessage(send)
ReceiveMessageRequest receive = new ReceiveMessageRequest("http://localhost:$sqsPort/000000000000/testSdkSqs2")
client.receiveMessage(receive)
client.sendMessage(send)
client.receiveMessage(receive)
then:
receive.getAttributeNames() == ["AWSTraceHeader"]
}
}

View File

@ -14,6 +14,7 @@ import com.amazonaws.services.sqs.model.ReceiveMessageRequest
import com.amazonaws.services.sqs.model.SendMessageRequest
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
import io.opentelemetry.instrumentation.test.utils.PortUtils
import io.opentelemetry.sdk.trace.data.SpanData
import io.opentelemetry.semconv.SemanticAttributes
import org.elasticmq.rest.sqs.SQSRestServerBuilder
import spock.lang.Shared
@ -57,10 +58,14 @@ abstract class AbstractSqsTracingTest extends InstrumentationSpecification {
when:
SendMessageRequest send = new SendMessageRequest("http://localhost:$sqsPort/000000000000/testSdkSqs", "{\"type\": \"hello\"}")
client.sendMessage(send)
client.receiveMessage("http://localhost:$sqsPort/000000000000/testSdkSqs")
def receiveMessageResult = client.receiveMessage("http://localhost:$sqsPort/000000000000/testSdkSqs")
receiveMessageResult.messages.each {message ->
runWithSpan("process child") {}
}
then:
assertTraces(2) {
assertTraces(3) {
SpanData publishSpan
trace(0, 1) {
span(0) {
@ -85,7 +90,7 @@ abstract class AbstractSqsTracingTest extends InstrumentationSpecification {
}
}
}
trace(1, 2) {
trace(1, 1) {
span(0) {
name "testSdkSqs publish"
kind PRODUCER
@ -110,10 +115,13 @@ abstract class AbstractSqsTracingTest extends InstrumentationSpecification {
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long
}
}
span(1) {
publishSpan = span(0)
}
trace(2, 3) {
span(0) {
name "testSdkSqs receive"
kind CONSUMER
childOf span(0)
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" "http://localhost:$sqsPort"
@ -134,6 +142,33 @@ abstract class AbstractSqsTracingTest extends InstrumentationSpecification {
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long
}
}
span(1) {
name "testSdkSqs process"
kind CONSUMER
childOf span(0)
hasLink(publishSpan)
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" "http://localhost:$sqsPort"
"rpc.method" "ReceiveMessage"
"aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs"
"rpc.system" "aws-api"
"rpc.service" "AmazonSQS"
"http.method" "POST"
"http.url" "http://localhost:$sqsPort"
"net.peer.name" "localhost"
"net.peer.port" sqsPort
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
"$SemanticAttributes.MESSAGING_OPERATION" "process"
}
}
span(2) {
name "process child"
childOf span(1)
attributes {
}
}
}
}
}
@ -146,11 +181,13 @@ abstract class AbstractSqsTracingTest extends InstrumentationSpecification {
SendMessageRequest send = new SendMessageRequest("http://localhost:$sqsPort/000000000000/testSdkSqs", "{\"type\": \"hello\"}")
client.sendMessage(send)
runWithSpan("parent") {
client.receiveMessage("http://localhost:$sqsPort/000000000000/testSdkSqs")
def receiveMessageResult = client.receiveMessage("http://localhost:$sqsPort/000000000000/testSdkSqs")
receiveMessageResult.messages.each {message -> runWithSpan("process child") {}}
}
then:
assertTraces(3) {
SpanData publishSpan
trace(0, 1) {
span(0) {
@ -175,7 +212,7 @@ abstract class AbstractSqsTracingTest extends InstrumentationSpecification {
}
}
}
trace(1, 2) {
trace(1, 1) {
span(0) {
name "testSdkSqs publish"
kind PRODUCER
@ -200,36 +237,9 @@ abstract class AbstractSqsTracingTest extends InstrumentationSpecification {
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long
}
}
span(1) {
name "testSdkSqs receive"
kind CONSUMER
childOf span(0)
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" "http://localhost:$sqsPort"
"rpc.method" "ReceiveMessage"
"aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs"
"rpc.system" "aws-api"
"rpc.service" "AmazonSQS"
"http.method" "POST"
"http.status_code" 200
"http.url" "http://localhost:$sqsPort"
"net.peer.name" "localhost"
"net.peer.port" sqsPort
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
"$SemanticAttributes.MESSAGING_OPERATION" "receive"
"$SemanticAttributes.NET_PROTOCOL_NAME" "http"
"$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1"
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long
}
}
publishSpan = span(0)
}
/**
* This span represents HTTP "sending of receive message" operation. It's always single, while there can be multiple CONSUMER spans (one per consumed message).
* This one could be suppressed (by IF in TracingRequestHandler#beforeRequest but then HTTP instrumentation span would appear
*/
trace(2, 2) {
trace(2, 5) {
span(0) {
name "parent"
hasNoParent()
@ -255,6 +265,57 @@ abstract class AbstractSqsTracingTest extends InstrumentationSpecification {
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long
}
}
span(2) {
name "testSdkSqs receive"
kind CONSUMER
childOf span(0)
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" "http://localhost:$sqsPort"
"rpc.method" "ReceiveMessage"
"aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs"
"rpc.system" "aws-api"
"rpc.service" "AmazonSQS"
"http.method" "POST"
"http.status_code" 200
"http.url" "http://localhost:$sqsPort"
"net.peer.name" "localhost"
"net.peer.port" sqsPort
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
"$SemanticAttributes.MESSAGING_OPERATION" "receive"
"$SemanticAttributes.NET_PROTOCOL_NAME" "http"
"$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1"
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long
}
}
span(3) {
name "testSdkSqs process"
kind CONSUMER
childOf span(2)
hasLink(publishSpan)
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" "http://localhost:$sqsPort"
"rpc.method" "ReceiveMessage"
"aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs"
"rpc.system" "aws-api"
"rpc.service" "AmazonSQS"
"http.method" "POST"
"http.url" "http://localhost:$sqsPort"
"net.peer.name" "localhost"
"net.peer.port" sqsPort
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
"$SemanticAttributes.MESSAGING_OPERATION" "process"
}
}
span(4) {
name "process child"
childOf span(3)
attributes {
}
}
}
}
}

View File

@ -20,8 +20,10 @@ import com.amazonaws.services.sns.AmazonSNSAsyncClient;
import com.amazonaws.services.sns.model.CreateTopicResult;
import com.amazonaws.services.sqs.AmazonSQSAsyncClient;
import com.amazonaws.services.sqs.model.GetQueueAttributesRequest;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.PurgeQueueRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import io.opentelemetry.instrumentation.test.utils.PortUtils;
import java.util.Collections;
@ -146,7 +148,9 @@ class AwsConnector {
void receiveMessage(String queueUrl) {
logger.info("Receive message from queue {}", queueUrl);
sqsClient.receiveMessage(new ReceiveMessageRequest(queueUrl).withWaitTimeSeconds(20));
ReceiveMessageResult receiveMessageResult =
sqsClient.receiveMessage(new ReceiveMessageRequest(queueUrl).withWaitTimeSeconds(20));
for (Message ignored : receiveMessageResult.getMessages()) {}
}
void disconnect() {

View File

@ -44,7 +44,7 @@ class AwsSpanAssertions {
String rpcMethod;
if (spanName.startsWith("SQS.")) {
rpcMethod = spanName.substring(4);
} else if (spanName.endsWith("receive")) {
} else if (spanName.endsWith("process")) {
rpcMethod = "ReceiveMessage";
} else if (spanName.endsWith("publish")) {
rpcMethod = "SendMessage";
@ -57,7 +57,6 @@ class AwsSpanAssertions {
Arrays.asList(
equalTo(stringKey("aws.agent"), "java-aws-sdk"),
satisfies(stringKey("aws.endpoint"), val -> val.isInstanceOf(String.class)),
equalTo(SemanticAttributes.HTTP_STATUS_CODE, 200),
satisfies(
stringKey("aws.queue.name"),
val ->
@ -69,11 +68,6 @@ class AwsSpanAssertions {
val.satisfiesAnyOf(
v -> assertThat(v).isEqualTo(queueUrl), v -> assertThat(v).isNull())),
equalTo(SemanticAttributes.HTTP_METHOD, "POST"),
satisfies(
SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH,
val ->
val.satisfiesAnyOf(
v -> assertThat(v).isNull(), v -> assertThat(v).isInstanceOf(Long.class))),
satisfies(
SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH,
val ->
@ -89,19 +83,34 @@ class AwsSpanAssertions {
val.satisfiesAnyOf(
v -> assertThat(v).isNull(),
v -> assertThat(v).isInstanceOf(Number.class))),
equalTo(SemanticAttributes.NET_PROTOCOL_NAME, "http"),
equalTo(SemanticAttributes.NET_PROTOCOL_VERSION, "1.1"),
equalTo(stringKey("rpc.system"), "aws-api"),
satisfies(stringKey("rpc.method"), stringAssert -> stringAssert.isEqualTo(rpcMethod)),
equalTo(stringKey("rpc.service"), "AmazonSQS")));
if (spanName.endsWith("receive") || spanName.endsWith("publish")) {
if (!spanName.endsWith("process")) {
attributeAssertions.addAll(
Arrays.asList(
equalTo(SemanticAttributes.HTTP_STATUS_CODE, 200),
satisfies(
SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH,
val ->
val.satisfiesAnyOf(
v -> assertThat(v).isNull(),
v -> assertThat(v).isInstanceOf(Long.class))),
equalTo(SemanticAttributes.NET_PROTOCOL_NAME, "http"),
equalTo(SemanticAttributes.NET_PROTOCOL_VERSION, "1.1")));
}
if (spanName.endsWith("receive")
|| spanName.endsWith("process")
|| spanName.endsWith("publish")) {
attributeAssertions.addAll(
Arrays.asList(
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, queueName),
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "AmazonSQS")));
if (spanName.endsWith("receive")) {
attributeAssertions.add(equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"));
} else if (spanName.endsWith("process")) {
attributeAssertions.add(equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"));
} else if (spanName.endsWith("publish")) {
attributeAssertions.add(equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"));
}

View File

@ -60,7 +60,7 @@ class SqsCamelTest {
.hasParent(trace.getSpan(1)),
span ->
AwsSpanAssertions.sqs(
span, "sqsCamelTest receive", queueUrl, queueName, SpanKind.CONSUMER)
span, "sqsCamelTest process", queueUrl, queueName, SpanKind.CONSUMER)
.hasParent(trace.getSpan(2)),
span ->
CamelSpanAssertions.sqsConsume(span, queueName).hasParent(trace.getSpan(2))),
@ -95,7 +95,7 @@ class SqsCamelTest {
.hasNoParent(),
span ->
AwsSpanAssertions.sqs(
span, "sqsCamelTest receive", queueUrl, queueName, SpanKind.CONSUMER)
span, "sqsCamelTest process", queueUrl, queueName, SpanKind.CONSUMER)
.hasParent(trace.getSpan(0)),
span ->
CamelSpanAssertions.sqsConsume(span, queueName).hasParent(trace.getSpan(0))),
@ -137,7 +137,7 @@ class SqsCamelTest {
span ->
AwsSpanAssertions.sqs(
span,
"sqsCamelTestSdkConsumer receive",
"sqsCamelTestSdkConsumer process",
queueUrl,
queueName,
SpanKind.CONSUMER)