Convert aws sdk 2.2 Sqs Suppress Receive Spans Tests (#12895)

This commit is contained in:
Jay DeLuca 2024-12-18 14:06:24 -05:00 committed by GitHub
parent 47ada968e8
commit 4f697dc7fe
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
14 changed files with 654 additions and 796 deletions

View File

@ -1,27 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
import io.opentelemetry.instrumentation.awssdk.v2_2.AbstractAws2SqsSuppressReceiveSpansTest
import io.opentelemetry.instrumentation.test.AgentTestTrait
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.SqsClient
class Aws2SqsSuppressReceiveSpansTest extends AbstractAws2SqsSuppressReceiveSpansTest implements AgentTestTrait {
@Override
ClientOverrideConfiguration.Builder createOverrideConfigurationBuilder() {
return ClientOverrideConfiguration.builder()
}
@Override
SqsClient configureSqsClient(SqsClient sqsClient) {
return sqsClient
}
@Override
SqsAsyncClient configureSqsClient(SqsAsyncClient sqsClient) {
return sqsClient
}
}

View File

@ -0,0 +1,40 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.awssdk.v2_2;
import io.opentelemetry.instrumentation.awssdk.v2_2.AbstractAws2SqsSuppressReceiveSpansTest;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import org.junit.jupiter.api.extension.RegisterExtension;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.SqsClient;
class Aws2SqsSuppressReceiveSpansTest extends AbstractAws2SqsSuppressReceiveSpansTest {
@RegisterExtension
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
@Override
protected InstrumentationExtension getTesting() {
return testing;
}
@Override
protected SqsClient configureSqsClient(SqsClient sqsClient) {
return sqsClient;
}
@Override
protected SqsAsyncClient configureSqsClient(SqsAsyncClient sqsClient) {
return sqsClient;
}
@Override
protected ClientOverrideConfiguration.Builder createOverrideConfigurationBuilder() {
return ClientOverrideConfiguration.builder();
}
}

View File

@ -1,109 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v2_2
import io.opentelemetry.instrumentation.test.LibraryTestTrait
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.SqsClient
abstract class Aws2SqsSuppressReceiveSpansTest extends AbstractAws2SqsSuppressReceiveSpansTest implements LibraryTestTrait {
static AwsSdkTelemetry telemetry
def setupSpec() {
def telemetryBuilder = AwsSdkTelemetry.builder(getOpenTelemetry())
.setCaptureExperimentalSpanAttributes(true)
configure(telemetryBuilder)
telemetry = telemetryBuilder.build()
}
abstract void configure(AwsSdkTelemetryBuilder telemetryBuilder)
@Override
ClientOverrideConfiguration.Builder createOverrideConfigurationBuilder() {
return ClientOverrideConfiguration.builder()
.addExecutionInterceptor(
telemetry.newExecutionInterceptor())
}
@Override
SqsClient configureSqsClient(SqsClient sqsClient) {
return telemetry.wrap(sqsClient)
}
@Override
SqsAsyncClient configureSqsClient(SqsAsyncClient sqsClient) {
return telemetry.wrap(sqsClient)
}
}
class Aws2SqsSuppressReceiveSpansDefaultPropagatorTest extends Aws2SqsSuppressReceiveSpansTest {
@Override
void configure(AwsSdkTelemetryBuilder telemetryBuilder) {}
@Override
boolean isSqsAttributeInjectionEnabled() {
false
}
def "duplicate tracing interceptor"() {
setup:
def builder = SqsClient.builder()
configureSdkClient(builder)
def overrideConfiguration = ClientOverrideConfiguration.builder()
.addExecutionInterceptor(telemetry.newExecutionInterceptor())
.addExecutionInterceptor(telemetry.newExecutionInterceptor())
.build()
builder.overrideConfiguration(overrideConfiguration)
def client = configureSqsClient(builder.build())
client.createQueue(createQueueRequest)
when:
client.sendMessage(sendMessageRequest)
def resp = client.receiveMessage(receiveMessageRequest)
then:
resp.messages().size() == 1
resp.messages.each {message -> runWithSpan("process child") {}}
assertSqsTraces()
}
}
class Aws2SqsSuppressReceiveSpansW3CPropagatorTest extends Aws2SqsSuppressReceiveSpansTest {
@Override
void configure(AwsSdkTelemetryBuilder telemetryBuilder) {
telemetryBuilder.setUseConfiguredPropagatorForMessaging(isSqsAttributeInjectionEnabled()) // Difference to main test
.setUseXrayPropagator(isXrayInjectionEnabled()) // Disable to confirm messaging propagator actually works
}
@Override
boolean isSqsAttributeInjectionEnabled() {
true
}
@Override
boolean isXrayInjectionEnabled() {
false
}
}
/** We want to test the combination of W3C + Xray, as that's what you'll get in prod if you enable W3C. */
class Aws2SqsSuppressReceiveSpansW3CPropagatorAndXrayPropagatorTest extends Aws2SqsSuppressReceiveSpansTest {
@Override
void configure(AwsSdkTelemetryBuilder telemetryBuilder) {
telemetryBuilder.setUseConfiguredPropagatorForMessaging(isSqsAttributeInjectionEnabled()) // Difference to main test
}
@Override
boolean isSqsAttributeInjectionEnabled() {
true
}
}

View File

@ -20,7 +20,7 @@ class Aws2SqsDefaultPropagatorTest extends Aws2SqsTracingTest {
void configure(AwsSdkTelemetryBuilder telemetryBuilder) {}
@Override
boolean isSqsAttributeInjectionEnabled() {
protected boolean isSqsAttributeInjectionEnabled() {
return false;
}

View File

@ -0,0 +1,48 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v2_2;
import static org.assertj.core.api.Assertions.assertThat;
import java.net.URISyntaxException;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.SqsClientBuilder;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
class Aws2SqsSuppressReceiveSpansDefaultPropagatorTest extends Aws2SqsSuppressReceiveSpansTest {
@Override
protected void configure(AwsSdkTelemetryBuilder telemetryBuilder) {}
@Override
protected boolean isSqsAttributeInjectionEnabled() {
return false;
}
@Test
void testDuplicateTracingInterceptor() throws URISyntaxException {
SqsClientBuilder builder = SqsClient.builder();
configureSdkClient(builder);
ClientOverrideConfiguration overrideConfiguration =
ClientOverrideConfiguration.builder()
.addExecutionInterceptor(telemetry.newExecutionInterceptor())
.addExecutionInterceptor(telemetry.newExecutionInterceptor())
.build();
builder.overrideConfiguration(overrideConfiguration);
SqsClient client = configureSqsClient(builder.build());
client.createQueue(createQueueRequest);
client.sendMessage(sendMessageRequest);
ReceiveMessageResponse response = client.receiveMessage(receiveMessageRequest);
assertThat(response.messages().size()).isEqualTo(1);
response.messages().forEach(message -> getTesting().runWithSpan("process child", () -> {}));
assertSqsTraces(false, false);
}
}

View File

@ -0,0 +1,54 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v2_2;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.RegisterExtension;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.SqsClient;
public abstract class Aws2SqsSuppressReceiveSpansTest
extends AbstractAws2SqsSuppressReceiveSpansTest {
protected AwsSdkTelemetry telemetry;
@RegisterExtension
static final InstrumentationExtension testing = LibraryInstrumentationExtension.create();
@Override
protected InstrumentationExtension getTesting() {
return testing;
}
@Override
protected SqsClient configureSqsClient(SqsClient sqsClient) {
return telemetry.wrap(sqsClient);
}
@Override
protected SqsAsyncClient configureSqsClient(SqsAsyncClient sqsClient) {
return telemetry.wrap(sqsClient);
}
@Override
protected ClientOverrideConfiguration.Builder createOverrideConfigurationBuilder() {
return ClientOverrideConfiguration.builder()
.addExecutionInterceptor(telemetry.newExecutionInterceptor());
}
protected abstract void configure(AwsSdkTelemetryBuilder telemetryBuilder);
@BeforeEach
void setup() {
AwsSdkTelemetryBuilder telemetryBuilder =
AwsSdkTelemetry.builder(getTesting().getOpenTelemetry())
.setCaptureExperimentalSpanAttributes(true);
configure(telemetryBuilder);
telemetry = telemetryBuilder.build();
}
}

View File

@ -0,0 +1,25 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v2_2;
/**
* We want to test the combination of W3C + Xray, as that's what you'll get in prod if you enable
* W3C.
*/
class Aws2SqsSuppressReceiveSpansW3cPropagatorAndXrayPropagatorTest
extends Aws2SqsSuppressReceiveSpansTest {
@Override
protected void configure(AwsSdkTelemetryBuilder telemetryBuilder) {
telemetryBuilder.setUseConfiguredPropagatorForMessaging(
isSqsAttributeInjectionEnabled()); // Difference to main test
}
@Override
protected boolean isSqsAttributeInjectionEnabled() {
return true;
}
}

View File

@ -0,0 +1,27 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v2_2;
class Aws2SqsSuppressReceiveSpansW3cPropagatorTest extends Aws2SqsSuppressReceiveSpansTest {
@Override
protected void configure(AwsSdkTelemetryBuilder telemetryBuilder) {
telemetryBuilder
.setUseConfiguredPropagatorForMessaging(
isSqsAttributeInjectionEnabled()) // Difference to main test
.setUseXrayPropagator(
isXrayInjectionEnabled()); // Disable to confirm messaging propagator actually works
}
@Override
protected boolean isSqsAttributeInjectionEnabled() {
return true;
}
@Override
protected boolean isXrayInjectionEnabled() {
return false;
}
}

View File

@ -13,7 +13,7 @@ class Aws2SqsW3cPropagatorAndXrayPropagatorTest extends Aws2SqsTracingTest {
}
@Override
boolean isSqsAttributeInjectionEnabled() {
protected boolean isSqsAttributeInjectionEnabled() {
return true;
}
}

View File

@ -17,12 +17,12 @@ class Aws2SqsW3cPropagatorTest extends Aws2SqsTracingTest {
}
@Override
boolean isSqsAttributeInjectionEnabled() {
protected boolean isSqsAttributeInjectionEnabled() {
return true;
}
@Override
boolean isXrayInjectionEnabled() {
protected boolean isXrayInjectionEnabled() {
return false;
}
}

View File

@ -1,393 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v2_2
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes
import io.opentelemetry.semconv.incubating.AwsIncubatingAttributes
import io.opentelemetry.semconv.ServerAttributes
import io.opentelemetry.semconv.HttpAttributes
import io.opentelemetry.semconv.UrlAttributes
import org.elasticmq.rest.sqs.SQSRestServerBuilder
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration
import software.amazon.awssdk.regions.Region
import software.amazon.awssdk.services.sqs.SqsAsyncClient
import software.amazon.awssdk.services.sqs.SqsBaseClientBuilder
import software.amazon.awssdk.services.sqs.SqsClient
import software.amazon.awssdk.services.sqs.model.CreateQueueRequest
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest
import software.amazon.awssdk.services.sqs.model.SendMessageRequest
import spock.lang.Shared
import static io.opentelemetry.api.trace.SpanKind.CLIENT
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
import static io.opentelemetry.api.trace.SpanKind.PRODUCER
abstract class AbstractAws2SqsSuppressReceiveSpansTest extends InstrumentationSpecification {
private static final StaticCredentialsProvider CREDENTIALS_PROVIDER = StaticCredentialsProvider
.create(AwsBasicCredentials.create("my-access-key", "my-secret-key"))
@Shared
def sqs
@Shared
int sqsPort
static Map<String, MessageAttributeValue> dummyMessageAttributes(count) {
(0..<count).collectEntries {
[
"a$it".toString(),
MessageAttributeValue.builder().stringValue("v$it").dataType("String").build()]
}
}
String queueUrl = "http://localhost:$sqsPort/000000000000/testSdkSqs"
ReceiveMessageRequest receiveMessageRequest = ReceiveMessageRequest.builder()
.queueUrl(queueUrl)
.build()
ReceiveMessageRequest receiveMessageBatchRequest = ReceiveMessageRequest.builder()
.queueUrl(queueUrl)
.maxNumberOfMessages(3)
.messageAttributeNames("All")
.waitTimeSeconds(5)
.build()
CreateQueueRequest createQueueRequest = CreateQueueRequest.builder()
.queueName("testSdkSqs")
.build()
SendMessageRequest sendMessageRequest = SendMessageRequest.builder()
.queueUrl(queueUrl)
.messageBody("{\"type\": \"hello\"}")
.build()
SendMessageBatchRequest sendMessageBatchRequest = SendMessageBatchRequest.builder()
.queueUrl(queueUrl)
.entries(
e -> e.messageBody("e1").id("i1"),
// 8 attributes, injection always possible
e -> e.messageBody("e2").id("i2")
.messageAttributes(dummyMessageAttributes(8)),
// 10 attributes, injection with custom propagator never possible
e -> e.messageBody("e3").id("i3").messageAttributes(dummyMessageAttributes(10)))
.build()
boolean isSqsAttributeInjectionEnabled() {
AbstractAws2ClientCoreTest.isSqsAttributeInjectionEnabled()
}
boolean isXrayInjectionEnabled() {
true
}
void configureSdkClient(SqsBaseClientBuilder builder) {
builder
.overrideConfiguration(createOverrideConfigurationBuilder().build())
.endpointOverride(new URI("http://localhost:" + sqsPort))
builder
.region(Region.AP_NORTHEAST_1)
.credentialsProvider(CREDENTIALS_PROVIDER)
}
abstract SqsClient configureSqsClient(SqsClient sqsClient)
abstract SqsAsyncClient configureSqsClient(SqsAsyncClient sqsClient)
abstract ClientOverrideConfiguration.Builder createOverrideConfigurationBuilder()
def setupSpec() {
sqs = SQSRestServerBuilder.withPort(0).withInterface("localhost").start()
def server = sqs.waitUntilStarted()
sqsPort = server.localAddress().port
println getClass().name + " SQS server started at: localhost:$sqsPort/"
}
def cleanupSpec() {
if (sqs != null) {
sqs.stopAndWait()
}
}
void assertSqsTraces(withParent = false) {
assertTraces(2 + (withParent ? 1 : 0)) {
trace(0, 1) {
span(0) {
name "Sqs.CreateQueue"
kind CLIENT
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.queue.name" "testSdkSqs"
"$AwsIncubatingAttributes.AWS_REQUEST_ID" { it == "00000000-0000-0000-0000-000000000000" || it == "UNKNOWN" }
"rpc.system" "aws-api"
"rpc.service" "Sqs"
"rpc.method" "CreateQueue"
"$HttpAttributes.HTTP_REQUEST_METHOD" "POST"
"$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$UrlAttributes.URL_FULL" { it.startsWith("http://localhost:$sqsPort") }
"$ServerAttributes.SERVER_ADDRESS" "localhost"
"$ServerAttributes.SERVER_PORT" sqsPort
}
}
}
trace(1, 3) {
span(0) {
name "testSdkSqs publish"
kind PRODUCER
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs"
"$AwsIncubatingAttributes.AWS_REQUEST_ID" { it == "00000000-0000-0000-0000-000000000000" || it == "UNKNOWN" }
"rpc.system" "aws-api"
"rpc.method" "SendMessage"
"rpc.service" "Sqs"
"$HttpAttributes.HTTP_REQUEST_METHOD" "POST"
"$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$UrlAttributes.URL_FULL" { it.startsWith("http://localhost:$sqsPort") }
"$ServerAttributes.SERVER_ADDRESS" "localhost"
"$ServerAttributes.SERVER_PORT" sqsPort
"$MessagingIncubatingAttributes.MESSAGING_SYSTEM" MessagingIncubatingAttributes.MessagingSystemIncubatingValues.AWS_SQS
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "publish"
"$MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID" String
}
}
span(1) {
name "testSdkSqs process"
kind CONSUMER
childOf span(0)
hasNoLinks()
attributes {
"aws.agent" "java-aws-sdk"
"rpc.method" "ReceiveMessage"
"rpc.system" "aws-api"
"rpc.service" "Sqs"
"$HttpAttributes.HTTP_REQUEST_METHOD" "POST"
"$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$UrlAttributes.URL_FULL" { it.startsWith("http://localhost:$sqsPort") }
"$ServerAttributes.SERVER_ADDRESS" "localhost"
"$ServerAttributes.SERVER_PORT" sqsPort
"$MessagingIncubatingAttributes.MESSAGING_SYSTEM" MessagingIncubatingAttributes.MessagingSystemIncubatingValues.AWS_SQS
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "process"
"$MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID" String
}
}
span(2) {
name "process child"
childOf span(1)
attributes {
}
}
}
if (withParent) {
/**
* This span represents HTTP "sending of receive message" operation. It's always single, while there can be multiple CONSUMER spans (one per consumed message).
* This one could be suppressed (by IF in TracingRequestHandler#beforeRequest but then HTTP instrumentation span would appear
*/
trace(2, 2) {
span(0) {
name "parent"
hasNoParent()
}
span(1) {
name "Sqs.ReceiveMessage"
kind CLIENT
childOf span(0)
hasNoLinks()
attributes {
"aws.agent" "java-aws-sdk"
"$AwsIncubatingAttributes.AWS_REQUEST_ID" { it == "00000000-0000-0000-0000-000000000000" || it == "UNKNOWN" }
"rpc.method" "ReceiveMessage"
"aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs"
"rpc.system" "aws-api"
"rpc.service" "Sqs"
"$HttpAttributes.HTTP_REQUEST_METHOD" "POST"
"$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$UrlAttributes.URL_FULL" { it.startsWith("http://localhost:$sqsPort") }
"$ServerAttributes.SERVER_ADDRESS" "localhost"
"$ServerAttributes.SERVER_PORT" sqsPort
}
}
}
}
}
}
def "simple sqs producer-consumer services: sync"() {
setup:
def builder = SqsClient.builder()
configureSdkClient(builder)
def client = configureSqsClient(builder.build())
client.createQueue(createQueueRequest)
when:
client.sendMessage(sendMessageRequest)
def resp = client.receiveMessage(receiveMessageRequest)
then:
resp.messages.size() == 1
resp.messages.each {message -> runWithSpan("process child") {}}
assertSqsTraces()
}
def "simple sqs producer-consumer services with parent: sync"() {
setup:
def builder = SqsClient.builder()
configureSdkClient(builder)
def client = configureSqsClient(builder.build())
client.createQueue(createQueueRequest)
when:
client.sendMessage(sendMessageRequest)
def resp = runWithSpan("parent") {
client.receiveMessage(receiveMessageRequest)
}
then:
resp.messages.size() == 1
resp.messages.each {message -> runWithSpan("process child") {}}
assertSqsTraces(true)
}
def "simple sqs producer-consumer services: async"() {
setup:
def builder = SqsAsyncClient.builder()
configureSdkClient(builder)
def client = configureSqsClient(builder.build())
client.createQueue(createQueueRequest).get()
when:
client.sendMessage(sendMessageRequest).get()
def resp = client.receiveMessage(receiveMessageRequest).get()
then:
resp.messages.size() == 1
resp.messages.each {message -> runWithSpan("process child") {}}
assertSqsTraces()
}
def "batch sqs producer-consumer services: sync"() {
setup:
def builder = SqsClient.builder()
configureSdkClient(builder)
def client = configureSqsClient(builder.build())
client.createQueue(createQueueRequest)
when:
client.sendMessageBatch(sendMessageBatchRequest)
def resp = client.receiveMessage(receiveMessageBatchRequest)
def totalAttrs = resp.messages().sum {it.messageAttributes().size() }
then:
resp.messages().size() == 3
// +2: 3 messages, 2x traceparent, 1x not injected due to too many attrs
totalAttrs == 18 + (sqsAttributeInjectionEnabled ? 2 : 0)
assertTraces(xrayInjectionEnabled ? 2 : 3) {
trace(0, 1) {
span(0) {
name "Sqs.CreateQueue"
kind CLIENT
}
}
trace(1, xrayInjectionEnabled ? 4 : 3) {
span(0) {
name "testSdkSqs publish"
kind PRODUCER
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs"
"$AwsIncubatingAttributes.AWS_REQUEST_ID" { it.trim() == "00000000-0000-0000-0000-000000000000" || it == "UNKNOWN" }
"rpc.system" "aws-api"
"rpc.method" "SendMessageBatch"
"rpc.service" "Sqs"
"$HttpAttributes.HTTP_REQUEST_METHOD" "POST"
"$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$UrlAttributes.URL_FULL" { it.startsWith("http://localhost:$sqsPort") }
"$ServerAttributes.SERVER_ADDRESS" "localhost"
"$ServerAttributes.SERVER_PORT" sqsPort
"$MessagingIncubatingAttributes.MESSAGING_SYSTEM" MessagingIncubatingAttributes.MessagingSystemIncubatingValues.AWS_SQS
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "publish"
}
}
for (int i: 1..(xrayInjectionEnabled ? 3 : 2)) {
span(i) {
name "testSdkSqs process"
kind CONSUMER
childOf span(0)
hasNoLinks()
attributes {
"aws.agent" "java-aws-sdk"
"rpc.method" "ReceiveMessage"
"rpc.system" "aws-api"
"rpc.service" "Sqs"
"$HttpAttributes.HTTP_REQUEST_METHOD" "POST"
"$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$UrlAttributes.URL_FULL" { it.startsWith("http://localhost:$sqsPort") }
"$ServerAttributes.SERVER_ADDRESS" "localhost"
"$ServerAttributes.SERVER_PORT" sqsPort
"$MessagingIncubatingAttributes.MESSAGING_SYSTEM" MessagingIncubatingAttributes.MessagingSystemIncubatingValues.AWS_SQS
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "process"
"$MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID" String
}
}
}
}
if (!xrayInjectionEnabled) {
trace(2, 1) {
span(0) {
name "testSdkSqs process"
kind CONSUMER
// TODO This is not nice at all, and can also happen if producer is not instrumented
hasNoParent()
hasNoLinks()
attributes {
"aws.agent" "java-aws-sdk"
"rpc.method" "ReceiveMessage"
"rpc.system" "aws-api"
"rpc.service" "Sqs"
"$HttpAttributes.HTTP_REQUEST_METHOD" "POST"
"$HttpAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$UrlAttributes.URL_FULL" { it.startsWith("http://localhost:$sqsPort") }
"$ServerAttributes.SERVER_ADDRESS" "localhost"
"$ServerAttributes.SERVER_PORT" sqsPort
"$MessagingIncubatingAttributes.MESSAGING_SYSTEM" MessagingIncubatingAttributes.MessagingSystemIncubatingValues.AWS_SQS
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "process"
"$MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID" String
}
}
}
}
}
}
}

View File

@ -0,0 +1,275 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v2_2;
import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
import static io.opentelemetry.semconv.HttpAttributes.HTTP_REQUEST_METHOD;
import static io.opentelemetry.semconv.HttpAttributes.HTTP_RESPONSE_STATUS_CODE;
import static io.opentelemetry.semconv.ServerAttributes.SERVER_ADDRESS;
import static io.opentelemetry.semconv.ServerAttributes.SERVER_PORT;
import static io.opentelemetry.semconv.UrlAttributes.URL_FULL;
import static io.opentelemetry.semconv.incubating.AwsIncubatingAttributes.AWS_REQUEST_ID;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MessagingSystemIncubatingValues.AWS_SQS;
import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_METHOD;
import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_SERVICE;
import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_SYSTEM;
import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.api.internal.ConfigPropertiesUtil;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.sdk.testing.assertj.SpanDataAssert;
import io.opentelemetry.sdk.trace.data.SpanData;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import org.apache.pekko.http.scaladsl.Http;
import org.elasticmq.rest.sqs.SQSRestServer;
import org.elasticmq.rest.sqs.SQSRestServerBuilder;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.SqsAsyncClientBuilder;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.SqsClientBuilder;
import software.amazon.awssdk.services.sqs.model.CreateQueueRequest;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
public abstract class AbstractAws2SqsBaseTest {
protected static final StaticCredentialsProvider CREDENTIALS_PROVIDER =
StaticCredentialsProvider.create(
AwsBasicCredentials.create("my-access-key", "my-secret-key"));
protected static int sqsPort;
protected static SQSRestServer sqs;
protected final String queueUrl = "http://localhost:" + sqsPort + "/000000000000/testSdkSqs";
protected ReceiveMessageRequest receiveMessageRequest =
ReceiveMessageRequest.builder().queueUrl(queueUrl).build();
protected ReceiveMessageRequest receiveMessageBatchRequest =
ReceiveMessageRequest.builder()
.queueUrl(queueUrl)
.maxNumberOfMessages(3)
.messageAttributeNames("All")
.waitTimeSeconds(5)
.build();
protected CreateQueueRequest createQueueRequest =
CreateQueueRequest.builder().queueName("testSdkSqs").build();
protected SendMessageRequest sendMessageRequest =
SendMessageRequest.builder().queueUrl(queueUrl).messageBody("{\"type\": \"hello\"}").build();
@SuppressWarnings("unchecked")
protected SendMessageBatchRequest sendMessageBatchRequest =
SendMessageBatchRequest.builder()
.queueUrl(queueUrl)
.entries(
e -> e.messageBody("e1").id("i1"),
// 8 attributes, injection always possible
e -> e.messageBody("e2").id("i2").messageAttributes(dummyMessageAttributes(8)),
// 10 attributes, injection with custom propagator never possible
e -> e.messageBody("e3").id("i3").messageAttributes(dummyMessageAttributes(10)))
.build();
protected abstract InstrumentationExtension getTesting();
protected abstract SqsClient configureSqsClient(SqsClient sqsClient);
protected abstract SqsAsyncClient configureSqsClient(SqsAsyncClient sqsClient);
protected abstract ClientOverrideConfiguration.Builder createOverrideConfigurationBuilder();
protected abstract void assertSqsTraces(boolean withParent, boolean captureHeaders);
static Map<String, MessageAttributeValue> dummyMessageAttributes(int count) {
Map<String, MessageAttributeValue> map = new HashMap<>();
for (int i = 0; i < count; i++) {
map.put(
"a" + i, MessageAttributeValue.builder().stringValue("v" + i).dataType("String").build());
}
return map;
}
protected boolean isXrayInjectionEnabled() {
return true;
}
protected void configureSdkClient(SqsClientBuilder builder) throws URISyntaxException {
builder
.overrideConfiguration(createOverrideConfigurationBuilder().build())
.endpointOverride(new URI("http://localhost:" + sqsPort));
builder.region(Region.AP_NORTHEAST_1).credentialsProvider(CREDENTIALS_PROVIDER);
}
protected void configureSdkClient(SqsAsyncClientBuilder builder) throws URISyntaxException {
builder
.overrideConfiguration(createOverrideConfigurationBuilder().build())
.endpointOverride(new URI("http://localhost:" + sqsPort));
builder.region(Region.AP_NORTHEAST_1).credentialsProvider(CREDENTIALS_PROVIDER);
}
protected boolean isSqsAttributeInjectionEnabled() {
// See io.opentelemetry.instrumentation.awssdk.v2_2.autoconfigure.TracingExecutionInterceptor
return ConfigPropertiesUtil.getBoolean(
"otel.instrumentation.aws-sdk.experimental-use-propagator-for-messaging", false);
}
@BeforeAll
static void setUp() {
sqs = SQSRestServerBuilder.withPort(0).withInterface("localhost").start();
Http.ServerBinding server = sqs.waitUntilStarted();
sqsPort = server.localAddress().getPort();
}
@AfterAll
static void cleanUp() {
if (sqs != null) {
sqs.stopAndWait();
}
}
@Test
void testSimpleSqsProducerConsumerServicesSync() throws URISyntaxException {
SqsClientBuilder builder = SqsClient.builder();
configureSdkClient(builder);
SqsClient client = configureSqsClient(builder.build());
client.createQueue(createQueueRequest);
client.sendMessage(sendMessageRequest);
ReceiveMessageResponse response = client.receiveMessage(receiveMessageRequest);
assertThat(response.messages().size()).isEqualTo(1);
response.messages().forEach(message -> getTesting().runWithSpan("process child", () -> {}));
assertSqsTraces(false, false);
}
@Test
void testSimpleSqsProducerConsumerServicesWithParentSync() throws URISyntaxException {
SqsClientBuilder builder = SqsClient.builder();
configureSdkClient(builder);
SqsClient client = configureSqsClient(builder.build());
client.createQueue(createQueueRequest);
client.sendMessage(sendMessageRequest);
ReceiveMessageResponse response =
getTesting().runWithSpan("parent", () -> client.receiveMessage(receiveMessageRequest));
assertThat(response.messages().size()).isEqualTo(1);
response.messages().forEach(message -> getTesting().runWithSpan("process child", () -> {}));
assertSqsTraces(true, false);
}
@SuppressWarnings("InterruptedExceptionSwallowed")
@Test
void testSimpleSqsProducerConsumerServicesAsync() throws Exception {
SqsAsyncClientBuilder builder = SqsAsyncClient.builder();
configureSdkClient(builder);
SqsAsyncClient client = configureSqsClient(builder.build());
client.createQueue(createQueueRequest).get();
client.sendMessage(sendMessageRequest).get();
ReceiveMessageResponse response = client.receiveMessage(receiveMessageRequest).get();
assertThat(response.messages().size()).isEqualTo(1);
response.messages().forEach(message -> getTesting().runWithSpan("process child", () -> {}));
assertSqsTraces(false, false);
}
static SpanDataAssert createQueueSpan(SpanDataAssert span) {
return span.hasName("Sqs.CreateQueue")
.hasKind(SpanKind.CLIENT)
.hasNoParent()
.hasAttributesSatisfyingExactly(
equalTo(stringKey("aws.agent"), "java-aws-sdk"),
equalTo(stringKey("aws.queue.name"), "testSdkSqs"),
satisfies(
AWS_REQUEST_ID,
val -> val.matches("\\s*00000000-0000-0000-0000-000000000000\\s*|UNKNOWN")),
equalTo(RPC_SYSTEM, "aws-api"),
equalTo(RPC_SERVICE, "Sqs"),
equalTo(RPC_METHOD, "CreateQueue"),
equalTo(HTTP_REQUEST_METHOD, "POST"),
equalTo(HTTP_RESPONSE_STATUS_CODE, 200),
satisfies(URL_FULL, v -> v.startsWith("http://localhost:" + sqsPort)),
equalTo(SERVER_ADDRESS, "localhost"),
equalTo(SERVER_PORT, sqsPort));
}
@SuppressWarnings("deprecation") // using deprecated semconv
static SpanDataAssert processSpan(SpanDataAssert span, SpanData parent) {
return span.hasName("testSdkSqs process")
.hasKind(SpanKind.CONSUMER)
.hasParent(parent)
.hasTotalRecordedLinks(0)
.hasAttributesSatisfyingExactly(
equalTo(stringKey("aws.agent"), "java-aws-sdk"),
equalTo(RPC_SYSTEM, "aws-api"),
equalTo(RPC_SERVICE, "Sqs"),
equalTo(RPC_METHOD, "ReceiveMessage"),
equalTo(HTTP_REQUEST_METHOD, "POST"),
equalTo(HTTP_RESPONSE_STATUS_CODE, 200),
satisfies(URL_FULL, v -> v.startsWith("http://localhost:" + sqsPort)),
equalTo(SERVER_ADDRESS, "localhost"),
equalTo(SERVER_PORT, sqsPort),
equalTo(MESSAGING_SYSTEM, AWS_SQS),
equalTo(MESSAGING_DESTINATION_NAME, "testSdkSqs"),
equalTo(MESSAGING_OPERATION, "process"),
satisfies(MESSAGING_MESSAGE_ID, v -> v.isInstanceOf(String.class)));
}
@SuppressWarnings("deprecation") // using deprecated semconv
static SpanDataAssert publishSpan(SpanDataAssert span, String queueUrl, String rcpMethod) {
return span.hasName("testSdkSqs publish")
.hasKind(SpanKind.PRODUCER)
.hasNoParent()
.hasAttributesSatisfyingExactly(
equalTo(stringKey("aws.agent"), "java-aws-sdk"),
equalTo(stringKey("aws.queue.url"), queueUrl),
satisfies(
AWS_REQUEST_ID,
val -> val.matches("\\s*00000000-0000-0000-0000-000000000000\\s*|UNKNOWN")),
equalTo(RPC_SYSTEM, "aws-api"),
equalTo(RPC_SERVICE, "Sqs"),
equalTo(RPC_METHOD, rcpMethod),
equalTo(HTTP_REQUEST_METHOD, "POST"),
equalTo(HTTP_RESPONSE_STATUS_CODE, 200),
satisfies(URL_FULL, v -> v.startsWith("http://localhost:" + sqsPort)),
equalTo(SERVER_ADDRESS, "localhost"),
equalTo(SERVER_PORT, sqsPort),
equalTo(MESSAGING_SYSTEM, AWS_SQS),
equalTo(MESSAGING_DESTINATION_NAME, "testSdkSqs"),
equalTo(MESSAGING_OPERATION, "publish"),
satisfies(
MESSAGING_MESSAGE_ID,
val ->
val.satisfiesAnyOf(
v -> assertThat(v).isInstanceOf(String.class),
v -> assertThat(v).isNull())));
}
}

View File

@ -0,0 +1,161 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v2_2;
import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
import static io.opentelemetry.semconv.HttpAttributes.HTTP_REQUEST_METHOD;
import static io.opentelemetry.semconv.HttpAttributes.HTTP_RESPONSE_STATUS_CODE;
import static io.opentelemetry.semconv.ServerAttributes.SERVER_ADDRESS;
import static io.opentelemetry.semconv.ServerAttributes.SERVER_PORT;
import static io.opentelemetry.semconv.UrlAttributes.URL_FULL;
import static io.opentelemetry.semconv.incubating.AwsIncubatingAttributes.AWS_REQUEST_ID;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MessagingSystemIncubatingValues.AWS_SQS;
import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_METHOD;
import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_SERVICE;
import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_SYSTEM;
import static java.util.Collections.singletonList;
import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.sdk.testing.assertj.SpanDataAssert;
import io.opentelemetry.sdk.testing.assertj.TraceAssert;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.SqsClientBuilder;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
public abstract class AbstractAws2SqsSuppressReceiveSpansTest extends AbstractAws2SqsBaseTest {
@Override
protected void assertSqsTraces(boolean withParent, boolean captureHeaders) {
List<Consumer<TraceAssert>> traceAsserts =
new ArrayList<>(
Arrays.asList(
trace -> trace.hasSpansSatisfyingExactly(span -> createQueueSpan(span)),
trace ->
trace.hasSpansSatisfyingExactly(
span -> publishSpan(span, queueUrl, "SendMessage"),
span -> processSpan(span, trace.getSpan(0)),
span ->
span.hasName("process child")
.hasParent(trace.getSpan(1))
.hasAttributes(Attributes.empty()))));
if (withParent) {
/*
* This span represents HTTP "sending of receive message" operation. It's always single,
* while there can be multiple CONSUMER spans (one per consumed message).
* This one could be suppressed (by IF in TracingRequestHandler#beforeRequest but then
* HTTP instrumentation span would appear)
*/
traceAsserts.add(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasNoParent(),
span ->
span.hasName("Sqs.ReceiveMessage")
.hasKind(SpanKind.CLIENT)
.hasTotalRecordedLinks(0)
.hasAttributesSatisfyingExactly(
equalTo(stringKey("aws.agent"), "java-aws-sdk"),
equalTo(stringKey("aws.queue.url"), queueUrl),
satisfies(
AWS_REQUEST_ID,
val ->
val.matches(
"\\s*00000000-0000-0000-0000-000000000000\\s*|UNKNOWN")),
equalTo(RPC_SYSTEM, "aws-api"),
equalTo(RPC_SERVICE, "Sqs"),
equalTo(RPC_METHOD, "ReceiveMessage"),
equalTo(HTTP_REQUEST_METHOD, "POST"),
equalTo(HTTP_RESPONSE_STATUS_CODE, 200),
satisfies(URL_FULL, v -> v.startsWith("http://localhost:" + sqsPort)),
equalTo(SERVER_ADDRESS, "localhost"),
equalTo(SERVER_PORT, sqsPort))));
}
getTesting().waitAndAssertTraces(traceAsserts);
}
@Test
@SuppressWarnings("deprecation") // using deprecated semconv
void testBatchSqsProducerConsumerServicesSync() throws URISyntaxException {
SqsClientBuilder builder = SqsClient.builder();
configureSdkClient(builder);
SqsClient client = configureSqsClient(builder.build());
client.createQueue(createQueueRequest);
client.sendMessageBatch(sendMessageBatchRequest);
ReceiveMessageResponse response = client.receiveMessage(receiveMessageBatchRequest);
int totalAttrs =
response.messages().stream().mapToInt(message -> message.messageAttributes().size()).sum();
// generates the process spans
response.messages().forEach(message -> {});
assertThat(response.messages().size()).isEqualTo(3);
// +2: 3 messages, 2x traceparent, 1x not injected due to too many attrs
assertThat(totalAttrs).isEqualTo(18 + (isSqsAttributeInjectionEnabled() ? 2 : 0));
List<Consumer<TraceAssert>> traceAsserts =
new ArrayList<>(
Arrays.asList(
trace -> trace.hasSpansSatisfyingExactly(span -> createQueueSpan(span)),
trace -> {
List<Consumer<SpanDataAssert>> spanAsserts =
new ArrayList<>(
singletonList(span -> publishSpan(span, queueUrl, "SendMessageBatch")));
for (int i = 0; i <= (isXrayInjectionEnabled() ? 2 : 1); i++) {
spanAsserts.add(span -> processSpan(span, trace.getSpan(0)));
}
trace.hasSpansSatisfyingExactly(spanAsserts);
}));
if (!isXrayInjectionEnabled()) {
traceAsserts.add(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("testSdkSqs process")
.hasKind(SpanKind.CONSUMER)
// TODO: This is not good, and can also happen if producer is not
// instrumented
.hasNoParent()
.hasTotalRecordedLinks(0)
.hasAttributesSatisfyingExactly(
equalTo(stringKey("aws.agent"), "java-aws-sdk"),
equalTo(RPC_SYSTEM, "aws-api"),
equalTo(RPC_SERVICE, "Sqs"),
equalTo(RPC_METHOD, "ReceiveMessage"),
equalTo(HTTP_REQUEST_METHOD, "POST"),
equalTo(HTTP_RESPONSE_STATUS_CODE, 200),
satisfies(URL_FULL, v -> v.startsWith("http://localhost:" + sqsPort)),
equalTo(SERVER_ADDRESS, "localhost"),
equalTo(SERVER_PORT, sqsPort),
equalTo(MESSAGING_SYSTEM, AWS_SQS),
equalTo(MESSAGING_DESTINATION_NAME, "testSdkSqs"),
equalTo(MESSAGING_OPERATION, "process"),
satisfies(MESSAGING_MESSAGE_ID, v -> v.isInstanceOf(String.class)))));
}
getTesting().waitAndAssertTraces(traceAsserts);
}
}

