Convert aws 1.11 SQS tests from groovy to java (#10657)

This commit is contained in:
Jay DeLuca 2024-02-27 04:30:53 -05:00 committed by GitHub
parent fa1ccd0683
commit 6a0b363a38
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 509 additions and 434 deletions

View File

@ -1,17 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.awssdk.v1_11
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder
import io.opentelemetry.instrumentation.awssdk.v1_11.AbstractSqsTracingTest
import io.opentelemetry.instrumentation.test.AgentTestTrait
class SqsTracingTest extends AbstractSqsTracingTest implements AgentTestTrait {
@Override
AmazonSQSAsyncClientBuilder configureClient(AmazonSQSAsyncClientBuilder client) {
return client
}
}

View File

@ -0,0 +1,28 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.awssdk.v1_11;
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder;
import io.opentelemetry.instrumentation.awssdk.v1_11.AbstractSqsTracingTest;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import org.junit.jupiter.api.extension.RegisterExtension;
class SqsTracingTest extends AbstractSqsTracingTest {
@RegisterExtension
private static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
@Override
protected InstrumentationExtension testing() {
return testing;
}
@Override
public AmazonSQSAsyncClientBuilder configureClient(AmazonSQSAsyncClientBuilder client) {
return client;
}
}

View File

@ -1,17 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v1_11.instrumentor
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder
import io.opentelemetry.instrumentation.awssdk.v1_11.AbstractSqsTracingTest
import io.opentelemetry.instrumentation.test.LibraryTestTrait
class SqsTracingTest extends AbstractSqsTracingTest implements LibraryTestTrait {
@Override
AmazonSQSAsyncClientBuilder configureClient(AmazonSQSAsyncClientBuilder client) {
return client
}
}

View File

@ -0,0 +1,28 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.awssdk.v1_11;
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder;
import io.opentelemetry.instrumentation.awssdk.v1_11.AbstractSqsTracingTest;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
import org.junit.jupiter.api.extension.RegisterExtension;
class SqsTracingTest extends AbstractSqsTracingTest {
@RegisterExtension
private static final InstrumentationExtension testing = LibraryInstrumentationExtension.create();
@Override
protected InstrumentationExtension testing() {
return testing;
}
@Override
public AmazonSQSAsyncClientBuilder configureClient(AmazonSQSAsyncClientBuilder client) {
return client;
}
}

View File

@ -1,24 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v1_11
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder
import io.opentelemetry.instrumentation.test.LibraryTestTrait
import static java.util.Collections.singletonList
class SqsTracingTest extends AbstractSqsTracingTest implements LibraryTestTrait {
@Override
AmazonSQSAsyncClientBuilder configureClient(AmazonSQSAsyncClientBuilder client) {
return client.withRequestHandlers(
AwsSdkTelemetry.builder(getOpenTelemetry())
.setCaptureExperimentalSpanAttributes(true)
.setMessagingReceiveInstrumentationEnabled(true)
.setCapturedHeaders(singletonList("test-message-header"))
.build()
.newRequestHandler())
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v1_11;
import static java.util.Collections.singletonList;
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
import org.junit.jupiter.api.extension.RegisterExtension;
class SqsTracingTest extends AbstractSqsTracingTest {
@RegisterExtension
private static final InstrumentationExtension testing = LibraryInstrumentationExtension.create();
@Override
protected InstrumentationExtension testing() {
return testing;
}
@Override
public AmazonSQSAsyncClientBuilder configureClient(AmazonSQSAsyncClientBuilder client) {
return client.withRequestHandlers(
AwsSdkTelemetry.builder(testing().getOpenTelemetry())
.setCaptureExperimentalSpanAttributes(true)
.setMessagingReceiveInstrumentationEnabled(true)
.setCapturedHeaders(singletonList("test-message-header"))
.build()
.newRequestHandler());
}
}

View File

@ -1,376 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v1_11
import com.amazonaws.auth.AWSStaticCredentialsProvider
import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.client.builder.AwsClientBuilder
import com.amazonaws.services.sqs.AmazonSQSAsyncClient
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder
import com.amazonaws.services.sqs.model.MessageAttributeValue
import com.amazonaws.services.sqs.model.ReceiveMessageRequest
import com.amazonaws.services.sqs.model.SendMessageRequest
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
import io.opentelemetry.instrumentation.test.utils.PortUtils
import io.opentelemetry.sdk.trace.data.SpanData
import io.opentelemetry.semconv.SemanticAttributes
import org.elasticmq.rest.sqs.SQSRestServerBuilder
import spock.lang.Shared
import static io.opentelemetry.api.trace.SpanKind.CLIENT
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
import static io.opentelemetry.api.trace.SpanKind.PRODUCER
abstract class AbstractSqsTracingTest extends InstrumentationSpecification {
abstract AmazonSQSAsyncClientBuilder configureClient(AmazonSQSAsyncClientBuilder client)
@Shared
def sqs
@Shared
AmazonSQSAsyncClient client
@Shared
int sqsPort
def setupSpec() {
sqsPort = PortUtils.findOpenPort()
sqs = SQSRestServerBuilder.withPort(sqsPort).withInterface("localhost").start()
println getClass().name + " SQS server started at: localhost:$sqsPort/"
def credentials = new AWSStaticCredentialsProvider(new BasicAWSCredentials("x", "x"))
def endpointConfiguration = new AwsClientBuilder.EndpointConfiguration("http://localhost:" + sqsPort, "elasticmq")
client = configureClient(AmazonSQSAsyncClient.asyncBuilder()).withCredentials(credentials).withEndpointConfiguration(endpointConfiguration).build()
}
def cleanupSpec() {
if (sqs != null) {
sqs.stopAndWait()
}
}
def "simple sqs producer-consumer services #testCaptureHeaders"() {
setup:
client.createQueue("testSdkSqs")
when:
SendMessageRequest sendMessageRequest = new SendMessageRequest("http://localhost:$sqsPort/000000000000/testSdkSqs", "{\"type\": \"hello\"}")
if (testCaptureHeaders) {
sendMessageRequest.addMessageAttributesEntry("test-message-header", new MessageAttributeValue().withDataType("String").withStringValue("test"))
}
client.sendMessage(sendMessageRequest)
ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest("http://localhost:$sqsPort/000000000000/testSdkSqs")
if (testCaptureHeaders) {
receiveMessageRequest.withMessageAttributeNames("test-message-header")
}
def receiveMessageResult = client.receiveMessage(receiveMessageRequest)
// test different ways of iterating the messages list
if (testCaptureHeaders) {
receiveMessageResult.messages.each { message ->
runWithSpan("process child") {}
}
} else {
receiveMessageResult.messages.forEach { message ->
runWithSpan("process child") {}
}
}
then:
assertTraces(3) {
SpanData publishSpan
trace(0, 1) {
span(0) {
name "SQS.CreateQueue"
kind CLIENT
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" "http://localhost:$sqsPort"
"aws.queue.name" "testSdkSqs"
"rpc.system" "aws-api"
"rpc.service" "AmazonSQS"
"rpc.method" "CreateQueue"
"$SemanticAttributes.HTTP_REQUEST_METHOD" "POST"
"$SemanticAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$SemanticAttributes.URL_FULL" "http://localhost:$sqsPort"
"$SemanticAttributes.SERVER_ADDRESS" "localhost"
"$SemanticAttributes.SERVER_PORT" sqsPort
"$SemanticAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
}
}
}
trace(1, 1) {
span(0) {
name "testSdkSqs publish"
kind PRODUCER
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" "http://localhost:$sqsPort"
"aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs"
"rpc.system" "aws-api"
"rpc.method" "SendMessage"
"rpc.service" "AmazonSQS"
"$SemanticAttributes.HTTP_REQUEST_METHOD" "POST"
"$SemanticAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$SemanticAttributes.URL_FULL" "http://localhost:$sqsPort"
"$SemanticAttributes.SERVER_ADDRESS" "localhost"
"$SemanticAttributes.SERVER_PORT" sqsPort
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
"$SemanticAttributes.MESSAGING_OPERATION" "publish"
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
"$SemanticAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
if (testCaptureHeaders) {
"messaging.header.test_message_header" { it == ["test"] }
}
}
}
publishSpan = span(0)
}
trace(2, 3) {
span(0) {
name "testSdkSqs receive"
kind CONSUMER
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" "http://localhost:$sqsPort"
"rpc.method" "ReceiveMessage"
"aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs"
"rpc.system" "aws-api"
"rpc.service" "AmazonSQS"
"$SemanticAttributes.HTTP_REQUEST_METHOD" "POST"
"$SemanticAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$SemanticAttributes.URL_FULL" "http://localhost:$sqsPort"
"$SemanticAttributes.SERVER_ADDRESS" "localhost"
"$SemanticAttributes.SERVER_PORT" sqsPort
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
"$SemanticAttributes.MESSAGING_OPERATION" "receive"
"$SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT" 1
"$SemanticAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
if (testCaptureHeaders) {
"messaging.header.test_message_header" { it == ["test"] }
}
}
}
span(1) {
name "testSdkSqs process"
kind CONSUMER
childOf span(0)
hasLink(publishSpan)
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" "http://localhost:$sqsPort"
"rpc.method" "ReceiveMessage"
"aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs"
"rpc.system" "aws-api"
"rpc.service" "AmazonSQS"
"$SemanticAttributes.HTTP_REQUEST_METHOD" "POST"
"$SemanticAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$SemanticAttributes.URL_FULL" "http://localhost:$sqsPort"
"$SemanticAttributes.SERVER_ADDRESS" "localhost"
"$SemanticAttributes.SERVER_PORT" sqsPort
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
"$SemanticAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
if (testCaptureHeaders) {
"messaging.header.test_message_header" { it == ["test"] }
}
}
}
span(2) {
name "process child"
childOf span(1)
attributes {
}
}
}
}
where:
testCaptureHeaders << [false, true]
}
def "simple sqs producer-consumer services with parent span"() {
setup:
client.createQueue("testSdkSqs")
when:
SendMessageRequest send = new SendMessageRequest("http://localhost:$sqsPort/000000000000/testSdkSqs", "{\"type\": \"hello\"}")
client.sendMessage(send)
runWithSpan("parent") {
def receiveMessageResult = client.receiveMessage("http://localhost:$sqsPort/000000000000/testSdkSqs")
receiveMessageResult.messages.each {message -> runWithSpan("process child") {}}
}
then:
assertTraces(3) {
SpanData publishSpan
trace(0, 1) {
span(0) {
name "SQS.CreateQueue"
kind CLIENT
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" "http://localhost:$sqsPort"
"aws.queue.name" "testSdkSqs"
"rpc.system" "aws-api"
"rpc.service" "AmazonSQS"
"rpc.method" "CreateQueue"
"$SemanticAttributes.HTTP_REQUEST_METHOD" "POST"
"$SemanticAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$SemanticAttributes.URL_FULL" "http://localhost:$sqsPort"
"$SemanticAttributes.SERVER_ADDRESS" "localhost"
"$SemanticAttributes.SERVER_PORT" sqsPort
"$SemanticAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
}
}
}
trace(1, 1) {
span(0) {
name "testSdkSqs publish"
kind PRODUCER
hasNoParent()
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" "http://localhost:$sqsPort"
"aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs"
"rpc.system" "aws-api"
"rpc.method" "SendMessage"
"rpc.service" "AmazonSQS"
"$SemanticAttributes.HTTP_REQUEST_METHOD" "POST"
"$SemanticAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$SemanticAttributes.URL_FULL" "http://localhost:$sqsPort"
"$SemanticAttributes.SERVER_ADDRESS" "localhost"
"$SemanticAttributes.SERVER_PORT" sqsPort
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
"$SemanticAttributes.MESSAGING_OPERATION" "publish"
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
"$SemanticAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
}
}
publishSpan = span(0)
}
trace(2, 5) {
// sort spans with a ranking function
spans.sort({
// job span is first
if (it.name == "parent") {
return 0
}
if (it.name == "SQS.ReceiveMessage") {
return 1
}
if (it.name == "testSdkSqs receive") {
return 2
}
return 3
})
span(0) {
name "parent"
hasNoParent()
}
span(1) {
name "SQS.ReceiveMessage"
kind CLIENT
childOf span(0)
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" "http://localhost:$sqsPort"
"rpc.method" "ReceiveMessage"
"aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs"
"rpc.system" "aws-api"
"rpc.service" "AmazonSQS"
"$SemanticAttributes.HTTP_REQUEST_METHOD" "POST"
"$SemanticAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$SemanticAttributes.URL_FULL" "http://localhost:$sqsPort"
"$SemanticAttributes.SERVER_ADDRESS" "localhost"
"$SemanticAttributes.SERVER_PORT" sqsPort
"$SemanticAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
}
}
span(2) {
name "testSdkSqs receive"
kind CONSUMER
childOf span(0)
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" "http://localhost:$sqsPort"
"rpc.method" "ReceiveMessage"
"aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs"
"rpc.system" "aws-api"
"rpc.service" "AmazonSQS"
"$SemanticAttributes.HTTP_REQUEST_METHOD" "POST"
"$SemanticAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$SemanticAttributes.URL_FULL" "http://localhost:$sqsPort"
"$SemanticAttributes.SERVER_ADDRESS" "localhost"
"$SemanticAttributes.SERVER_PORT" sqsPort
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
"$SemanticAttributes.MESSAGING_OPERATION" "receive"
"$SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT" 1
"$SemanticAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
}
}
span(3) {
name "testSdkSqs process"
kind CONSUMER
childOf span(2)
hasLink(publishSpan)
attributes {
"aws.agent" "java-aws-sdk"
"aws.endpoint" "http://localhost:$sqsPort"
"rpc.method" "ReceiveMessage"
"aws.queue.url" "http://localhost:$sqsPort/000000000000/testSdkSqs"
"rpc.system" "aws-api"
"rpc.service" "AmazonSQS"
"$SemanticAttributes.HTTP_REQUEST_METHOD" "POST"
"$SemanticAttributes.HTTP_RESPONSE_STATUS_CODE" 200
"$SemanticAttributes.URL_FULL" "http://localhost:$sqsPort"
"$SemanticAttributes.SERVER_ADDRESS" "localhost"
"$SemanticAttributes.SERVER_PORT" sqsPort
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
"$SemanticAttributes.NETWORK_PROTOCOL_VERSION" "1.1"
}
}
span(4) {
name "process child"
childOf span(3)
attributes {
}
}
}
}
}
def "only adds attribute name once when request reused"() {
setup:
client.createQueue("testSdkSqs2")
when:
SendMessageRequest send = new SendMessageRequest("http://localhost:$sqsPort/000000000000/testSdkSqs2", "{\"type\": \"hello\"}")
client.sendMessage(send)
ReceiveMessageRequest receive = new ReceiveMessageRequest("http://localhost:$sqsPort/000000000000/testSdkSqs2")
client.receiveMessage(receive)
client.sendMessage(send)
client.receiveMessage(receive)
then:
receive.getAttributeNames() == ["AWSTraceHeader"]
}
}

