Convert pulsar 2.8 groovy test to Java (#10176)

Co-authored-by: Lauri Tulmin <ltulmin@splunk.com>
This commit is contained in:
hadesy 2024-01-24 15:13:04 +08:00 committed by GitHub
parent 37cb7c61fd
commit 7d544f1476
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 699 additions and 671 deletions

View File

@ -1,671 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.instrumentation.test.asserts.TraceAssert
import io.opentelemetry.semconv.SemanticAttributes
import org.apache.pulsar.client.admin.PulsarAdmin
import org.apache.pulsar.client.api.Consumer
import org.apache.pulsar.client.api.Message
import org.apache.pulsar.client.api.MessageListener
import org.apache.pulsar.client.api.Messages
import org.apache.pulsar.client.api.Producer
import org.apache.pulsar.client.api.PulsarClient
import org.apache.pulsar.client.api.Schema
import org.apache.pulsar.client.api.SubscriptionInitialPosition
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.testcontainers.containers.PulsarContainer
import org.testcontainers.containers.output.Slf4jLogConsumer
import org.testcontainers.utility.DockerImageName
import spock.lang.Shared
import java.time.Duration
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.regex.Pattern
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
import static io.opentelemetry.api.trace.SpanKind.PRODUCER
class PulsarClientTest extends AgentInstrumentationSpecification {
private static final Logger logger = LoggerFactory.getLogger(PulsarClientTest)
private static final DockerImageName DEFAULT_IMAGE_NAME =
DockerImageName.parse("apachepulsar/pulsar:2.8.0")
@Shared
private PulsarContainer pulsar
@Shared
private PulsarClient client
@Shared
private PulsarAdmin admin
@Shared
private Producer<String> producer
@Shared
private Consumer<String> consumer
@Shared
private Producer<String> producer2
@Shared
private String brokerHost
@Shared
private int brokerPort
@Override
def setupSpec() {
pulsar = new PulsarContainer(DEFAULT_IMAGE_NAME)
.withEnv("PULSAR_MEM", "-Xmx128m")
.withLogConsumer(new Slf4jLogConsumer(logger))
.withStartupTimeout(Duration.ofMinutes(2))
pulsar.start()
brokerHost = pulsar.host
brokerPort = pulsar.getMappedPort(6650)
client = PulsarClient.builder().serviceUrl(pulsar.pulsarBrokerUrl).build()
admin = PulsarAdmin.builder().serviceHttpUrl(pulsar.httpServiceUrl).build()
}
@Override
def cleanupSpec() {
producer?.close()
consumer?.close()
producer2?.close()
client?.close()
admin?.close()
pulsar.close()
}
def "test send non-partitioned topic"() {
setup:
def topic = "persistent://public/default/testSendNonPartitionedTopic"
admin.topics().createNonPartitionedTopic(topic)
producer =
client.newProducer(Schema.STRING).topic(topic)
.enableBatching(false).create()
when:
String msg = "test"
def msgId = runWithSpan("parent") {
producer.send(msg)
}
then:
assertTraces(1) {
trace(0, 2) {
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
}
producerSpan(it, 1, span(0), topic, msgId)
}
}
}
def "test consume non-partitioned topic"() {
setup:
def topic = "persistent://public/default/testConsumeNonPartitionedTopic"
def latch = new CountDownLatch(1)
admin.topics().createNonPartitionedTopic(topic)
consumer = client.newConsumer(Schema.STRING)
.subscriptionName("test_sub")
.topic(topic)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.messageListener(new MessageListener<String>() {
@Override
void received(Consumer<String> consumer, Message<String> msg) {
consumer.acknowledge(msg)
latch.countDown()
}
})
.subscribe()
producer = client.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(false)
.create()
when:
def msg = "test"
def msgId = runWithSpan("parent") {
producer.send(msg)
}
latch.await(1, TimeUnit.MINUTES)
then:
assertTraces(1) {
trace(0, 4) {
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
}
producerSpan(it, 1, span(0), topic, msgId)
receiveSpan(it, 2, span(1), topic, msgId)
processSpan(it, 3, span(2), topic, msgId)
}
}
}
def "test consume non-partitioned topic using receive"() {
setup:
def topic = "persistent://public/default/testConsumeNonPartitionedTopicCallReceive"
admin.topics().createNonPartitionedTopic(topic)
consumer = client.newConsumer(Schema.STRING)
.subscriptionName("test_sub")
.topic(topic)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe()
producer = client.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(false)
.create()
when:
def msg = "test"
def msgId = runWithSpan("parent") {
producer.send(msg)
}
def receivedMsg = consumer.receive()
consumer.acknowledge(receivedMsg)
then:
assertTraces(1) {
trace(0, 3) {
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
}
producerSpan(it, 1, span(0), topic, msgId)
receiveSpan(it, 2, span(1), topic, msgId)
}
}
}
def "test consume non-partitioned topic using receiveAsync"() {
setup:
def topic = "persistent://public/default/testConsumeNonPartitionedTopicCallReceiveAsync"
admin.topics().createNonPartitionedTopic(topic)
consumer = client.newConsumer(Schema.STRING)
.subscriptionName("test_sub")
.topic(topic)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe()
producer = client.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(false)
.create()
when:
CompletableFuture<Message<String>> result = consumer.receiveAsync().whenComplete { receivedMsg, throwable ->
runWithSpan("callback") {
consumer.acknowledge(receivedMsg)
}
}
def msg = "test"
def msgId = runWithSpan("parent") {
producer.send(msg)
}
result.get(1, TimeUnit.MINUTES)
then:
assertTraces(1) {
trace(0, 4) {
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
}
producerSpan(it, 1, span(0), topic, msgId)
receiveSpan(it, 2, span(1), topic, msgId)
span(3) {
name "callback"
kind INTERNAL
childOf span(2)
attributes {
}
}
}
}
}
def "test consume non-partitioned topic using receive with timeout"() {
setup:
def topic = "persistent://public/default/testConsumeNonPartitionedTopicCallReceiveWithTimeout"
admin.topics().createNonPartitionedTopic(topic)
consumer = client.newConsumer(Schema.STRING)
.subscriptionName("test_sub")
.topic(topic)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe()
producer = client.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(false)
.create()
when:
def msg = "test"
def msgId = runWithSpan("parent") {
producer.send(msg)
}
def receivedMsg = consumer.receive(1, TimeUnit.MINUTES)
consumer.acknowledge(receivedMsg)
then:
assertTraces(1) {
trace(0, 3) {
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
}
producerSpan(it, 1, span(0), topic, msgId)
receiveSpan(it, 2, span(1), topic, msgId)
}
}
}
def "test consume non-partitioned topic using batchReceive"() {
setup:
def topic = "persistent://public/default/testConsumeNonPartitionedTopicCallBatchReceive"
admin.topics().createNonPartitionedTopic(topic)
consumer = client.newConsumer(Schema.STRING)
.subscriptionName("test_sub")
.topic(topic)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe()
producer = client.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(false)
.create()
when:
def msg = "test"
def msgId = runWithSpan("parent") {
producer.send(msg)
}
runWithSpan("receive-parent") {
def receivedMsg = consumer.batchReceive()
consumer.acknowledge(receivedMsg)
}
then:
def producer
assertTraces(2) {
trace(0, 2) {
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
}
producerSpan(it, 1, span(0), topic, msgId)
producer = span(1)
}
trace(1, 2) {
span(0) {
name "receive-parent"
kind INTERNAL
hasNoParent()
}
receiveSpan(it, 1, span(0), topic, null, producer)
}
}
}
def "test consume non-partitioned topic using batchReceiveAsync"() {
setup:
def topic = "persistent://public/default/testConsumeNonPartitionedTopicCallBatchReceiveAsync"
admin.topics().createNonPartitionedTopic(topic)
consumer = client.newConsumer(Schema.STRING)
.subscriptionName("test_sub")
.topic(topic)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe()
producer = client.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(false)
.create()
when:
def msg = "test"
def msgId = runWithSpan("parent") {
producer.send(msg)
}
CompletableFuture<Messages<String>> result = runWithSpan("receive-parent") {
consumer.batchReceiveAsync().whenComplete { receivedMsg, throwable ->
runWithSpan("callback") {
consumer.acknowledge(receivedMsg)
}
}
}
result.get(1, TimeUnit.MINUTES).size() == 1
then:
def producer
assertTraces(2) {
trace(0, 2) {
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
}
producerSpan(it, 1, span(0), topic, msgId)
producer = span(1)
}
trace(1, 3) {
span(0) {
name "receive-parent"
kind INTERNAL
hasNoParent()
}
receiveSpan(it, 1, span(0), topic, null, producer)
span(2) {
name "callback"
kind INTERNAL
childOf span(1)
attributes {
}
}
}
}
}
def "capture message header as span attribute"() {
setup:
def topic = "persistent://public/default/testCaptureMessageHeaderTopic"
def latch = new CountDownLatch(1)
admin.topics().createNonPartitionedTopic(topic)
consumer = client.newConsumer(Schema.STRING)
.subscriptionName("test_sub")
.topic(topic)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.messageListener(new MessageListener<String>() {
@Override
void received(Consumer<String> consumer, Message<String> msg) {
consumer.acknowledge(msg)
latch.countDown()
}
})
.subscribe()
producer = client.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(false)
.create()
when:
def msg = "test"
def msgId = runWithSpan("parent") {
producer.newMessage().value(msg).property("test-message-header", "test").send()
}
latch.await(1, TimeUnit.MINUTES)
then:
assertTraces(1) {
trace(0, 4) {
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
}
producerSpan(it, 1, span(0), topic, msgId, true)
receiveSpan(it, 2, span(1), topic, msgId, null, true)
processSpan(it, 3, span(2), topic, msgId, true)
}
}
}
def "test send partitioned topic"() {
setup:
def topic = "persistent://public/default/testSendPartitionedTopic"
admin.topics().createPartitionedTopic(topic, 2)
producer =
client.newProducer(Schema.STRING).topic(topic)
.enableBatching(false).create()
when:
String msg = "test"
def msgId = runWithSpan("parent") {
producer.send(msg)
}
then:
assertTraces(1) {
trace(0, 2) {
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
}
producerSpan(it, 1, span(0), topic, ~/${topic}-partition-.*publish/, { it.startsWith(topic) }, msgId)
}
}
}
def "test consume partitioned topic"() {
setup:
def topic = "persistent://public/default/testConsumePartitionedTopic"
admin.topics().createPartitionedTopic(topic, 2)
def latch = new CountDownLatch(1)
consumer = client.newConsumer(Schema.STRING)
.subscriptionName("test_sub")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.topic(topic)
.messageListener(new MessageListener<String>() {
@Override
void received(Consumer<String> consumer, Message<String> msg) {
consumer.acknowledge(msg)
latch.countDown()
}
})
.subscribe()
producer = client.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(false)
.create()
when:
def msg = "test"
def msgId = runWithSpan("parent") {
producer.send(msg)
}
latch.await(1, TimeUnit.MINUTES)
then:
assertTraces(1) {
trace(0, 4) {
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
}
producerSpan(it, 1, span(0), topic, ~/${topic}-partition-.*publish/, { it.startsWith(topic) }, msgId)
receiveSpan(it, 2, span(1), topic, ~/${topic}-partition-.*receive/, { it.startsWith(topic) }, msgId)
processSpan(it, 3, span(2), topic, ~/${topic}-partition-.*process/, { it.startsWith(topic) }, msgId)
}
}
}
def "test consume multi-topics"() {
setup:
def topicNamePrefix = "persistent://public/default/testConsumeMulti_"
def topic1 = topicNamePrefix + "1"
def topic2 = topicNamePrefix + "2"
def latch = new CountDownLatch(2)
producer = client.newProducer(Schema.STRING)
.topic(topic1)
.enableBatching(false)
.create()
producer2 = client.newProducer(Schema.STRING)
.topic(topic2)
.enableBatching(false)
.create()
when:
runWithSpan("parent1") {
producer.send("test1")
}
runWithSpan("parent2") {
producer2.send("test2")
}
consumer = client.newConsumer(Schema.STRING)
.topic(topic2, topic1)
.subscriptionName("test_sub")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.messageListener(new MessageListener<String>() {
@Override
void received(Consumer<String> consumer, Message<String> msg) {
consumer.acknowledge(msg)
latch.countDown()
}
})
.subscribe()
latch.await(1, TimeUnit.MINUTES)
then:
assertTraces(2) {
traces.sort(orderByRootSpanName("parent1", "parent2"))
for (int i in 1..2) {
def topic = i == 1 ? topic1 : topic2
trace(i - 1, 4) {
span(0) {
name "parent" + i
kind INTERNAL
hasNoParent()
}
producerSpan(it, 1, span(0), topic, null, { it.startsWith(topicNamePrefix) }, String)
receiveSpan(it, 2, span(1), topic, null, { it.startsWith(topicNamePrefix) }, String)
processSpan(it, 3, span(2), topic, null, { it.startsWith(topicNamePrefix) }, String)
}
}
}
}
def producerSpan(TraceAssert trace, int index, Object parentSpan, String topic, Object msgId, boolean headers = false) {
producerSpan(trace, index, parentSpan, topic, null, { it == topic }, msgId, headers)
}
def producerSpan(TraceAssert trace, int index, Object parentSpan, String topic, Pattern namePattern, Closure destination, Object msgId, boolean headers = false) {
trace.span(index) {
if (namePattern != null) {
name namePattern
} else {
name "$topic publish"
}
kind PRODUCER
childOf parentSpan
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "pulsar"
"$SemanticAttributes.SERVER_ADDRESS" brokerHost
"$SemanticAttributes.SERVER_PORT" brokerPort
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" destination
"$SemanticAttributes.MESSAGING_OPERATION" "publish"
if (msgId == String) {
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
} else if (msgId != null) {
"$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString()
}
"$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
"messaging.pulsar.message.type" "normal"
if (headers) {
"messaging.header.test_message_header" { it == ["test"] }
}
}
}
}
def receiveSpan(TraceAssert trace, int index, Object parentSpan, String topic, Object msgId, Object linkedSpan = null, boolean headers = false) {
receiveSpan(trace, index, parentSpan, topic, null, { it == topic }, msgId, linkedSpan, headers)
}
def receiveSpan(TraceAssert trace, int index, Object parentSpan, String topic, Pattern namePattern, Closure destination, Object msgId, Object linkedSpan = null, boolean headers = false) {
trace.span(index) {
if (namePattern != null) {
name namePattern
} else {
name "$topic receive"
}
kind CONSUMER
childOf parentSpan
if (linkedSpan == null) {
hasNoLinks()
} else {
hasLink linkedSpan
}
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "pulsar"
"$SemanticAttributes.SERVER_ADDRESS" brokerHost
"$SemanticAttributes.SERVER_PORT" brokerPort
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" destination
if (msgId == String) {
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
} else if (msgId != null) {
"$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString()
}
"$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
"$SemanticAttributes.MESSAGING_OPERATION" "receive"
if (headers) {
"messaging.header.test_message_header" { it == ["test"] }
}
}
}
}
def processSpan(TraceAssert trace, int index, Object parentSpan, String topic, Object msgId, boolean headers = false) {
processSpan(trace, index, parentSpan, topic, null, { it == topic }, msgId, headers)
}
def processSpan(TraceAssert trace, int index, Object parentSpan, String topic, Pattern namePattern, Closure destination, Object msgId, boolean headers = false) {
trace.span(index) {
if (namePattern != null) {
name namePattern
} else {
name "$topic process"
}
kind INTERNAL
childOf parentSpan
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "pulsar"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" destination
if (msgId == String) {
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
} else if (msgId != null) {
"$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString()
}
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
if (headers) {
"messaging.header.test_message_header" { it == ["test"] }
}
}
}
}
}