View File

@ -14,186 +14,51 @@ import static io.opentelemetry.semconv.HttpAttributes.HTTP_RESPONSE_STATUS_CODE;
import static io.opentelemetry.semconv.ServerAttributes.SERVER_ADDRESS;
import static io.opentelemetry.semconv.ServerAttributes.SERVER_PORT;
import static io.opentelemetry.semconv.UrlAttributes.URL_FULL;
import static io.opentelemetry.semconv.incubating.AwsIncubatingAttributes.AWS_REQUEST_ID;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM;
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MessagingSystemIncubatingValues.AWS_SQS;
import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_METHOD;
import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_SERVICE;
import static io.opentelemetry.semconv.incubating.RpcIncubatingAttributes.RPC_SYSTEM;
import static java.util.Collections.singletonList;
import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.api.internal.ConfigPropertiesUtil;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.sdk.testing.assertj.AttributeAssertion;
import io.opentelemetry.sdk.testing.assertj.SpanDataAssert;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.semconv.incubating.AwsIncubatingAttributes;
import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes;
import io.opentelemetry.testing.internal.armeria.internal.shaded.guava.collect.ImmutableList;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.apache.pekko.http.scaladsl.Http;
import org.elasticmq.rest.sqs.SQSRestServer;
import org.elasticmq.rest.sqs.SQSRestServerBuilder;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.core.client.config.ClientOverrideConfiguration;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsAsyncClient;
import software.amazon.awssdk.services.sqs.SqsAsyncClientBuilder;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.SqsClientBuilder;
import software.amazon.awssdk.services.sqs.model.CreateQueueRequest;
import software.amazon.awssdk.services.sqs.model.MessageAttributeValue;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageResponse;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
@SuppressWarnings("deprecation") // using deprecated semconv
public abstract class AbstractAws2SqsTracingTest {
public abstract class AbstractAws2SqsTracingTest extends AbstractAws2SqsBaseTest {
protected abstract InstrumentationExtension getTesting();
protected abstract SqsClient configureSqsClient(SqsClient sqsClient);
protected abstract SqsAsyncClient configureSqsClient(SqsAsyncClient sqsClient);
protected abstract ClientOverrideConfiguration.Builder createOverrideConfigurationBuilder();
private static final StaticCredentialsProvider CREDENTIALS_PROVIDER =
StaticCredentialsProvider.create(
AwsBasicCredentials.create("my-access-key", "my-secret-key"));
private static int sqsPort;
private static SQSRestServer sqs;
static Map<String, MessageAttributeValue> dummyMessageAttributes(int count) {
Map<String, MessageAttributeValue> map = new HashMap<>();
for (int i = 0; i < count; i++) {
map.put(
"a" + i, MessageAttributeValue.builder().stringValue("v" + i).dataType("String").build());
}
return map;
}
private final String queueUrl = "http://localhost:" + sqsPort + "/000000000000/testSdkSqs";
ReceiveMessageRequest receiveMessageRequest =
ReceiveMessageRequest.builder().queueUrl(queueUrl).build();
ReceiveMessageRequest receiveMessageBatchRequest =
ReceiveMessageRequest.builder()
.queueUrl(queueUrl)
.maxNumberOfMessages(3)
.messageAttributeNames("All")
.waitTimeSeconds(5)
.build();
CreateQueueRequest createQueueRequest =
CreateQueueRequest.builder().queueName("testSdkSqs").build();
SendMessageRequest sendMessageRequest =
SendMessageRequest.builder().queueUrl(queueUrl).messageBody("{\"type\": \"hello\"}").build();
@SuppressWarnings("unchecked")
SendMessageBatchRequest sendMessageBatchRequest =
SendMessageBatchRequest.builder()
.queueUrl(queueUrl)
.entries(
e -> e.messageBody("e1").id("i1"),
// 8 attributes, injection always possible
e -> e.messageBody("e2").id("i2").messageAttributes(dummyMessageAttributes(8)),
// 10 attributes, injection with custom propagator never possible
e -> e.messageBody("e3").id("i3").messageAttributes(dummyMessageAttributes(10)))
.build();
boolean isSqsAttributeInjectionEnabled() {
// See io.opentelemetry.instrumentation.awssdk.v2_2.autoconfigure.TracingExecutionInterceptor
return ConfigPropertiesUtil.getBoolean(
"otel.instrumentation.aws-sdk.experimental-use-propagator-for-messaging", false);
}
boolean isXrayInjectionEnabled() {
return true;
}
void configureSdkClient(SqsClientBuilder builder) throws URISyntaxException {
builder
.overrideConfiguration(createOverrideConfigurationBuilder().build())
.endpointOverride(new URI("http://localhost:" + sqsPort));
builder.region(Region.AP_NORTHEAST_1).credentialsProvider(CREDENTIALS_PROVIDER);
}
void configureSdkClient(SqsAsyncClientBuilder builder) throws URISyntaxException {
builder
.overrideConfiguration(createOverrideConfigurationBuilder().build())
.endpointOverride(new URI("http://localhost:" + sqsPort));
builder.region(Region.AP_NORTHEAST_1).credentialsProvider(CREDENTIALS_PROVIDER);
}
@BeforeAll
static void setUp() {
sqs = SQSRestServerBuilder.withPort(0).withInterface("localhost").start();
Http.ServerBinding server = sqs.waitUntilStarted();
sqsPort = server.localAddress().getPort();
}
@AfterAll
static void cleanUp() {
if (sqs != null) {
sqs.stopAndWait();
}
}
void assertSqsTraces(Boolean withParent, Boolean captureHeaders) {
@Override
protected void assertSqsTraces(boolean withParent, boolean captureHeaders) {
int offset = withParent ? 2 : 0;
AtomicReference<SpanData> publishSpan = new AtomicReference<>();
getTesting()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("Sqs.CreateQueue")
.hasKind(SpanKind.CLIENT)
.hasNoParent()
.hasAttributesSatisfyingExactly(
equalTo(stringKey("aws.agent"), "java-aws-sdk"),
equalTo(stringKey("aws.queue.name"), "testSdkSqs"),
satisfies(
AwsIncubatingAttributes.AWS_REQUEST_ID,
val ->
val.satisfiesAnyOf(
v ->
assertThat(v)
.isEqualTo(
"00000000-0000-0000-0000-000000000000"),
v -> assertThat(v).isEqualTo("UNKNOWN"))),
equalTo(RPC_SYSTEM, "aws-api"),
equalTo(RPC_SERVICE, "Sqs"),
equalTo(RPC_METHOD, "CreateQueue"),
equalTo(HTTP_REQUEST_METHOD, "POST"),
equalTo(HTTP_RESPONSE_STATUS_CODE, 200),
satisfies(
URL_FULL, v -> v.startsWith("http://localhost:" + sqsPort)),
equalTo(SERVER_ADDRESS, "localhost"),
equalTo(SERVER_PORT, sqsPort))),
trace -> trace.hasSpansSatisfyingExactly(span -> createQueueSpan(span)),
trace ->
trace.hasSpansSatisfyingExactly(
span -> {
@ -206,14 +71,10 @@ public abstract class AbstractAws2SqsTracingTest {
stringKey("aws.queue.url"),
"http://localhost:" + sqsPort + "/000000000000/testSdkSqs"),
satisfies(
AwsIncubatingAttributes.AWS_REQUEST_ID,
AWS_REQUEST_ID,
val ->
val.satisfiesAnyOf(
v ->
assertThat(v)
.isEqualTo(
"00000000-0000-0000-0000-000000000000"),
v -> assertThat(v).isEqualTo("UNKNOWN"))),
val.matches(
"\\s*00000000-0000-0000-0000-000000000000\\s*|UNKNOWN")),
equalTo(RPC_SYSTEM, "aws-api"),
equalTo(RPC_SERVICE, "Sqs"),
equalTo(RPC_METHOD, "SendMessage"),
@ -223,10 +84,7 @@ public abstract class AbstractAws2SqsTracingTest {
URL_FULL, v -> v.startsWith("http://localhost:" + sqsPort)),
equalTo(SERVER_ADDRESS, "localhost"),
equalTo(SERVER_PORT, sqsPort),
equalTo(
MESSAGING_SYSTEM,
MessagingIncubatingAttributes.MessagingSystemIncubatingValues
.AWS_SQS),
equalTo(MESSAGING_SYSTEM, AWS_SQS),
equalTo(MESSAGING_DESTINATION_NAME, "testSdkSqs"),
equalTo(MESSAGING_OPERATION, "publish"),
satisfies(
@ -266,14 +124,10 @@ public abstract class AbstractAws2SqsTracingTest {
stringKey("aws.queue.url"),
"http://localhost:" + sqsPort + "/000000000000/testSdkSqs"),
satisfies(
AwsIncubatingAttributes.AWS_REQUEST_ID,
AWS_REQUEST_ID,
val ->
val.satisfiesAnyOf(
v ->
assertThat(v)
.isEqualTo(
"00000000-0000-0000-0000-000000000000"),
v -> assertThat(v).isEqualTo("UNKNOWN"))),
val.matches(
"\\s*00000000-0000-0000-0000-000000000000\\s*|UNKNOWN")),
equalTo(RPC_SYSTEM, "aws-api"),
equalTo(RPC_SERVICE, "Sqs"),
equalTo(RPC_METHOD, "ReceiveMessage"),
@ -301,10 +155,7 @@ public abstract class AbstractAws2SqsTracingTest {
URL_FULL, v -> v.startsWith("http://localhost:" + sqsPort)),
equalTo(SERVER_ADDRESS, "localhost"),
equalTo(SERVER_PORT, sqsPort),
equalTo(
MESSAGING_SYSTEM,
MessagingIncubatingAttributes
.MessagingSystemIncubatingValues.AWS_SQS),
equalTo(MESSAGING_SYSTEM, AWS_SQS),
equalTo(MESSAGING_DESTINATION_NAME, "testSdkSqs"),
equalTo(MESSAGING_OPERATION, "receive"),
equalTo(MESSAGING_BATCH_MESSAGE_COUNT, 1)));
@ -341,10 +192,7 @@ public abstract class AbstractAws2SqsTracingTest {
URL_FULL, v -> v.startsWith("http://localhost:" + sqsPort)),
equalTo(SERVER_ADDRESS, "localhost"),
equalTo(SERVER_PORT, sqsPort),
equalTo(
MESSAGING_SYSTEM,
MessagingIncubatingAttributes
.MessagingSystemIncubatingValues.AWS_SQS),
equalTo(MESSAGING_SYSTEM, AWS_SQS),
equalTo(MESSAGING_DESTINATION_NAME, "testSdkSqs"),
equalTo(MESSAGING_OPERATION, "process"),
satisfies(
@ -354,7 +202,7 @@ public abstract class AbstractAws2SqsTracingTest {
attributes.add(
satisfies(
stringArrayKey("messaging.header.test_message_header"),
v -> v.isEqualTo(ImmutableList.of("test"))));
v -> v.isEqualTo(singletonList("test"))));
}
span.hasName("testSdkSqs process")
@ -378,24 +226,6 @@ public abstract class AbstractAws2SqsTracingTest {
});
}
@Test
void testSimpleSqsProducerConsumerServicesSync() throws URISyntaxException {
SqsClientBuilder builder = SqsClient.builder();
configureSdkClient(builder);
SqsClient client = configureSqsClient(builder.build());
client.createQueue(createQueueRequest);
client.sendMessage(sendMessageRequest);
ReceiveMessageResponse response = client.receiveMessage(receiveMessageRequest);
assertThat(response.messages().size()).isEqualTo(1);
response.messages().forEach(message -> getTesting().runWithSpan("process child", () -> {}));
assertSqsTraces(false, false);
}
@Test
void testCaptureMessageHeaderAsAttributeSpan() throws URISyntaxException {
SqsClientBuilder builder = SqsClient.builder();
@ -423,42 +253,6 @@ public abstract class AbstractAws2SqsTracingTest {
assertSqsTraces(false, true);
}
@Test
void testSimpleSqsProducerConsumerServicesWithParentSync() throws URISyntaxException {
SqsClientBuilder builder = SqsClient.builder();
configureSdkClient(builder);
SqsClient client = configureSqsClient(builder.build());
client.createQueue(createQueueRequest);
client.sendMessage(sendMessageRequest);
ReceiveMessageResponse response =
getTesting().runWithSpan("parent", () -> client.receiveMessage(receiveMessageRequest));
assertThat(response.messages().size()).isEqualTo(1);
response.messages().forEach(message -> getTesting().runWithSpan("process child", () -> {}));
assertSqsTraces(true, false);
}
@SuppressWarnings("InterruptedExceptionSwallowed")
@Test
void testSimpleSqsProducerConsumerServicesAsync() throws Exception {
SqsAsyncClientBuilder builder = SqsAsyncClient.builder();
configureSdkClient(builder);
SqsAsyncClient client = configureSqsClient(builder.build());
client.createQueue(createQueueRequest).get();
client.sendMessage(sendMessageRequest).get();
ReceiveMessageResponse response = client.receiveMessage(receiveMessageRequest).get();
assertThat(response.messages().size()).isEqualTo(1);
response.messages().forEach(message -> getTesting().runWithSpan("process child", () -> {}));
assertSqsTraces(false, false);
}
@Test
void testBatchSqsProducerConsumerServicesSync() throws URISyntaxException {
SqsClientBuilder builder = SqsClient.builder();
@ -489,38 +283,7 @@ public abstract class AbstractAws2SqsTracingTest {
trace -> {
publishSpan.set(trace.getSpan(0));
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("testSdkSqs publish")
.hasKind(SpanKind.PRODUCER)
.hasNoParent()
.hasAttributesSatisfyingExactly(
equalTo(stringKey("aws.agent"), "java-aws-sdk"),
equalTo(
stringKey("aws.queue.url"),
"http://localhost:" + sqsPort + "/000000000000/testSdkSqs"),
satisfies(
AwsIncubatingAttributes.AWS_REQUEST_ID,
val ->
val.satisfiesAnyOf(
v ->
assertThat(v.trim())
.isEqualTo(
"00000000-0000-0000-0000-000000000000"),
v -> assertThat(v.trim()).isEqualTo("UNKNOWN"))),
equalTo(RPC_SYSTEM, "aws-api"),
equalTo(RPC_SERVICE, "Sqs"),
equalTo(RPC_METHOD, "SendMessageBatch"),
equalTo(HTTP_REQUEST_METHOD, "POST"),
equalTo(HTTP_RESPONSE_STATUS_CODE, 200),
satisfies(URL_FULL, v -> v.startsWith("http://localhost:" + sqsPort)),
equalTo(SERVER_ADDRESS, "localhost"),
equalTo(SERVER_PORT, sqsPort),
equalTo(
MESSAGING_SYSTEM,
MessagingIncubatingAttributes.MessagingSystemIncubatingValues
.AWS_SQS),
equalTo(MESSAGING_DESTINATION_NAME, "testSdkSqs"),
equalTo(MESSAGING_OPERATION, "publish")));
span -> publishSpan(span, queueUrl, "SendMessageBatch"));
},
trace -> {
List<Consumer<SpanDataAssert>> spanAsserts = new ArrayList<>();
@ -540,10 +303,7 @@ public abstract class AbstractAws2SqsTracingTest {
satisfies(URL_FULL, v -> v.startsWith("http://localhost:" + sqsPort)),
equalTo(SERVER_ADDRESS, "localhost"),
equalTo(SERVER_PORT, sqsPort),
equalTo(
MESSAGING_SYSTEM,
MessagingIncubatingAttributes.MessagingSystemIncubatingValues
.AWS_SQS),
equalTo(MESSAGING_SYSTEM, AWS_SQS),
equalTo(MESSAGING_DESTINATION_NAME, "testSdkSqs"),
equalTo(MESSAGING_OPERATION, "receive"),
equalTo(MESSAGING_BATCH_MESSAGE_COUNT, 3)));
@ -585,10 +345,7 @@ public abstract class AbstractAws2SqsTracingTest {
v -> v.startsWith("http://localhost:" + sqsPort)),
equalTo(SERVER_ADDRESS, "localhost"),
equalTo(SERVER_PORT, sqsPort),
equalTo(
MESSAGING_SYSTEM,
MessagingIncubatingAttributes
.MessagingSystemIncubatingValues.AWS_SQS),
equalTo(MESSAGING_SYSTEM, AWS_SQS),
equalTo(MESSAGING_DESTINATION_NAME, "testSdkSqs"),
equalTo(MESSAGING_OPERATION, "process"),
satisfies(