View File

@ -0,0 +1,418 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.awssdk.v1_11;
import static io.opentelemetry.api.common.AttributeKey.stringArrayKey;
import static io.opentelemetry.api.common.AttributeKey.stringKey;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
import static org.assertj.core.api.Assertions.assertThat;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.client.builder.AwsClientBuilder;
import com.amazonaws.services.sqs.AmazonSQSAsync;
import com.amazonaws.services.sqs.AmazonSQSAsyncClient;
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.MessageAttributeValue;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import com.amazonaws.services.sqs.model.SendMessageRequest;
import com.google.common.collect.ImmutableList;
import io.opentelemetry.api.common.Attributes;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.test.utils.PortUtils;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.sdk.testing.assertj.AttributeAssertion;
import io.opentelemetry.semconv.SemanticAttributes;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.elasticmq.rest.sqs.SQSRestServer;
import org.elasticmq.rest.sqs.SQSRestServerBuilder;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
public abstract class AbstractSqsTracingTest {
protected abstract InstrumentationExtension testing();
protected abstract AmazonSQSAsyncClientBuilder configureClient(
AmazonSQSAsyncClientBuilder client);
private static int sqsPort;
private static SQSRestServer sqsRestServer;
private static AmazonSQSAsync sqsClient;
@BeforeEach
void setUp() {
sqsPort = PortUtils.findOpenPort();
sqsRestServer = SQSRestServerBuilder.withPort(sqsPort).withInterface("localhost").start();
AWSStaticCredentialsProvider credentials =
new AWSStaticCredentialsProvider(new BasicAWSCredentials("x", "x"));
AwsClientBuilder.EndpointConfiguration endpointConfiguration =
new AwsClientBuilder.EndpointConfiguration("http://localhost:" + sqsPort, "elasticmq");
sqsClient =
configureClient(AmazonSQSAsyncClient.asyncBuilder())
.withCredentials(credentials)
.withEndpointConfiguration(endpointConfiguration)
.build();
}
@AfterEach
void cleanUp() {
if (sqsRestServer != null) {
sqsRestServer.stopAndWait();
}
}
@ParameterizedTest
@ValueSource(booleans = {true, false})
void testSimpleSqsProducerConsumerServicesCaptureHeaders(boolean testCaptureHeaders) {
sqsClient.createQueue("testSdkSqs");
SendMessageRequest sendMessageRequest =
new SendMessageRequest(
"http://localhost:" + sqsPort + "/000000000000/testSdkSqs", "{\"type\": \"hello\"}");
if (testCaptureHeaders) {
sendMessageRequest.addMessageAttributesEntry(
"test-message-header",
new MessageAttributeValue().withDataType("String").withStringValue("test"));
}
sqsClient.sendMessage(sendMessageRequest);
ReceiveMessageRequest receiveMessageRequest =
new ReceiveMessageRequest("http://localhost:" + sqsPort + "/000000000000/testSdkSqs");
if (testCaptureHeaders) {
receiveMessageRequest.withMessageAttributeNames("test-message-header");
}
ReceiveMessageResult receiveMessageResult = sqsClient.receiveMessage(receiveMessageRequest);
// test different ways of iterating the messages list
if (testCaptureHeaders) {
for (Message unused : receiveMessageResult.getMessages()) {
testing().runWithSpan("process child", () -> {});
}
} else {
receiveMessageResult
.getMessages()
.forEach(message -> testing().runWithSpan("process child", () -> {}));
}
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("SQS.CreateQueue")
.hasKind(SpanKind.CLIENT)
.hasNoParent()
.hasAttributesSatisfyingExactly(
equalTo(stringKey("aws.agent"), "java-aws-sdk"),
equalTo(stringKey("aws.endpoint"), "http://localhost:" + sqsPort),
equalTo(stringKey("aws.queue.name"), "testSdkSqs"),
equalTo(SemanticAttributes.RPC_SYSTEM, "aws-api"),
equalTo(SemanticAttributes.RPC_SERVICE, "AmazonSQS"),
equalTo(SemanticAttributes.RPC_METHOD, "CreateQueue"),
equalTo(SemanticAttributes.HTTP_REQUEST_METHOD, "POST"),
equalTo(SemanticAttributes.HTTP_RESPONSE_STATUS_CODE, 200),
equalTo(SemanticAttributes.URL_FULL, "http://localhost:" + sqsPort),
equalTo(SemanticAttributes.SERVER_ADDRESS, "localhost"),
equalTo(SemanticAttributes.SERVER_PORT, sqsPort),
equalTo(SemanticAttributes.NETWORK_PROTOCOL_VERSION, "1.1"))),
trace ->
trace.hasSpansSatisfyingExactly(
span -> {
List<AttributeAssertion> attributes =
new ArrayList<>(
Arrays.asList(
equalTo(stringKey("aws.agent"), "java-aws-sdk"),
equalTo(stringKey("aws.endpoint"), "http://localhost:" + sqsPort),
equalTo(
stringKey("aws.queue.url"),
"http://localhost:" + sqsPort + "/000000000000/testSdkSqs"),
equalTo(SemanticAttributes.RPC_SYSTEM, "aws-api"),
equalTo(SemanticAttributes.RPC_SERVICE, "AmazonSQS"),
equalTo(SemanticAttributes.RPC_METHOD, "SendMessage"),
equalTo(SemanticAttributes.HTTP_REQUEST_METHOD, "POST"),
equalTo(SemanticAttributes.HTTP_RESPONSE_STATUS_CODE, 200),
equalTo(
SemanticAttributes.URL_FULL, "http://localhost:" + sqsPort),
equalTo(SemanticAttributes.SERVER_ADDRESS, "localhost"),
equalTo(SemanticAttributes.SERVER_PORT, sqsPort),
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "AmazonSQS"),
equalTo(
SemanticAttributes.MESSAGING_DESTINATION_NAME, "testSdkSqs"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"),
satisfies(
SemanticAttributes.MESSAGING_MESSAGE_ID,
val -> val.isInstanceOf(String.class)),
equalTo(SemanticAttributes.NETWORK_PROTOCOL_VERSION, "1.1")));
if (testCaptureHeaders) {
attributes.add(
satisfies(
stringArrayKey("messaging.header.test_message_header"),
val -> val.isEqualTo(ImmutableList.of("test"))));
}
span.hasName("testSdkSqs publish")
.hasKind(SpanKind.PRODUCER)
.hasNoParent()
.hasAttributesSatisfyingExactly(attributes);
}),
trace ->
trace.hasSpansSatisfyingExactly(
span -> {
List<AttributeAssertion> attributes =
new ArrayList<>(
Arrays.asList(
equalTo(stringKey("aws.agent"), "java-aws-sdk"),
equalTo(stringKey("aws.endpoint"), "http://localhost:" + sqsPort),
equalTo(
stringKey("aws.queue.url"),
"http://localhost:" + sqsPort + "/000000000000/testSdkSqs"),
equalTo(SemanticAttributes.RPC_SYSTEM, "aws-api"),
equalTo(SemanticAttributes.RPC_SERVICE, "AmazonSQS"),
equalTo(SemanticAttributes.RPC_METHOD, "ReceiveMessage"),
equalTo(SemanticAttributes.HTTP_REQUEST_METHOD, "POST"),
equalTo(SemanticAttributes.HTTP_RESPONSE_STATUS_CODE, 200),
equalTo(
SemanticAttributes.URL_FULL, "http://localhost:" + sqsPort),
equalTo(SemanticAttributes.SERVER_ADDRESS, "localhost"),
equalTo(SemanticAttributes.SERVER_PORT, sqsPort),
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "AmazonSQS"),
equalTo(
SemanticAttributes.MESSAGING_DESTINATION_NAME, "testSdkSqs"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"),
equalTo(SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, 1),
equalTo(SemanticAttributes.NETWORK_PROTOCOL_VERSION, "1.1")));
if (testCaptureHeaders) {
attributes.add(
satisfies(
stringArrayKey("messaging.header.test_message_header"),
val -> val.isEqualTo(ImmutableList.of("test"))));
}
span.hasName("testSdkSqs receive")
.hasKind(SpanKind.CONSUMER)
.hasNoParent()
.hasAttributesSatisfyingExactly(attributes);
},
span -> {
List<AttributeAssertion> attributes =
new ArrayList<>(
Arrays.asList(
equalTo(stringKey("aws.agent"), "java-aws-sdk"),
equalTo(stringKey("aws.endpoint"), "http://localhost:" + sqsPort),
equalTo(
stringKey("aws.queue.url"),
"http://localhost:" + sqsPort + "/000000000000/testSdkSqs"),
equalTo(SemanticAttributes.RPC_SYSTEM, "aws-api"),
equalTo(SemanticAttributes.RPC_SERVICE, "AmazonSQS"),
equalTo(SemanticAttributes.RPC_METHOD, "ReceiveMessage"),
equalTo(SemanticAttributes.HTTP_REQUEST_METHOD, "POST"),
equalTo(SemanticAttributes.HTTP_RESPONSE_STATUS_CODE, 200),
equalTo(
SemanticAttributes.URL_FULL, "http://localhost:" + sqsPort),
equalTo(SemanticAttributes.SERVER_ADDRESS, "localhost"),
equalTo(SemanticAttributes.SERVER_PORT, sqsPort),
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "AmazonSQS"),
equalTo(
SemanticAttributes.MESSAGING_DESTINATION_NAME, "testSdkSqs"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
satisfies(
SemanticAttributes.MESSAGING_MESSAGE_ID,
val -> val.isInstanceOf(String.class)),
equalTo(SemanticAttributes.NETWORK_PROTOCOL_VERSION, "1.1")));
if (testCaptureHeaders) {
attributes.add(
satisfies(
stringArrayKey("messaging.header.test_message_header"),
val -> val.isEqualTo(ImmutableList.of("test"))));
}
span.hasName("testSdkSqs process")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(attributes);
},
span ->
span.hasName("process child")
.hasParent(trace.getSpan(1))
.hasAttributes(Attributes.empty())));
}
@Test
void testSimpleSqsProducerConsumerServicesWithParentSpan() {
sqsClient.createQueue("testSdkSqs");
SendMessageRequest sendMessageRequest =
new SendMessageRequest(
"http://localhost:" + sqsPort + "/000000000000/testSdkSqs", "{\"type\": \"hello\"}");
sqsClient.sendMessage(sendMessageRequest);
testing()
.runWithSpan(
"parent",
() -> {
ReceiveMessageResult receiveMessageResult =
sqsClient.receiveMessage(
"http://localhost:" + sqsPort + "/000000000000/testSdkSqs");
receiveMessageResult
.getMessages()
.forEach(message -> testing().runWithSpan("process child", () -> {}));
});
testing()
.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("SQS.CreateQueue")
.hasKind(SpanKind.CLIENT)
.hasNoParent()
.hasAttributesSatisfyingExactly(
equalTo(stringKey("aws.agent"), "java-aws-sdk"),
equalTo(stringKey("aws.endpoint"), "http://localhost:" + sqsPort),
equalTo(stringKey("aws.queue.name"), "testSdkSqs"),
equalTo(SemanticAttributes.RPC_SYSTEM, "aws-api"),
equalTo(SemanticAttributes.RPC_SERVICE, "AmazonSQS"),
equalTo(SemanticAttributes.RPC_METHOD, "CreateQueue"),
equalTo(SemanticAttributes.HTTP_REQUEST_METHOD, "POST"),
equalTo(SemanticAttributes.HTTP_RESPONSE_STATUS_CODE, 200),
equalTo(SemanticAttributes.URL_FULL, "http://localhost:" + sqsPort),
equalTo(SemanticAttributes.SERVER_ADDRESS, "localhost"),
equalTo(SemanticAttributes.SERVER_PORT, sqsPort),
equalTo(SemanticAttributes.NETWORK_PROTOCOL_VERSION, "1.1"))),
trace ->
trace.hasSpansSatisfyingExactly(
span ->
span.hasName("testSdkSqs publish")
.hasKind(SpanKind.PRODUCER)
.hasNoParent()
.hasAttributesSatisfyingExactly(
equalTo(stringKey("aws.agent"), "java-aws-sdk"),
equalTo(stringKey("aws.endpoint"), "http://localhost:" + sqsPort),
equalTo(
stringKey("aws.queue.url"),
"http://localhost:" + sqsPort + "/000000000000/testSdkSqs"),
equalTo(SemanticAttributes.RPC_SYSTEM, "aws-api"),
equalTo(SemanticAttributes.RPC_SERVICE, "AmazonSQS"),
equalTo(SemanticAttributes.RPC_METHOD, "SendMessage"),
equalTo(SemanticAttributes.HTTP_REQUEST_METHOD, "POST"),
equalTo(SemanticAttributes.HTTP_RESPONSE_STATUS_CODE, 200),
equalTo(SemanticAttributes.URL_FULL, "http://localhost:" + sqsPort),
equalTo(SemanticAttributes.SERVER_ADDRESS, "localhost"),
equalTo(SemanticAttributes.SERVER_PORT, sqsPort),
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "AmazonSQS"),
equalTo(
SemanticAttributes.MESSAGING_DESTINATION_NAME, "testSdkSqs"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"),
satisfies(
SemanticAttributes.MESSAGING_MESSAGE_ID,
val -> val.isInstanceOf(String.class)),
equalTo(SemanticAttributes.NETWORK_PROTOCOL_VERSION, "1.1"))),
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasNoParent().hasAttributes(Attributes.empty()),
span ->
span.hasName("SQS.ReceiveMessage")
.hasKind(SpanKind.CLIENT)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(stringKey("aws.agent"), "java-aws-sdk"),
equalTo(stringKey("aws.endpoint"), "http://localhost:" + sqsPort),
equalTo(
stringKey("aws.queue.url"),
"http://localhost:" + sqsPort + "/000000000000/testSdkSqs"),
equalTo(SemanticAttributes.RPC_SYSTEM, "aws-api"),
equalTo(SemanticAttributes.RPC_SERVICE, "AmazonSQS"),
equalTo(SemanticAttributes.RPC_METHOD, "ReceiveMessage"),
equalTo(SemanticAttributes.HTTP_REQUEST_METHOD, "POST"),
equalTo(SemanticAttributes.HTTP_RESPONSE_STATUS_CODE, 200),
equalTo(SemanticAttributes.URL_FULL, "http://localhost:" + sqsPort),
equalTo(SemanticAttributes.SERVER_ADDRESS, "localhost"),
equalTo(SemanticAttributes.SERVER_PORT, sqsPort),
equalTo(SemanticAttributes.NETWORK_PROTOCOL_VERSION, "1.1")),
span ->
span.hasName("testSdkSqs receive")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
equalTo(stringKey("aws.agent"), "java-aws-sdk"),
equalTo(stringKey("aws.endpoint"), "http://localhost:" + sqsPort),
equalTo(
stringKey("aws.queue.url"),
"http://localhost:" + sqsPort + "/000000000000/testSdkSqs"),
equalTo(SemanticAttributes.RPC_SYSTEM, "aws-api"),
equalTo(SemanticAttributes.RPC_SERVICE, "AmazonSQS"),
equalTo(SemanticAttributes.RPC_METHOD, "ReceiveMessage"),
equalTo(SemanticAttributes.HTTP_REQUEST_METHOD, "POST"),
equalTo(SemanticAttributes.HTTP_RESPONSE_STATUS_CODE, 200),
equalTo(SemanticAttributes.URL_FULL, "http://localhost:" + sqsPort),
equalTo(SemanticAttributes.SERVER_ADDRESS, "localhost"),
equalTo(SemanticAttributes.SERVER_PORT, sqsPort),
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "AmazonSQS"),
equalTo(
SemanticAttributes.MESSAGING_DESTINATION_NAME, "testSdkSqs"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"),
equalTo(SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, 1),
equalTo(SemanticAttributes.NETWORK_PROTOCOL_VERSION, "1.1")),
span ->
span.hasName("testSdkSqs process")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(2))
.hasAttributesSatisfyingExactly(
equalTo(stringKey("aws.agent"), "java-aws-sdk"),
equalTo(stringKey("aws.endpoint"), "http://localhost:" + sqsPort),
equalTo(
stringKey("aws.queue.url"),
"http://localhost:" + sqsPort + "/000000000000/testSdkSqs"),
equalTo(SemanticAttributes.RPC_SYSTEM, "aws-api"),
equalTo(SemanticAttributes.RPC_SERVICE, "AmazonSQS"),
equalTo(SemanticAttributes.RPC_METHOD, "ReceiveMessage"),
equalTo(SemanticAttributes.HTTP_REQUEST_METHOD, "POST"),
equalTo(SemanticAttributes.HTTP_RESPONSE_STATUS_CODE, 200),
equalTo(SemanticAttributes.URL_FULL, "http://localhost:" + sqsPort),
equalTo(SemanticAttributes.SERVER_ADDRESS, "localhost"),
equalTo(SemanticAttributes.SERVER_PORT, sqsPort),
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "AmazonSQS"),
equalTo(
SemanticAttributes.MESSAGING_DESTINATION_NAME, "testSdkSqs"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
satisfies(
SemanticAttributes.MESSAGING_MESSAGE_ID,
val -> val.isInstanceOf(String.class)),
equalTo(SemanticAttributes.NETWORK_PROTOCOL_VERSION, "1.1")),
span ->
span.hasName("process child")
.hasParent(trace.getSpan(3))
.hasAttributes(Attributes.empty())));
}
@Test
void testOnlyAddsAttributeNameOnceWhenRequestReused() {
sqsClient.createQueue("testSdkSqs2");
SendMessageRequest send =
new SendMessageRequest(
"http://localhost:$sqsPort/000000000000/testSdkSqs2", "{\"type\": \"hello\"}");
sqsClient.sendMessage(send);
ReceiveMessageRequest receive =
new ReceiveMessageRequest("http://localhost:$sqsPort/000000000000/testSdkSqs2");
sqsClient.receiveMessage(receive);
sqsClient.sendMessage(send);
sqsClient.receiveMessage(receive);
assertThat(receive.getAttributeNames()).isEqualTo(ImmutableList.of("AWSTraceHeader"));
}
}