diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.groovy b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.groovy deleted file mode 100644 index 8b220334dd..0000000000 --- a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.groovy +++ /dev/null @@ -1,671 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8 - -import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification -import io.opentelemetry.instrumentation.test.asserts.TraceAssert -import io.opentelemetry.semconv.SemanticAttributes -import org.apache.pulsar.client.admin.PulsarAdmin -import org.apache.pulsar.client.api.Consumer -import org.apache.pulsar.client.api.Message -import org.apache.pulsar.client.api.MessageListener -import org.apache.pulsar.client.api.Messages -import org.apache.pulsar.client.api.Producer -import org.apache.pulsar.client.api.PulsarClient -import org.apache.pulsar.client.api.Schema -import org.apache.pulsar.client.api.SubscriptionInitialPosition -import org.slf4j.Logger -import org.slf4j.LoggerFactory -import org.testcontainers.containers.PulsarContainer -import org.testcontainers.containers.output.Slf4jLogConsumer -import org.testcontainers.utility.DockerImageName -import spock.lang.Shared - -import java.time.Duration -import java.util.concurrent.CompletableFuture -import java.util.concurrent.CountDownLatch -import java.util.concurrent.TimeUnit -import java.util.regex.Pattern - -import static io.opentelemetry.api.trace.SpanKind.CONSUMER -import static io.opentelemetry.api.trace.SpanKind.INTERNAL -import static io.opentelemetry.api.trace.SpanKind.PRODUCER - -class PulsarClientTest extends AgentInstrumentationSpecification { - private static final Logger logger = LoggerFactory.getLogger(PulsarClientTest) - - private static final DockerImageName DEFAULT_IMAGE_NAME = - DockerImageName.parse("apachepulsar/pulsar:2.8.0") - - @Shared - private PulsarContainer pulsar - @Shared - private PulsarClient client - @Shared - private PulsarAdmin admin - @Shared - private Producer producer - @Shared - private Consumer consumer - @Shared - private Producer producer2 - - @Shared - private String brokerHost - @Shared - private int brokerPort - - @Override - def setupSpec() { - pulsar = new PulsarContainer(DEFAULT_IMAGE_NAME) - .withEnv("PULSAR_MEM", "-Xmx128m") - .withLogConsumer(new Slf4jLogConsumer(logger)) - .withStartupTimeout(Duration.ofMinutes(2)) - pulsar.start() - - brokerHost = pulsar.host - brokerPort = pulsar.getMappedPort(6650) - client = PulsarClient.builder().serviceUrl(pulsar.pulsarBrokerUrl).build() - admin = PulsarAdmin.builder().serviceHttpUrl(pulsar.httpServiceUrl).build() - } - - @Override - def cleanupSpec() { - producer?.close() - consumer?.close() - producer2?.close() - client?.close() - admin?.close() - pulsar.close() - } - - def "test send non-partitioned topic"() { - setup: - def topic = "persistent://public/default/testSendNonPartitionedTopic" - admin.topics().createNonPartitionedTopic(topic) - producer = - client.newProducer(Schema.STRING).topic(topic) - .enableBatching(false).create() - - when: - String msg = "test" - def msgId = runWithSpan("parent") { - producer.send(msg) - } - - then: - assertTraces(1) { - trace(0, 2) { - span(0) { - name "parent" - kind INTERNAL - hasNoParent() - } - producerSpan(it, 1, span(0), topic, msgId) - } - } - } - - def "test consume non-partitioned topic"() { - setup: - def topic = "persistent://public/default/testConsumeNonPartitionedTopic" - def latch = new CountDownLatch(1) - admin.topics().createNonPartitionedTopic(topic) - consumer = client.newConsumer(Schema.STRING) - .subscriptionName("test_sub") - .topic(topic) - .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) - .messageListener(new MessageListener() { - @Override - void received(Consumer consumer, Message msg) { - consumer.acknowledge(msg) - latch.countDown() - } - }) - .subscribe() - - producer = client.newProducer(Schema.STRING) - .topic(topic) - .enableBatching(false) - .create() - - when: - def msg = "test" - def msgId = runWithSpan("parent") { - producer.send(msg) - } - - latch.await(1, TimeUnit.MINUTES) - - then: - assertTraces(1) { - trace(0, 4) { - span(0) { - name "parent" - kind INTERNAL - hasNoParent() - } - producerSpan(it, 1, span(0), topic, msgId) - receiveSpan(it, 2, span(1), topic, msgId) - processSpan(it, 3, span(2), topic, msgId) - } - } - } - - def "test consume non-partitioned topic using receive"() { - setup: - def topic = "persistent://public/default/testConsumeNonPartitionedTopicCallReceive" - admin.topics().createNonPartitionedTopic(topic) - consumer = client.newConsumer(Schema.STRING) - .subscriptionName("test_sub") - .topic(topic) - .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) - .subscribe() - - producer = client.newProducer(Schema.STRING) - .topic(topic) - .enableBatching(false) - .create() - - when: - def msg = "test" - def msgId = runWithSpan("parent") { - producer.send(msg) - } - - def receivedMsg = consumer.receive() - consumer.acknowledge(receivedMsg) - - then: - assertTraces(1) { - trace(0, 3) { - span(0) { - name "parent" - kind INTERNAL - hasNoParent() - } - producerSpan(it, 1, span(0), topic, msgId) - receiveSpan(it, 2, span(1), topic, msgId) - } - } - } - - def "test consume non-partitioned topic using receiveAsync"() { - setup: - def topic = "persistent://public/default/testConsumeNonPartitionedTopicCallReceiveAsync" - admin.topics().createNonPartitionedTopic(topic) - consumer = client.newConsumer(Schema.STRING) - .subscriptionName("test_sub") - .topic(topic) - .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) - .subscribe() - - producer = client.newProducer(Schema.STRING) - .topic(topic) - .enableBatching(false) - .create() - - when: - CompletableFuture> result = consumer.receiveAsync().whenComplete { receivedMsg, throwable -> - runWithSpan("callback") { - consumer.acknowledge(receivedMsg) - } - } - - def msg = "test" - def msgId = runWithSpan("parent") { - producer.send(msg) - } - - result.get(1, TimeUnit.MINUTES) - - then: - assertTraces(1) { - trace(0, 4) { - span(0) { - name "parent" - kind INTERNAL - hasNoParent() - } - producerSpan(it, 1, span(0), topic, msgId) - receiveSpan(it, 2, span(1), topic, msgId) - span(3) { - name "callback" - kind INTERNAL - childOf span(2) - attributes { - } - } - } - } - } - - def "test consume non-partitioned topic using receive with timeout"() { - setup: - def topic = "persistent://public/default/testConsumeNonPartitionedTopicCallReceiveWithTimeout" - admin.topics().createNonPartitionedTopic(topic) - consumer = client.newConsumer(Schema.STRING) - .subscriptionName("test_sub") - .topic(topic) - .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) - .subscribe() - - producer = client.newProducer(Schema.STRING) - .topic(topic) - .enableBatching(false) - .create() - - when: - def msg = "test" - def msgId = runWithSpan("parent") { - producer.send(msg) - } - - def receivedMsg = consumer.receive(1, TimeUnit.MINUTES) - consumer.acknowledge(receivedMsg) - - then: - assertTraces(1) { - trace(0, 3) { - span(0) { - name "parent" - kind INTERNAL - hasNoParent() - } - producerSpan(it, 1, span(0), topic, msgId) - receiveSpan(it, 2, span(1), topic, msgId) - } - } - } - - def "test consume non-partitioned topic using batchReceive"() { - setup: - def topic = "persistent://public/default/testConsumeNonPartitionedTopicCallBatchReceive" - admin.topics().createNonPartitionedTopic(topic) - consumer = client.newConsumer(Schema.STRING) - .subscriptionName("test_sub") - .topic(topic) - .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) - .subscribe() - - producer = client.newProducer(Schema.STRING) - .topic(topic) - .enableBatching(false) - .create() - - when: - def msg = "test" - def msgId = runWithSpan("parent") { - producer.send(msg) - } - - runWithSpan("receive-parent") { - def receivedMsg = consumer.batchReceive() - consumer.acknowledge(receivedMsg) - } - - then: - def producer - assertTraces(2) { - trace(0, 2) { - span(0) { - name "parent" - kind INTERNAL - hasNoParent() - } - producerSpan(it, 1, span(0), topic, msgId) - producer = span(1) - } - trace(1, 2) { - span(0) { - name "receive-parent" - kind INTERNAL - hasNoParent() - } - receiveSpan(it, 1, span(0), topic, null, producer) - } - } - } - - def "test consume non-partitioned topic using batchReceiveAsync"() { - setup: - def topic = "persistent://public/default/testConsumeNonPartitionedTopicCallBatchReceiveAsync" - admin.topics().createNonPartitionedTopic(topic) - consumer = client.newConsumer(Schema.STRING) - .subscriptionName("test_sub") - .topic(topic) - .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) - .subscribe() - - producer = client.newProducer(Schema.STRING) - .topic(topic) - .enableBatching(false) - .create() - - when: - def msg = "test" - def msgId = runWithSpan("parent") { - producer.send(msg) - } - - CompletableFuture> result = runWithSpan("receive-parent") { - consumer.batchReceiveAsync().whenComplete { receivedMsg, throwable -> - runWithSpan("callback") { - consumer.acknowledge(receivedMsg) - } - } - } - result.get(1, TimeUnit.MINUTES).size() == 1 - - then: - def producer - assertTraces(2) { - trace(0, 2) { - span(0) { - name "parent" - kind INTERNAL - hasNoParent() - } - producerSpan(it, 1, span(0), topic, msgId) - producer = span(1) - } - trace(1, 3) { - span(0) { - name "receive-parent" - kind INTERNAL - hasNoParent() - } - receiveSpan(it, 1, span(0), topic, null, producer) - span(2) { - name "callback" - kind INTERNAL - childOf span(1) - attributes { - } - } - } - } - } - - def "capture message header as span attribute"() { - setup: - def topic = "persistent://public/default/testCaptureMessageHeaderTopic" - def latch = new CountDownLatch(1) - admin.topics().createNonPartitionedTopic(topic) - consumer = client.newConsumer(Schema.STRING) - .subscriptionName("test_sub") - .topic(topic) - .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) - .messageListener(new MessageListener() { - @Override - void received(Consumer consumer, Message msg) { - consumer.acknowledge(msg) - latch.countDown() - } - }) - .subscribe() - - producer = client.newProducer(Schema.STRING) - .topic(topic) - .enableBatching(false) - .create() - - when: - def msg = "test" - def msgId = runWithSpan("parent") { - producer.newMessage().value(msg).property("test-message-header", "test").send() - } - - latch.await(1, TimeUnit.MINUTES) - - then: - assertTraces(1) { - trace(0, 4) { - span(0) { - name "parent" - kind INTERNAL - hasNoParent() - } - producerSpan(it, 1, span(0), topic, msgId, true) - receiveSpan(it, 2, span(1), topic, msgId, null, true) - processSpan(it, 3, span(2), topic, msgId, true) - } - } - } - - def "test send partitioned topic"() { - setup: - def topic = "persistent://public/default/testSendPartitionedTopic" - admin.topics().createPartitionedTopic(topic, 2) - producer = - client.newProducer(Schema.STRING).topic(topic) - .enableBatching(false).create() - - when: - String msg = "test" - def msgId = runWithSpan("parent") { - producer.send(msg) - } - - then: - assertTraces(1) { - trace(0, 2) { - span(0) { - name "parent" - kind INTERNAL - hasNoParent() - } - producerSpan(it, 1, span(0), topic, ~/${topic}-partition-.*publish/, { it.startsWith(topic) }, msgId) - } - } - } - - def "test consume partitioned topic"() { - setup: - def topic = "persistent://public/default/testConsumePartitionedTopic" - admin.topics().createPartitionedTopic(topic, 2) - - def latch = new CountDownLatch(1) - consumer = client.newConsumer(Schema.STRING) - .subscriptionName("test_sub") - .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) - .topic(topic) - .messageListener(new MessageListener() { - @Override - void received(Consumer consumer, Message msg) { - consumer.acknowledge(msg) - latch.countDown() - } - }) - .subscribe() - - producer = client.newProducer(Schema.STRING) - .topic(topic) - .enableBatching(false) - .create() - - when: - def msg = "test" - def msgId = runWithSpan("parent") { - producer.send(msg) - } - - latch.await(1, TimeUnit.MINUTES) - - then: - assertTraces(1) { - trace(0, 4) { - span(0) { - name "parent" - kind INTERNAL - hasNoParent() - } - producerSpan(it, 1, span(0), topic, ~/${topic}-partition-.*publish/, { it.startsWith(topic) }, msgId) - receiveSpan(it, 2, span(1), topic, ~/${topic}-partition-.*receive/, { it.startsWith(topic) }, msgId) - processSpan(it, 3, span(2), topic, ~/${topic}-partition-.*process/, { it.startsWith(topic) }, msgId) - } - } - } - - def "test consume multi-topics"() { - setup: - - def topicNamePrefix = "persistent://public/default/testConsumeMulti_" - def topic1 = topicNamePrefix + "1" - def topic2 = topicNamePrefix + "2" - - def latch = new CountDownLatch(2) - producer = client.newProducer(Schema.STRING) - .topic(topic1) - .enableBatching(false) - .create() - producer2 = client.newProducer(Schema.STRING) - .topic(topic2) - .enableBatching(false) - .create() - - when: - runWithSpan("parent1") { - producer.send("test1") - } - runWithSpan("parent2") { - producer2.send("test2") - } - - consumer = client.newConsumer(Schema.STRING) - .topic(topic2, topic1) - .subscriptionName("test_sub") - .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) - .messageListener(new MessageListener() { - @Override - void received(Consumer consumer, Message msg) { - consumer.acknowledge(msg) - latch.countDown() - } - }) - .subscribe() - - latch.await(1, TimeUnit.MINUTES) - - then: - assertTraces(2) { - traces.sort(orderByRootSpanName("parent1", "parent2")) - for (int i in 1..2) { - def topic = i == 1 ? topic1 : topic2 - trace(i - 1, 4) { - span(0) { - name "parent" + i - kind INTERNAL - hasNoParent() - } - producerSpan(it, 1, span(0), topic, null, { it.startsWith(topicNamePrefix) }, String) - receiveSpan(it, 2, span(1), topic, null, { it.startsWith(topicNamePrefix) }, String) - processSpan(it, 3, span(2), topic, null, { it.startsWith(topicNamePrefix) }, String) - } - } - } - } - - def producerSpan(TraceAssert trace, int index, Object parentSpan, String topic, Object msgId, boolean headers = false) { - producerSpan(trace, index, parentSpan, topic, null, { it == topic }, msgId, headers) - } - - def producerSpan(TraceAssert trace, int index, Object parentSpan, String topic, Pattern namePattern, Closure destination, Object msgId, boolean headers = false) { - trace.span(index) { - if (namePattern != null) { - name namePattern - } else { - name "$topic publish" - } - kind PRODUCER - childOf parentSpan - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "pulsar" - "$SemanticAttributes.SERVER_ADDRESS" brokerHost - "$SemanticAttributes.SERVER_PORT" brokerPort - "$SemanticAttributes.MESSAGING_DESTINATION_NAME" destination - "$SemanticAttributes.MESSAGING_OPERATION" "publish" - if (msgId == String) { - "$SemanticAttributes.MESSAGING_MESSAGE_ID" String - } else if (msgId != null) { - "$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString() - } - "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long - "messaging.pulsar.message.type" "normal" - if (headers) { - "messaging.header.test_message_header" { it == ["test"] } - } - } - } - } - - def receiveSpan(TraceAssert trace, int index, Object parentSpan, String topic, Object msgId, Object linkedSpan = null, boolean headers = false) { - receiveSpan(trace, index, parentSpan, topic, null, { it == topic }, msgId, linkedSpan, headers) - } - - def receiveSpan(TraceAssert trace, int index, Object parentSpan, String topic, Pattern namePattern, Closure destination, Object msgId, Object linkedSpan = null, boolean headers = false) { - trace.span(index) { - if (namePattern != null) { - name namePattern - } else { - name "$topic receive" - } - kind CONSUMER - childOf parentSpan - if (linkedSpan == null) { - hasNoLinks() - } else { - hasLink linkedSpan - } - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "pulsar" - "$SemanticAttributes.SERVER_ADDRESS" brokerHost - "$SemanticAttributes.SERVER_PORT" brokerPort - "$SemanticAttributes.MESSAGING_DESTINATION_NAME" destination - if (msgId == String) { - "$SemanticAttributes.MESSAGING_MESSAGE_ID" String - } else if (msgId != null) { - "$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString() - } - "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long - "$SemanticAttributes.MESSAGING_OPERATION" "receive" - if (headers) { - "messaging.header.test_message_header" { it == ["test"] } - } - } - } - } - - def processSpan(TraceAssert trace, int index, Object parentSpan, String topic, Object msgId, boolean headers = false) { - processSpan(trace, index, parentSpan, topic, null, { it == topic }, msgId, headers) - } - - def processSpan(TraceAssert trace, int index, Object parentSpan, String topic, Pattern namePattern, Closure destination, Object msgId, boolean headers = false) { - trace.span(index) { - if (namePattern != null) { - name namePattern - } else { - name "$topic process" - } - kind INTERNAL - childOf parentSpan - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "pulsar" - "$SemanticAttributes.MESSAGING_DESTINATION_NAME" destination - if (msgId == String) { - "$SemanticAttributes.MESSAGING_MESSAGE_ID" String - } else if (msgId != null) { - "$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString() - } - "$SemanticAttributes.MESSAGING_OPERATION" "process" - "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long - if (headers) { - "messaging.header.test_message_header" { it == ["test"] } - } - } - } - } -} diff --git a/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java new file mode 100644 index 0000000000..35bb193740 --- /dev/null +++ b/instrumentation/pulsar/pulsar-2.8/javaagent/src/test/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.java @@ -0,0 +1,699 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8; + +import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanName; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo; +import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies; +import static org.assertj.core.api.Assertions.assertThat; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.trace.SpanKind; +import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension; +import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension; +import io.opentelemetry.sdk.testing.assertj.AttributeAssertion; +import io.opentelemetry.sdk.trace.data.LinkData; +import io.opentelemetry.sdk.trace.data.SpanData; +import io.opentelemetry.semconv.SemanticAttributes; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.MessageId; +import org.apache.pulsar.client.api.MessageListener; +import org.apache.pulsar.client.api.Messages; +import org.apache.pulsar.client.api.Producer; +import org.apache.pulsar.client.api.PulsarClient; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.SubscriptionInitialPosition; +import org.assertj.core.api.AbstractLongAssert; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.PulsarContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.utility.DockerImageName; + +class PulsarClientTest { + + private static final Logger logger = LoggerFactory.getLogger(PulsarClientTest.class); + + private static final DockerImageName DEFAULT_IMAGE_NAME = + DockerImageName.parse("apachepulsar/pulsar:2.8.0"); + + @RegisterExtension + static final InstrumentationExtension testing = AgentInstrumentationExtension.create(); + + private static PulsarContainer pulsar; + private static PulsarClient client; + private static PulsarAdmin admin; + private static Producer producer; + private static Consumer consumer; + private static Producer producer2; + + private static String brokerHost; + private static int brokerPort; + + private static final AttributeKey MESSAGE_TYPE = + AttributeKey.stringKey("messaging.pulsar.message.type"); + + @BeforeAll + static void beforeAll() throws PulsarClientException { + pulsar = + new PulsarContainer(DEFAULT_IMAGE_NAME) + .withEnv("PULSAR_MEM", "-Xmx128m") + .withLogConsumer(new Slf4jLogConsumer(logger)) + .withStartupTimeout(Duration.ofMinutes(2)); + pulsar.start(); + + brokerHost = pulsar.getHost(); + brokerPort = pulsar.getMappedPort(6650); + client = PulsarClient.builder().serviceUrl(pulsar.getPulsarBrokerUrl()).build(); + admin = PulsarAdmin.builder().serviceHttpUrl(pulsar.getHttpServiceUrl()).build(); + } + + @AfterAll + static void afterAll() throws PulsarClientException { + if (producer != null) { + producer.close(); + } + if (consumer != null) { + consumer.close(); + } + if (producer2 != null) { + producer2.close(); + } + if (client != null) { + client.close(); + } + if (admin != null) { + admin.close(); + } + pulsar.close(); + } + + @Test + void testSendNonPartitionedTopic() throws Exception { + String topic = "persistent://public/default/testSendNonPartitionedTopic"; + admin.topics().createNonPartitionedTopic(topic); + producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create(); + + String msg = "test"; + MessageId msgId = testing.runWithSpan("parent", () -> producer.send(msg)); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName(topic + " publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + sendAttributes(topic, msgId.toString(), false)))); + } + + @Test + void testConsumeNonPartitionedTopic() throws Exception { + String topic = "persistent://public/default/testConsumeNonPartitionedTopic"; + CountDownLatch latch = new CountDownLatch(1); + admin.topics().createNonPartitionedTopic(topic); + consumer = + client + .newConsumer(Schema.STRING) + .subscriptionName("test_sub") + .topic(topic) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .messageListener( + (MessageListener) + (consumer, msg) -> { + acknowledgeMessage(consumer, msg); + latch.countDown(); + }) + .subscribe(); + + producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create(); + + String msg = "test"; + MessageId msgId = testing.runWithSpan("parent", () -> producer.send(msg)); + + latch.await(1, TimeUnit.MINUTES); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName(topic + " publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + sendAttributes(topic, msgId.toString(), false)), + span -> + span.hasName(topic + " receive") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly( + receiveAttributes(topic, msgId.toString(), false)), + span -> + span.hasName(topic + " process") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(2)) + .hasAttributesSatisfyingExactly( + processAttributes(topic, msgId.toString(), false)))); + } + + @Test + void testConsumeNonPartitionedTopicUsingReceive() throws Exception { + String topic = "persistent://public/default/testConsumeNonPartitionedTopicCallReceive"; + admin.topics().createNonPartitionedTopic(topic); + consumer = + client + .newConsumer(Schema.STRING) + .subscriptionName("test_sub") + .topic(topic) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create(); + + String msg = "test"; + MessageId msgId = testing.runWithSpan("parent", () -> producer.send(msg)); + + Message receivedMsg = consumer.receive(); + consumer.acknowledge(receivedMsg); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName(topic + " publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + sendAttributes(topic, msgId.toString(), false)), + span -> + span.hasName(topic + " receive") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly( + receiveAttributes(topic, msgId.toString(), false)))); + } + + @Test + void testConsumeNonPartitionedTopicUsingReceiveAsync() throws Exception { + String topic = "persistent://public/default/testConsumeNonPartitionedTopicCallReceiveAsync"; + admin.topics().createNonPartitionedTopic(topic); + consumer = + client + .newConsumer(Schema.STRING) + .subscriptionName("test_sub") + .topic(topic) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create(); + + CompletableFuture> result = + consumer + .receiveAsync() + .whenComplete( + (message, throwable) -> { + if (message != null) { + testing.runWithSpan("callback", () -> acknowledgeMessage(consumer, message)); + } + }); + + String msg = "test"; + MessageId msgId = testing.runWithSpan("parent", () -> producer.send(msg)); + + result.get(1, TimeUnit.MINUTES); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName(topic + " publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + sendAttributes(topic, msgId.toString(), false)), + span -> + span.hasName(topic + " receive") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly( + receiveAttributes(topic, msgId.toString(), false)), + span -> + span.hasName("callback") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(2)))); + } + + @Test + void testConsumeNonPartitionedTopicUsingReceiveWithTimeout() throws Exception { + String topic = + "persistent://public/default/testConsumeNonPartitionedTopicCallReceiveWithTimeout"; + admin.topics().createNonPartitionedTopic(topic); + consumer = + client + .newConsumer(Schema.STRING) + .subscriptionName("test_sub") + .topic(topic) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create(); + + String msg = "test"; + MessageId msgId = testing.runWithSpan("parent", () -> producer.send(msg)); + + Message receivedMsg = consumer.receive(1, TimeUnit.MINUTES); + consumer.acknowledge(receivedMsg); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName(topic + " publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + sendAttributes(topic, msgId.toString(), false)), + span -> + span.hasName(topic + " receive") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly( + receiveAttributes(topic, msgId.toString(), false)))); + } + + @Test + void testConsumeNonPartitionedTopicUsingBatchReceive() throws Exception { + String topic = "persistent://public/default/testConsumeNonPartitionedTopicCallBatchReceive"; + admin.topics().createNonPartitionedTopic(topic); + consumer = + client + .newConsumer(Schema.STRING) + .subscriptionName("test_sub") + .topic(topic) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create(); + + String msg = "test"; + MessageId msgId = testing.runWithSpan("parent", () -> producer.send(msg)); + + testing.runWithSpan( + "receive-parent", + () -> { + Messages receivedMsg = consumer.batchReceive(); + consumer.acknowledge(receivedMsg); + }); + AtomicReference producerSpan = new AtomicReference<>(); + + testing.waitAndAssertTraces( + trace -> { + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName(topic + " publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + sendAttributes(topic, msgId.toString(), false))); + producerSpan.set(trace.getSpan(1)); + }, + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("receive-parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName(topic + " receive") + .hasKind(SpanKind.CONSUMER) + .hasLinks(LinkData.create(producerSpan.get().getSpanContext())) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly(receiveAttributes(topic, null, false)))); + } + + @Test + void testConsumeNonPartitionedTopicUsingBatchReceiveAsync() throws Exception { + String topic = + "persistent://public/default/testConsumeNonPartitionedTopicCallBatchReceiveAsync"; + admin.topics().createNonPartitionedTopic(topic); + consumer = + client + .newConsumer(Schema.STRING) + .subscriptionName("test_sub") + .topic(topic) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .subscribe(); + + producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create(); + + String msg = "test"; + MessageId msgId = testing.runWithSpan("parent", () -> producer.send(msg)); + + CompletableFuture> result = + testing.runWithSpan( + "receive-parent", + () -> + consumer + .batchReceiveAsync() + .whenComplete( + (messages, throwable) -> { + if (messages != null) { + testing.runWithSpan( + "callback", () -> acknowledgeMessages(consumer, messages)); + } + })); + + assertThat(result.get(1, TimeUnit.MINUTES).size()).isEqualTo(1); + + AtomicReference producerSpan = new AtomicReference<>(); + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> { + span.hasName(topic + " publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + sendAttributes(topic, msgId.toString(), false)); + producerSpan.set(trace.getSpan(1)); + }), + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("receive-parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName(topic + " receive") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(0)) + .hasLinks(LinkData.create(producerSpan.get().getSpanContext())) + .hasAttributesSatisfyingExactly(receiveAttributes(topic, null, false)), + span -> + span.hasName("callback") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(1)))); + } + + @Test + void captureMessageHeaderAsSpanAttribute() throws Exception { + String topic = "persistent://public/default/testCaptureMessageHeaderTopic"; + CountDownLatch latch = new CountDownLatch(1); + admin.topics().createNonPartitionedTopic(topic); + consumer = + client + .newConsumer(Schema.STRING) + .subscriptionName("test_sub") + .topic(topic) + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .messageListener( + (MessageListener) + (consumer, msg) -> { + acknowledgeMessage(consumer, msg); + latch.countDown(); + }) + .subscribe(); + + producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create(); + + String msg = "test"; + MessageId msgId = + testing.runWithSpan( + "parent", + () -> producer.newMessage().value(msg).property("test-message-header", "test").send()); + + latch.await(1, TimeUnit.MINUTES); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName(topic + " publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + sendAttributes(topic, msgId.toString(), true)), + span -> + span.hasName(topic + " receive") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly( + receiveAttributes(topic, msgId.toString(), true)), + span -> + span.hasName(topic + " process") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(2)) + .hasAttributesSatisfyingExactly( + processAttributes(topic, msgId.toString(), true)))); + } + + @Test + void testSendPartitionedTopic() throws Exception { + String topic = "persistent://public/default/testSendPartitionedTopic"; + admin.topics().createPartitionedTopic(topic, 1); + producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create(); + + String msg = "test"; + MessageId msgId = testing.runWithSpan("parent", () -> producer.send(msg)); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName(topic + "-partition-0 publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + sendAttributes(topic + "-partition-0", msgId.toString(), false)))); + } + + @Test + void testConsumePartitionedTopic() throws Exception { + String topic = "persistent://public/default/testConsumePartitionedTopic"; + admin.topics().createPartitionedTopic(topic, 1); + CountDownLatch latch = new CountDownLatch(1); + + consumer = + client + .newConsumer(Schema.STRING) + .subscriptionName("test_sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .topic(topic) + .messageListener( + (MessageListener) + (consumer, msg) -> { + acknowledgeMessage(consumer, msg); + latch.countDown(); + }) + .subscribe(); + + producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create(); + + String msg = "test"; + MessageId msgId = testing.runWithSpan("parent", () -> producer.send(msg)); + + latch.await(1, TimeUnit.MINUTES); + + testing.waitAndAssertTraces( + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName(topic + "-partition-0 publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + sendAttributes(topic + "-partition-0", msgId.toString(), false)), + span -> + span.hasName(topic + "-partition-0 receive") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly( + receiveAttributes(topic + "-partition-0", msgId.toString(), false)), + span -> + span.hasName(topic + "-partition-0 process") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(2)) + .hasAttributesSatisfyingExactly( + processAttributes(topic + "-partition-0", msgId.toString(), false)))); + } + + @Test + void testConsumeMultiTopics() throws Exception { + String topicNamePrefix = "persistent://public/default/testConsumeMulti_"; + String topic1 = topicNamePrefix + "1"; + String topic2 = topicNamePrefix + "2"; + CountDownLatch latch = new CountDownLatch(2); + producer = client.newProducer(Schema.STRING).topic(topic1).enableBatching(false).create(); + producer2 = client.newProducer(Schema.STRING).topic(topic2).enableBatching(false).create(); + + MessageId msgId1 = testing.runWithSpan("parent1", () -> producer.send("test1")); + MessageId msgId2 = testing.runWithSpan("parent2", () -> producer2.send("test2")); + + consumer = + client + .newConsumer(Schema.STRING) + .topic(topic2, topic1) + .subscriptionName("test_sub") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest) + .messageListener( + (MessageListener) + (consumer, msg) -> { + acknowledgeMessage(consumer, msg); + latch.countDown(); + }) + .subscribe(); + + latch.await(1, TimeUnit.MINUTES); + + testing.waitAndAssertSortedTraces( + orderByRootSpanName("parent1", "parent2"), + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent1").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName(topic1 + " publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + sendAttributes(topic1, msgId1.toString(), false)), + span -> + span.hasName(topic1 + " receive") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly( + receiveAttributes(topic1, msgId1.toString(), false)), + span -> + span.hasName(topic1 + " process") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(2)) + .hasAttributesSatisfyingExactly( + processAttributes(topic1, msgId1.toString(), false))), + trace -> + trace.hasSpansSatisfyingExactly( + span -> span.hasName("parent2").hasKind(SpanKind.INTERNAL).hasNoParent(), + span -> + span.hasName(topic2 + " publish") + .hasKind(SpanKind.PRODUCER) + .hasParent(trace.getSpan(0)) + .hasAttributesSatisfyingExactly( + sendAttributes(topic2, msgId2.toString(), false)), + span -> + span.hasName(topic2 + " receive") + .hasKind(SpanKind.CONSUMER) + .hasParent(trace.getSpan(1)) + .hasAttributesSatisfyingExactly( + receiveAttributes(topic2, msgId2.toString(), false)), + span -> + span.hasName(topic2 + " process") + .hasKind(SpanKind.INTERNAL) + .hasParent(trace.getSpan(2)) + .hasAttributesSatisfyingExactly( + processAttributes(topic2, msgId2.toString(), false)))); + } + + private static List sendAttributes( + String destination, String messageId, boolean testHeaders) { + List assertions = + new ArrayList<>( + Arrays.asList( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "pulsar"), + equalTo(SemanticAttributes.SERVER_ADDRESS, brokerHost), + equalTo(SemanticAttributes.SERVER_PORT, brokerPort), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, destination), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"), + equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId), + satisfies( + SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, + AbstractLongAssert::isNotNegative), + equalTo(MESSAGE_TYPE, "normal"))); + if (testHeaders) { + assertions.add( + equalTo( + AttributeKey.stringArrayKey("messaging.header.test_message_header"), + Collections.singletonList("test"))); + } + return assertions; + } + + private static List receiveAttributes( + String destination, String messageId, boolean testHeaders) { + List assertions = + new ArrayList<>( + Arrays.asList( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "pulsar"), + equalTo(SemanticAttributes.SERVER_ADDRESS, brokerHost), + equalTo(SemanticAttributes.SERVER_PORT, brokerPort), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, destination), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"), + equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId), + satisfies( + SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, + AbstractLongAssert::isNotNegative))); + if (testHeaders) { + assertions.add( + equalTo( + AttributeKey.stringArrayKey("messaging.header.test_message_header"), + Collections.singletonList("test"))); + } + return assertions; + } + + private static List processAttributes( + String destination, String messageId, boolean testHeaders) { + List assertions = + new ArrayList<>( + Arrays.asList( + equalTo(SemanticAttributes.MESSAGING_SYSTEM, "pulsar"), + equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, destination), + equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"), + equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId), + satisfies( + SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, + AbstractLongAssert::isNotNegative))); + if (testHeaders) { + assertions.add( + equalTo( + AttributeKey.stringArrayKey("messaging.header.test_message_header"), + Collections.singletonList("test"))); + } + return assertions; + } + + private static void acknowledgeMessage(Consumer consumer, Message message) { + try { + consumer.acknowledge(message); + } catch (PulsarClientException exception) { + throw new RuntimeException(exception); + } + } + + private static void acknowledgeMessages(Consumer consumer, Messages messages) { + try { + consumer.acknowledge(messages); + } catch (PulsarClientException exception) { + throw new RuntimeException(exception); + } + } +}