Fix flaky aws sqs test (#10800)

This commit is contained in:
Lauri Tulmin 2024-03-11 15:08:41 +02:00 committed by GitHub
parent a942b2395c
commit bc53f2ead1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
1 changed files with 120 additions and 82 deletions

View File

@ -28,11 +28,14 @@ import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.test.utils.PortUtils; import io.opentelemetry.instrumentation.test.utils.PortUtils;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.sdk.testing.assertj.AttributeAssertion; import io.opentelemetry.sdk.testing.assertj.AttributeAssertion;
import io.opentelemetry.sdk.testing.assertj.SpanDataAssert;
import io.opentelemetry.sdk.trace.data.SpanData; import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.semconv.SemanticAttributes; import io.opentelemetry.semconv.SemanticAttributes;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.elasticmq.rest.sqs.SQSRestServer; import org.elasticmq.rest.sqs.SQSRestServer;
import org.elasticmq.rest.sqs.SQSRestServerBuilder; import org.elasticmq.rest.sqs.SQSRestServerBuilder;
import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.AfterEach;
@ -324,88 +327,123 @@ public abstract class AbstractSqsTracingTest {
SemanticAttributes.MESSAGING_MESSAGE_ID, SemanticAttributes.MESSAGING_MESSAGE_ID,
val -> val.isInstanceOf(String.class)), val -> val.isInstanceOf(String.class)),
equalTo(SemanticAttributes.NETWORK_PROTOCOL_VERSION, "1.1"))), equalTo(SemanticAttributes.NETWORK_PROTOCOL_VERSION, "1.1"))),
trace -> trace -> {
trace.hasSpansSatisfyingExactlyInAnyOrder( AtomicReference<SpanData> receiveSpan = new AtomicReference<>();
span -> span.hasName("parent").hasNoParent().hasAttributes(Attributes.empty()), AtomicReference<SpanData> processSpan = new AtomicReference<>();
span ->
span.hasName("SQS.ReceiveMessage") List<Consumer<SpanDataAssert>> assertions =
.hasKind(SpanKind.CLIENT) new ArrayList<>(
.hasParent(trace.getSpan(0)) Arrays.asList(
.hasAttributesSatisfyingExactly( span ->
equalTo(stringKey("aws.agent"), "java-aws-sdk"), span.hasName("parent")
equalTo(stringKey("aws.endpoint"), "http://localhost:" + sqsPort), .hasNoParent()
equalTo( .hasAttributes(Attributes.empty()),
stringKey("aws.queue.url"), span ->
"http://localhost:" + sqsPort + "/000000000000/testSdkSqs"), span.hasName("SQS.ReceiveMessage")
equalTo(SemanticAttributes.RPC_SYSTEM, "aws-api"), .hasKind(SpanKind.CLIENT)
equalTo(SemanticAttributes.RPC_SERVICE, "AmazonSQS"), .hasParent(trace.getSpan(0))
equalTo(SemanticAttributes.RPC_METHOD, "ReceiveMessage"), .hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.HTTP_REQUEST_METHOD, "POST"), equalTo(stringKey("aws.agent"), "java-aws-sdk"),
equalTo(SemanticAttributes.HTTP_RESPONSE_STATUS_CODE, 200), equalTo(
equalTo(SemanticAttributes.URL_FULL, "http://localhost:" + sqsPort), stringKey("aws.endpoint"), "http://localhost:" + sqsPort),
equalTo(SemanticAttributes.SERVER_ADDRESS, "localhost"), equalTo(
equalTo(SemanticAttributes.SERVER_PORT, sqsPort), stringKey("aws.queue.url"),
equalTo(SemanticAttributes.NETWORK_PROTOCOL_VERSION, "1.1")), "http://localhost:"
span -> + sqsPort
span.hasName("testSdkSqs receive") + "/000000000000/testSdkSqs"),
.hasKind(SpanKind.CONSUMER) equalTo(SemanticAttributes.RPC_SYSTEM, "aws-api"),
.hasParent(trace.getSpan(0)) equalTo(SemanticAttributes.RPC_SERVICE, "AmazonSQS"),
.hasAttributesSatisfyingExactly( equalTo(SemanticAttributes.RPC_METHOD, "ReceiveMessage"),
equalTo(stringKey("aws.agent"), "java-aws-sdk"), equalTo(SemanticAttributes.HTTP_REQUEST_METHOD, "POST"),
equalTo(stringKey("aws.endpoint"), "http://localhost:" + sqsPort), equalTo(SemanticAttributes.HTTP_RESPONSE_STATUS_CODE, 200),
equalTo( equalTo(
stringKey("aws.queue.url"), SemanticAttributes.URL_FULL,
"http://localhost:" + sqsPort + "/000000000000/testSdkSqs"), "http://localhost:" + sqsPort),
equalTo(SemanticAttributes.RPC_SYSTEM, "aws-api"), equalTo(SemanticAttributes.SERVER_ADDRESS, "localhost"),
equalTo(SemanticAttributes.RPC_SERVICE, "AmazonSQS"), equalTo(SemanticAttributes.SERVER_PORT, sqsPort),
equalTo(SemanticAttributes.RPC_METHOD, "ReceiveMessage"), equalTo(SemanticAttributes.NETWORK_PROTOCOL_VERSION, "1.1")),
equalTo(SemanticAttributes.HTTP_REQUEST_METHOD, "POST"), span ->
equalTo(SemanticAttributes.HTTP_RESPONSE_STATUS_CODE, 200), span.hasName("testSdkSqs receive")
equalTo(SemanticAttributes.URL_FULL, "http://localhost:" + sqsPort), .hasKind(SpanKind.CONSUMER)
equalTo(SemanticAttributes.SERVER_ADDRESS, "localhost"), .hasParent(trace.getSpan(0))
equalTo(SemanticAttributes.SERVER_PORT, sqsPort), .hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "AmazonSQS"), equalTo(stringKey("aws.agent"), "java-aws-sdk"),
equalTo( equalTo(
SemanticAttributes.MESSAGING_DESTINATION_NAME, "testSdkSqs"), stringKey("aws.endpoint"), "http://localhost:" + sqsPort),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"), equalTo(
equalTo(SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, 1), stringKey("aws.queue.url"),
equalTo(SemanticAttributes.NETWORK_PROTOCOL_VERSION, "1.1")), "http://localhost:"
span -> { + sqsPort
// on jdk8 the order of the "SQS.ReceiveMessage" and "testSdkSqs receive" + "/000000000000/testSdkSqs"),
// spans can vary equalTo(SemanticAttributes.RPC_SYSTEM, "aws-api"),
SpanData parent = equalTo(SemanticAttributes.RPC_SERVICE, "AmazonSQS"),
"testSdkSqs receive".equals(trace.getSpan(2).getName()) equalTo(SemanticAttributes.RPC_METHOD, "ReceiveMessage"),
? trace.getSpan(2) equalTo(SemanticAttributes.HTTP_REQUEST_METHOD, "POST"),
: trace.getSpan(1); equalTo(SemanticAttributes.HTTP_RESPONSE_STATUS_CODE, 200),
span.hasName("testSdkSqs process") equalTo(
.hasKind(SpanKind.CONSUMER) SemanticAttributes.URL_FULL,
.hasParent(parent) "http://localhost:" + sqsPort),
.hasAttributesSatisfyingExactly( equalTo(SemanticAttributes.SERVER_ADDRESS, "localhost"),
equalTo(stringKey("aws.agent"), "java-aws-sdk"), equalTo(SemanticAttributes.SERVER_PORT, sqsPort),
equalTo(stringKey("aws.endpoint"), "http://localhost:" + sqsPort), equalTo(SemanticAttributes.MESSAGING_SYSTEM, "AmazonSQS"),
equalTo( equalTo(
stringKey("aws.queue.url"), SemanticAttributes.MESSAGING_DESTINATION_NAME,
"http://localhost:" + sqsPort + "/000000000000/testSdkSqs"), "testSdkSqs"),
equalTo(SemanticAttributes.RPC_SYSTEM, "aws-api"), equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"),
equalTo(SemanticAttributes.RPC_SERVICE, "AmazonSQS"), equalTo(SemanticAttributes.MESSAGING_BATCH_MESSAGE_COUNT, 1),
equalTo(SemanticAttributes.RPC_METHOD, "ReceiveMessage"), equalTo(SemanticAttributes.NETWORK_PROTOCOL_VERSION, "1.1")),
equalTo(SemanticAttributes.HTTP_REQUEST_METHOD, "POST"), span ->
equalTo(SemanticAttributes.HTTP_RESPONSE_STATUS_CODE, 200), span.hasName("testSdkSqs process")
equalTo(SemanticAttributes.URL_FULL, "http://localhost:" + sqsPort), .hasKind(SpanKind.CONSUMER)
equalTo(SemanticAttributes.SERVER_ADDRESS, "localhost"), .hasParent(receiveSpan.get())
equalTo(SemanticAttributes.SERVER_PORT, sqsPort), .hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "AmazonSQS"), equalTo(stringKey("aws.agent"), "java-aws-sdk"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, "testSdkSqs"), equalTo(
equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), stringKey("aws.endpoint"), "http://localhost:" + sqsPort),
satisfies( equalTo(
SemanticAttributes.MESSAGING_MESSAGE_ID, stringKey("aws.queue.url"),
val -> val.isInstanceOf(String.class)), "http://localhost:"
equalTo(SemanticAttributes.NETWORK_PROTOCOL_VERSION, "1.1")); + sqsPort
}, + "/000000000000/testSdkSqs"),
span -> equalTo(SemanticAttributes.RPC_SYSTEM, "aws-api"),
span.hasName("process child") equalTo(SemanticAttributes.RPC_SERVICE, "AmazonSQS"),
.hasParent(trace.getSpan(3)) equalTo(SemanticAttributes.RPC_METHOD, "ReceiveMessage"),
.hasAttributes(Attributes.empty()))); equalTo(SemanticAttributes.HTTP_REQUEST_METHOD, "POST"),
equalTo(SemanticAttributes.HTTP_RESPONSE_STATUS_CODE, 200),
equalTo(
SemanticAttributes.URL_FULL,
"http://localhost:" + sqsPort),
equalTo(SemanticAttributes.SERVER_ADDRESS, "localhost"),
equalTo(SemanticAttributes.SERVER_PORT, sqsPort),
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "AmazonSQS"),
equalTo(
SemanticAttributes.MESSAGING_DESTINATION_NAME,
"testSdkSqs"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
satisfies(
SemanticAttributes.MESSAGING_MESSAGE_ID,
val -> val.isInstanceOf(String.class)),
equalTo(SemanticAttributes.NETWORK_PROTOCOL_VERSION, "1.1")),
span ->
span.hasName("process child")
.hasParent(processSpan.get())
.hasAttributes(Attributes.empty())));
// on jdk8 the order of the "SQS.ReceiveMessage" and "testSdkSqs receive"
// spans can vary
if ("SQS.ReceiveMessage".equals(trace.getSpan(1).getName())) {
receiveSpan.set(trace.getSpan(2));
processSpan.set(trace.getSpan(3));
} else {
receiveSpan.set(trace.getSpan(1));
processSpan.set(trace.getSpan(2));
// move "SQS.ReceiveMessage" assertions to the last position
assertions.add(assertions.remove(1));
}
trace.hasSpansSatisfyingExactly(assertions);
});
} }
@Test @Test