View File

@ -0,0 +1,699 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8;
import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanName;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
import static org.assertj.core.api.Assertions.assertThat;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
import io.opentelemetry.sdk.testing.assertj.AttributeAssertion;
import io.opentelemetry.sdk.trace.data.LinkData;
import io.opentelemetry.sdk.trace.data.SpanData;
import io.opentelemetry.semconv.SemanticAttributes;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.assertj.core.api.AbstractLongAssert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.PulsarContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.utility.DockerImageName;
class PulsarClientTest {
private static final Logger logger = LoggerFactory.getLogger(PulsarClientTest.class);
private static final DockerImageName DEFAULT_IMAGE_NAME =
DockerImageName.parse("apachepulsar/pulsar:2.8.0");
@RegisterExtension
static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
private static PulsarContainer pulsar;
private static PulsarClient client;
private static PulsarAdmin admin;
private static Producer<String> producer;
private static Consumer<String> consumer;
private static Producer<String> producer2;
private static String brokerHost;
private static int brokerPort;
private static final AttributeKey<String> MESSAGE_TYPE =
AttributeKey.stringKey("messaging.pulsar.message.type");
@BeforeAll
static void beforeAll() throws PulsarClientException {
pulsar =
new PulsarContainer(DEFAULT_IMAGE_NAME)
.withEnv("PULSAR_MEM", "-Xmx128m")
.withLogConsumer(new Slf4jLogConsumer(logger))
.withStartupTimeout(Duration.ofMinutes(2));
pulsar.start();
brokerHost = pulsar.getHost();
brokerPort = pulsar.getMappedPort(6650);
client = PulsarClient.builder().serviceUrl(pulsar.getPulsarBrokerUrl()).build();
admin = PulsarAdmin.builder().serviceHttpUrl(pulsar.getHttpServiceUrl()).build();
}
@AfterAll
static void afterAll() throws PulsarClientException {
if (producer != null) {
producer.close();
}
if (consumer != null) {
consumer.close();
}
if (producer2 != null) {
producer2.close();
}
if (client != null) {
client.close();
}
if (admin != null) {
admin.close();
}
pulsar.close();
}
@Test
void testSendNonPartitionedTopic() throws Exception {
String topic = "persistent://public/default/testSendNonPartitionedTopic";
admin.topics().createNonPartitionedTopic(topic);
producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
String msg = "test";
MessageId msgId = testing.runWithSpan("parent", () -> producer.send(msg));
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName(topic + " publish")
.hasKind(SpanKind.PRODUCER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
sendAttributes(topic, msgId.toString(), false))));
}
@Test
void testConsumeNonPartitionedTopic() throws Exception {
String topic = "persistent://public/default/testConsumeNonPartitionedTopic";
CountDownLatch latch = new CountDownLatch(1);
admin.topics().createNonPartitionedTopic(topic);
consumer =
client
.newConsumer(Schema.STRING)
.subscriptionName("test_sub")
.topic(topic)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.messageListener(
(MessageListener<String>)
(consumer, msg) -> {
acknowledgeMessage(consumer, msg);
latch.countDown();
})
.subscribe();
producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
String msg = "test";
MessageId msgId = testing.runWithSpan("parent", () -> producer.send(msg));
latch.await(1, TimeUnit.MINUTES);
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName(topic + " publish")
.hasKind(SpanKind.PRODUCER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
sendAttributes(topic, msgId.toString(), false)),
span ->
span.hasName(topic + " receive")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(1))
.hasAttributesSatisfyingExactly(
receiveAttributes(topic, msgId.toString(), false)),
span ->
span.hasName(topic + " process")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(2))
.hasAttributesSatisfyingExactly(
processAttributes(topic, msgId.toString(), false))));
}
@Test
void testConsumeNonPartitionedTopicUsingReceive() throws Exception {
String topic = "persistent://public/default/testConsumeNonPartitionedTopicCallReceive";
admin.topics().createNonPartitionedTopic(topic);
consumer =
client
.newConsumer(Schema.STRING)
.subscriptionName("test_sub")
.topic(topic)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
String msg = "test";
MessageId msgId = testing.runWithSpan("parent", () -> producer.send(msg));
Message<String> receivedMsg = consumer.receive();
consumer.acknowledge(receivedMsg);
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName(topic + " publish")
.hasKind(SpanKind.PRODUCER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
sendAttributes(topic, msgId.toString(), false)),
span ->
span.hasName(topic + " receive")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(1))
.hasAttributesSatisfyingExactly(
receiveAttributes(topic, msgId.toString(), false))));
}
@Test
void testConsumeNonPartitionedTopicUsingReceiveAsync() throws Exception {
String topic = "persistent://public/default/testConsumeNonPartitionedTopicCallReceiveAsync";
admin.topics().createNonPartitionedTopic(topic);
consumer =
client
.newConsumer(Schema.STRING)
.subscriptionName("test_sub")
.topic(topic)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
CompletableFuture<Message<String>> result =
consumer
.receiveAsync()
.whenComplete(
(message, throwable) -> {
if (message != null) {
testing.runWithSpan("callback", () -> acknowledgeMessage(consumer, message));
}
});
String msg = "test";
MessageId msgId = testing.runWithSpan("parent", () -> producer.send(msg));
result.get(1, TimeUnit.MINUTES);
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName(topic + " publish")
.hasKind(SpanKind.PRODUCER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
sendAttributes(topic, msgId.toString(), false)),
span ->
span.hasName(topic + " receive")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(1))
.hasAttributesSatisfyingExactly(
receiveAttributes(topic, msgId.toString(), false)),
span ->
span.hasName("callback")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(2))));
}
@Test
void testConsumeNonPartitionedTopicUsingReceiveWithTimeout() throws Exception {
String topic =
"persistent://public/default/testConsumeNonPartitionedTopicCallReceiveWithTimeout";
admin.topics().createNonPartitionedTopic(topic);
consumer =
client
.newConsumer(Schema.STRING)
.subscriptionName("test_sub")
.topic(topic)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
String msg = "test";
MessageId msgId = testing.runWithSpan("parent", () -> producer.send(msg));
Message<String> receivedMsg = consumer.receive(1, TimeUnit.MINUTES);
consumer.acknowledge(receivedMsg);
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName(topic + " publish")
.hasKind(SpanKind.PRODUCER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
sendAttributes(topic, msgId.toString(), false)),
span ->
span.hasName(topic + " receive")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(1))
.hasAttributesSatisfyingExactly(
receiveAttributes(topic, msgId.toString(), false))));
}
@Test
void testConsumeNonPartitionedTopicUsingBatchReceive() throws Exception {
String topic = "persistent://public/default/testConsumeNonPartitionedTopicCallBatchReceive";
admin.topics().createNonPartitionedTopic(topic);
consumer =
client
.newConsumer(Schema.STRING)
.subscriptionName("test_sub")
.topic(topic)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
String msg = "test";
MessageId msgId = testing.runWithSpan("parent", () -> producer.send(msg));
testing.runWithSpan(
"receive-parent",
() -> {
Messages<String> receivedMsg = consumer.batchReceive();
consumer.acknowledge(receivedMsg);
});
AtomicReference<SpanData> producerSpan = new AtomicReference<>();
testing.waitAndAssertTraces(
trace -> {
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName(topic + " publish")
.hasKind(SpanKind.PRODUCER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
sendAttributes(topic, msgId.toString(), false)));
producerSpan.set(trace.getSpan(1));
},
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("receive-parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName(topic + " receive")
.hasKind(SpanKind.CONSUMER)
.hasLinks(LinkData.create(producerSpan.get().getSpanContext()))
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(receiveAttributes(topic, null, false))));
}
@Test
void testConsumeNonPartitionedTopicUsingBatchReceiveAsync() throws Exception {
String topic =
"persistent://public/default/testConsumeNonPartitionedTopicCallBatchReceiveAsync";
admin.topics().createNonPartitionedTopic(topic);
consumer =
client
.newConsumer(Schema.STRING)
.subscriptionName("test_sub")
.topic(topic)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
String msg = "test";
MessageId msgId = testing.runWithSpan("parent", () -> producer.send(msg));
CompletableFuture<Messages<String>> result =
testing.runWithSpan(
"receive-parent",
() ->
consumer
.batchReceiveAsync()
.whenComplete(
(messages, throwable) -> {
if (messages != null) {
testing.runWithSpan(
"callback", () -> acknowledgeMessages(consumer, messages));
}
}));
assertThat(result.get(1, TimeUnit.MINUTES).size()).isEqualTo(1);
AtomicReference<SpanData> producerSpan = new AtomicReference<>();
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
span -> {
span.hasName(topic + " publish")
.hasKind(SpanKind.PRODUCER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
sendAttributes(topic, msgId.toString(), false));
producerSpan.set(trace.getSpan(1));
}),
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("receive-parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName(topic + " receive")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(0))
.hasLinks(LinkData.create(producerSpan.get().getSpanContext()))
.hasAttributesSatisfyingExactly(receiveAttributes(topic, null, false)),
span ->
span.hasName("callback")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(1))));
}
@Test
void captureMessageHeaderAsSpanAttribute() throws Exception {
String topic = "persistent://public/default/testCaptureMessageHeaderTopic";
CountDownLatch latch = new CountDownLatch(1);
admin.topics().createNonPartitionedTopic(topic);
consumer =
client
.newConsumer(Schema.STRING)
.subscriptionName("test_sub")
.topic(topic)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.messageListener(
(MessageListener<String>)
(consumer, msg) -> {
acknowledgeMessage(consumer, msg);
latch.countDown();
})
.subscribe();
producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
String msg = "test";
MessageId msgId =
testing.runWithSpan(
"parent",
() -> producer.newMessage().value(msg).property("test-message-header", "test").send());
latch.await(1, TimeUnit.MINUTES);
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName(topic + " publish")
.hasKind(SpanKind.PRODUCER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
sendAttributes(topic, msgId.toString(), true)),
span ->
span.hasName(topic + " receive")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(1))
.hasAttributesSatisfyingExactly(
receiveAttributes(topic, msgId.toString(), true)),
span ->
span.hasName(topic + " process")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(2))
.hasAttributesSatisfyingExactly(
processAttributes(topic, msgId.toString(), true))));
}
@Test
void testSendPartitionedTopic() throws Exception {
String topic = "persistent://public/default/testSendPartitionedTopic";
admin.topics().createPartitionedTopic(topic, 1);
producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
String msg = "test";
MessageId msgId = testing.runWithSpan("parent", () -> producer.send(msg));
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName(topic + "-partition-0 publish")
.hasKind(SpanKind.PRODUCER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
sendAttributes(topic + "-partition-0", msgId.toString(), false))));
}
@Test
void testConsumePartitionedTopic() throws Exception {
String topic = "persistent://public/default/testConsumePartitionedTopic";
admin.topics().createPartitionedTopic(topic, 1);
CountDownLatch latch = new CountDownLatch(1);
consumer =
client
.newConsumer(Schema.STRING)
.subscriptionName("test_sub")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.topic(topic)
.messageListener(
(MessageListener<String>)
(consumer, msg) -> {
acknowledgeMessage(consumer, msg);
latch.countDown();
})
.subscribe();
producer = client.newProducer(Schema.STRING).topic(topic).enableBatching(false).create();
String msg = "test";
MessageId msgId = testing.runWithSpan("parent", () -> producer.send(msg));
latch.await(1, TimeUnit.MINUTES);
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName(topic + "-partition-0 publish")
.hasKind(SpanKind.PRODUCER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
sendAttributes(topic + "-partition-0", msgId.toString(), false)),
span ->
span.hasName(topic + "-partition-0 receive")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(1))
.hasAttributesSatisfyingExactly(
receiveAttributes(topic + "-partition-0", msgId.toString(), false)),
span ->
span.hasName(topic + "-partition-0 process")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(2))
.hasAttributesSatisfyingExactly(
processAttributes(topic + "-partition-0", msgId.toString(), false))));
}
@Test
void testConsumeMultiTopics() throws Exception {
String topicNamePrefix = "persistent://public/default/testConsumeMulti_";
String topic1 = topicNamePrefix + "1";
String topic2 = topicNamePrefix + "2";
CountDownLatch latch = new CountDownLatch(2);
producer = client.newProducer(Schema.STRING).topic(topic1).enableBatching(false).create();
producer2 = client.newProducer(Schema.STRING).topic(topic2).enableBatching(false).create();
MessageId msgId1 = testing.runWithSpan("parent1", () -> producer.send("test1"));
MessageId msgId2 = testing.runWithSpan("parent2", () -> producer2.send("test2"));
consumer =
client
.newConsumer(Schema.STRING)
.topic(topic2, topic1)
.subscriptionName("test_sub")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.messageListener(
(MessageListener<String>)
(consumer, msg) -> {
acknowledgeMessage(consumer, msg);
latch.countDown();
})
.subscribe();
latch.await(1, TimeUnit.MINUTES);
testing.waitAndAssertSortedTraces(
orderByRootSpanName("parent1", "parent2"),
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent1").hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName(topic1 + " publish")
.hasKind(SpanKind.PRODUCER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
sendAttributes(topic1, msgId1.toString(), false)),
span ->
span.hasName(topic1 + " receive")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(1))
.hasAttributesSatisfyingExactly(
receiveAttributes(topic1, msgId1.toString(), false)),
span ->
span.hasName(topic1 + " process")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(2))
.hasAttributesSatisfyingExactly(
processAttributes(topic1, msgId1.toString(), false))),
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent2").hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName(topic2 + " publish")
.hasKind(SpanKind.PRODUCER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
sendAttributes(topic2, msgId2.toString(), false)),
span ->
span.hasName(topic2 + " receive")
.hasKind(SpanKind.CONSUMER)
.hasParent(trace.getSpan(1))
.hasAttributesSatisfyingExactly(
receiveAttributes(topic2, msgId2.toString(), false)),
span ->
span.hasName(topic2 + " process")
.hasKind(SpanKind.INTERNAL)
.hasParent(trace.getSpan(2))
.hasAttributesSatisfyingExactly(
processAttributes(topic2, msgId2.toString(), false))));
}
private static List<AttributeAssertion> sendAttributes(
String destination, String messageId, boolean testHeaders) {
List<AttributeAssertion> assertions =
new ArrayList<>(
Arrays.asList(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "pulsar"),
equalTo(SemanticAttributes.SERVER_ADDRESS, brokerHost),
equalTo(SemanticAttributes.SERVER_PORT, brokerPort),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, destination),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"),
equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId),
satisfies(
SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES,
AbstractLongAssert::isNotNegative),
equalTo(MESSAGE_TYPE, "normal")));
if (testHeaders) {
assertions.add(
equalTo(
AttributeKey.stringArrayKey("messaging.header.test_message_header"),
Collections.singletonList("test")));
}
return assertions;
}
private static List<AttributeAssertion> receiveAttributes(
String destination, String messageId, boolean testHeaders) {
List<AttributeAssertion> assertions =
new ArrayList<>(
Arrays.asList(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "pulsar"),
equalTo(SemanticAttributes.SERVER_ADDRESS, brokerHost),
equalTo(SemanticAttributes.SERVER_PORT, brokerPort),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, destination),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"),
equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId),
satisfies(
SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES,
AbstractLongAssert::isNotNegative)));
if (testHeaders) {
assertions.add(
equalTo(
AttributeKey.stringArrayKey("messaging.header.test_message_header"),
Collections.singletonList("test")));
}
return assertions;
}
private static List<AttributeAssertion> processAttributes(
String destination, String messageId, boolean testHeaders) {
List<AttributeAssertion> assertions =
new ArrayList<>(
Arrays.asList(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "pulsar"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, destination),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "process"),
equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId),
satisfies(
SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES,
AbstractLongAssert::isNotNegative)));
if (testHeaders) {
assertions.add(
equalTo(
AttributeKey.stringArrayKey("messaging.header.test_message_header"),
Collections.singletonList("test")));
}
return assertions;
}
private static void acknowledgeMessage(Consumer<String> consumer, Message<String> message) {
try {
consumer.acknowledge(message);
} catch (PulsarClientException exception) {
throw new RuntimeException(exception);
}
}
private static void acknowledgeMessages(Consumer<String> consumer, Messages<String> messages) {
try {
consumer.acknowledge(messages);
} catch (PulsarClientException exception) {
throw new RuntimeException(exception);
}
}
}