diff --git a/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java b/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java index 34dea1a6d1..3a1cc09db9 100644 --- a/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java +++ b/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/telemetry/PulsarSingletons.java @@ -24,7 +24,8 @@ import org.apache.pulsar.client.api.Consumer; import org.apache.pulsar.client.api.Message; public final class PulsarSingletons { - private static final String INSTRUMENTATION_NAME = "io.opentelemetry.pulsar-client-2.8"; + private static final String INSTRUMENTATION_NAME = "io.opentelemetry.apache-pulsar-2.8"; + private static final OpenTelemetry TELEMETRY = GlobalOpenTelemetry.get(); private static final TextMapPropagator PROPAGATOR = TELEMETRY.getPropagators().getTextMapPropagator(); diff --git a/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.groovy b/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.groovy index 013e2a7e4c..351957ed35 100644 --- a/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.groovy +++ b/instrumentation/apache-pulsar/apache-pulsar-2.8/javaagent/src/test/groovy/io/opentelemetry/javaagent/instrumentation/pulsar/v2_8/PulsarClientTest.groovy @@ -6,7 +6,6 @@ package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8 import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification -import io.opentelemetry.instrumentation.test.asserts.SpanAssert import io.opentelemetry.semconv.trace.attributes.SemanticAttributes import org.apache.pulsar.client.admin.PulsarAdmin import org.apache.pulsar.client.api.Consumer @@ -16,11 +15,14 @@ import org.apache.pulsar.client.api.Producer import org.apache.pulsar.client.api.PulsarClient import org.apache.pulsar.client.api.Schema import org.apache.pulsar.client.api.SubscriptionInitialPosition -import org.junit.Assert +import org.slf4j.Logger +import org.slf4j.LoggerFactory import org.testcontainers.containers.PulsarContainer +import org.testcontainers.containers.output.Slf4jLogConsumer import org.testcontainers.utility.DockerImageName import spock.lang.Shared +import java.time.Duration import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit @@ -29,6 +31,7 @@ import static io.opentelemetry.api.trace.SpanKind.INTERNAL import static io.opentelemetry.api.trace.SpanKind.PRODUCER class PulsarClientTest extends AgentInstrumentationSpecification { + private static final Logger logger = LoggerFactory.getLogger(PulsarClientTest) private static final DockerImageName DEFAULT_IMAGE_NAME = DockerImageName.parse("apachepulsar/pulsar:2.8.0") @@ -52,6 +55,9 @@ class PulsarClientTest extends AgentInstrumentationSpecification { @Override def setupSpec() { pulsar = new PulsarContainer(DEFAULT_IMAGE_NAME) + .withEnv("PULSAR_MEM", "-Xmx128m") + .withLogConsumer(new Slf4jLogConsumer(logger)) + .withStartupTimeout(Duration.ofMinutes(2)) pulsar.start() brokerUrl = pulsar.pulsarBrokerUrl @@ -71,59 +77,47 @@ class PulsarClientTest extends AgentInstrumentationSpecification { def "test send non-partitioned topic"() { setup: - def topic = "persistent://public/default/testNonPartitionedTopic_" + UUID.randomUUID() + def topic = "persistent://public/default/testSendNonPartitionedTopic" admin.topics().createNonPartitionedTopic(topic) producer = client.newProducer(Schema.STRING).topic(topic) .enableBatching(false).create() - String msg = UUID.randomUUID().toString() - - def msgId - runWithSpan("parent") { - msgId = producer.send(msg) + when: + String msg = "test" + def msgId = runWithSpan("parent") { + producer.send(msg) } - def traces = waitForTraces(1) - Assert.assertEquals(traces.size(), 1) - def spans = traces[0] - Assert.assertEquals(spans.size(), 2) - def parent = spans.find { - it0 -> - it0.name.equalsIgnoreCase("parent") - } - def producer = spans.find { - it0 -> - it0.name.equalsIgnoreCase("PRODUCER/SEND") - } - Assert.assertNotNull(parent) - Assert.assertNotNull(producer) - - SpanAssert.assertSpan(parent) { - name("parent") - kind(INTERNAL) - hasNoParent() - } - - SpanAssert.assertSpan(producer) { - name("PRODUCER/SEND") - kind(PRODUCER) - childOf parent - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "pulsar" - "$SemanticAttributes.NET_SOCK_PEER_ADDR" brokerUrl - "$SemanticAttributes.MESSAGE_TYPE" "NORMAL" - "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" - "$SemanticAttributes.MESSAGING_DESTINATION_NAME" topic - "$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString() - "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long + then: + assertTraces(1) { + trace(0, 2) { + span(0) { + name "parent" + kind INTERNAL + hasNoParent() + } + span(1) { + name "PRODUCER/SEND" + kind PRODUCER + childOf span(0) + attributes { + "$SemanticAttributes.MESSAGING_SYSTEM" "pulsar" + "$SemanticAttributes.NET_SOCK_PEER_ADDR" brokerUrl + "$SemanticAttributes.MESSAGE_TYPE" "NORMAL" + "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" + "$SemanticAttributes.MESSAGING_DESTINATION_NAME" topic + "$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString() + "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long + } + } } } } def "test consume non-partitioned topic"() { setup: - def topic = "persistent://public/default/testNonPartitionedTopic_" + UUID.randomUUID() + def topic = "persistent://public/default/testConsumeNonPartitionedTopic" def latch = new CountDownLatch(1) admin.topics().createNonPartitionedTopic(topic) consumer = client.newConsumer(Schema.STRING) @@ -144,145 +138,113 @@ class PulsarClientTest extends AgentInstrumentationSpecification { .enableBatching(false) .create() - def msgId - def msg = UUID.randomUUID().toString() - runWithSpan("parent") { - msgId = producer.send(msg) + when: + def msg = "test" + def msgId = runWithSpan("parent") { + producer.send(msg) } latch.await(1, TimeUnit.MINUTES) - // Wait until all the spans finished. - Thread.sleep(TimeUnit.SECONDS.toMillis(20)) - def traces = waitForTraces(1) - def spans = traces[0] - Assert.assertEquals(spans.size(), 4) - def parent = spans.find { - it0 -> - it0.name.equalsIgnoreCase("parent") - } - def send = spans.find { - it0 -> - it0.name.equalsIgnoreCase("PRODUCER/SEND") - } - def receive = spans.find { - it0 -> - it0.name.equalsIgnoreCase("CONSUMER/RECEIVE") - } - - def process = spans.find { - it0 -> - it0.name.equalsIgnoreCase("CONSUMER/PROCESS") - } - - SpanAssert.assertSpan(parent) { - name("parent") - kind(INTERNAL) - hasNoParent() - } - - SpanAssert.assertSpan(send) { - name("PRODUCER/SEND") - kind(PRODUCER) - childOf parent - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "pulsar" - "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" - "$SemanticAttributes.NET_SOCK_PEER_ADDR" brokerUrl - "$SemanticAttributes.MESSAGING_DESTINATION_NAME" topic - "$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString() - "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long - "$SemanticAttributes.MESSAGE_TYPE" "NORMAL" - } - } - - SpanAssert.assertSpan(receive) { - name("CONSUMER/RECEIVE") - kind(CONSUMER) - childOf(send) - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "pulsar" - "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" - "$SemanticAttributes.NET_SOCK_PEER_ADDR" brokerUrl - "$SemanticAttributes.MESSAGING_DESTINATION_NAME" topic - "$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString() - "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long - "$SemanticAttributes.MESSAGING_OPERATION" "receive" - } - } - - SpanAssert.assertSpan(process) { - name("CONSUMER/PROCESS") - kind(INTERNAL) - childOf(receive) - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "pulsar" - "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" - "$SemanticAttributes.MESSAGING_DESTINATION_NAME" topic - "$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString() - "$SemanticAttributes.MESSAGING_OPERATION" "process" - "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long + then: + assertTraces(1) { + trace(0, 4) { + span(0) { + name "parent" + kind INTERNAL + hasNoParent() + } + span(1) { + name "PRODUCER/SEND" + kind PRODUCER + childOf span(0) + attributes { + "$SemanticAttributes.MESSAGING_SYSTEM" "pulsar" + "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" + "$SemanticAttributes.NET_SOCK_PEER_ADDR" brokerUrl + "$SemanticAttributes.MESSAGING_DESTINATION_NAME" topic + "$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString() + "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long + "$SemanticAttributes.MESSAGE_TYPE" "NORMAL" + } + } + span(2) { + name "CONSUMER/RECEIVE" + kind CONSUMER + childOf span(1) + attributes { + "$SemanticAttributes.MESSAGING_SYSTEM" "pulsar" + "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" + "$SemanticAttributes.NET_SOCK_PEER_ADDR" brokerUrl + "$SemanticAttributes.MESSAGING_DESTINATION_NAME" topic + "$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString() + "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long + "$SemanticAttributes.MESSAGING_OPERATION" "receive" + } + } + span(3) { + name "CONSUMER/PROCESS" + kind INTERNAL + childOf span(2) + attributes { + "$SemanticAttributes.MESSAGING_SYSTEM" "pulsar" + "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" + "$SemanticAttributes.MESSAGING_DESTINATION_NAME" topic + "$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString() + "$SemanticAttributes.MESSAGING_OPERATION" "process" + "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long + } + } } } } - def "test send partitioned topic"() { setup: - def topic = "persistent://public/default/testPartitionedTopic_" + UUID.randomUUID() + def topic = "persistent://public/default/testSendPartitionedTopic" admin.topics().createPartitionedTopic(topic, 2) producer = client.newProducer(Schema.STRING).topic(topic) .enableBatching(false).create() - String msg = UUID.randomUUID().toString() - - def msgId - runWithSpan("parent") { - msgId = producer.send(msg) + when: + String msg = "test" + def msgId = runWithSpan("parent") { + producer.send(msg) } - def traces = waitForTraces(1) - def spans = traces[0] - Assert.assertEquals(spans.size(), 2) - - def parent = spans.find { - it0 -> - it0.name.equalsIgnoreCase("parent") - } - def send = spans.find { - it0 -> - it0.name.equalsIgnoreCase("PRODUCER/SEND") - } - - SpanAssert.assertSpan(parent) { - name("parent") - kind(INTERNAL) - hasNoParent() - } - - SpanAssert.assertSpan(send) { - name("PRODUCER/SEND") - kind(PRODUCER) - childOf parent - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "pulsar" - "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" - "$SemanticAttributes.NET_SOCK_PEER_ADDR" brokerUrl - "$SemanticAttributes.MESSAGING_DESTINATION_NAME" { - t -> - return t.toString().contains(topic) + then: + assertTraces(1) { + trace(0, 2) { + span(0) { + name "parent" + kind INTERNAL + hasNoParent() + } + span(1) { + name "PRODUCER/SEND" + kind PRODUCER + childOf span(0) + attributes { + "$SemanticAttributes.MESSAGING_SYSTEM" "pulsar" + "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" + "$SemanticAttributes.NET_SOCK_PEER_ADDR" brokerUrl + "$SemanticAttributes.MESSAGING_DESTINATION_NAME" { + t -> + return t.toString().contains(topic) + } + "$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString() + "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long + "$SemanticAttributes.MESSAGE_TYPE" "NORMAL" + } } - "$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString() - "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long - "$SemanticAttributes.MESSAGE_TYPE" "NORMAL" } } } def "test consume partitioned topic"() { setup: - def topic = "persistent://public/default/testPartitionedTopic_" + UUID.randomUUID() + def topic = "persistent://public/default/testConsumePartitionedTopic" admin.topics().createPartitionedTopic(topic, 2) def latch = new CountDownLatch(1) @@ -304,105 +266,73 @@ class PulsarClientTest extends AgentInstrumentationSpecification { .enableBatching(false) .create() - def msgId - def msg = UUID.randomUUID().toString() - runWithSpan("parent") { - msgId = producer.send(msg) + when: + def msg = "test" + def msgId = runWithSpan("parent") { + producer.send(msg) } latch.await(1, TimeUnit.MINUTES) - // Wait until all the spans finished. - Thread.sleep(TimeUnit.SECONDS.toMillis(20)) - def traces = waitForTraces(1) - Assert.assertEquals(traces.size(), 1) - def spans = traces[0] - Assert.assertEquals(spans.size(), 4) - - def parent = spans.find { - it0 -> - it0.name.equalsIgnoreCase("parent") - } - def send = spans.find { - it0 -> - it0.name.equalsIgnoreCase("PRODUCER/SEND") - } - def receive = spans.find { - it0 -> - it0.name.equalsIgnoreCase("CONSUMER/RECEIVE") - } - - def process = spans.find { - it0 -> - it0.name.equalsIgnoreCase("CONSUMER/PROCESS") - } - - SpanAssert.assertSpan(parent) { - name("parent") - kind(INTERNAL) - hasNoParent() - } - - SpanAssert.assertSpan(send) { - name("PRODUCER/SEND") - kind(PRODUCER) - childOf parent - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "pulsar" - "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" - "$SemanticAttributes.NET_SOCK_PEER_ADDR" brokerUrl - "$SemanticAttributes.MESSAGING_DESTINATION_NAME" { - v -> - return v.toString().contains(topic) + then: + assertTraces(1) { + trace(0, 4) { + span(0) { + name "parent" + kind INTERNAL + hasNoParent() } - "$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString() - "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long - "$SemanticAttributes.MESSAGE_TYPE" "NORMAL" - } - } - - SpanAssert.assertSpan(receive) { - name("CONSUMER/RECEIVE") - kind(CONSUMER) - childOf(send) - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "pulsar" - "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" - "$SemanticAttributes.NET_SOCK_PEER_ADDR" brokerUrl - "$SemanticAttributes.MESSAGING_DESTINATION_NAME" { - v -> - return v.toString().contains(topic) + span(1) { + name "PRODUCER/SEND" + kind PRODUCER + childOf span(0) + attributes { + "$SemanticAttributes.MESSAGING_SYSTEM" "pulsar" + "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" + "$SemanticAttributes.NET_SOCK_PEER_ADDR" brokerUrl + "$SemanticAttributes.MESSAGING_DESTINATION_NAME" { it.startsWith(topic) } + "$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString() + "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long + "$SemanticAttributes.MESSAGE_TYPE" "NORMAL" + } } - "$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString() - "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long - "$SemanticAttributes.MESSAGING_OPERATION" "receive" - } - } - - SpanAssert.assertSpan(process) { - name("CONSUMER/PROCESS") - kind(INTERNAL) - childOf(receive) - attributes { - "$SemanticAttributes.MESSAGING_SYSTEM" "pulsar" - "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" - "$SemanticAttributes.MESSAGING_DESTINATION_NAME" { - v -> - return v.toString().contains(topic) + span(2) { + name "CONSUMER/RECEIVE" + kind CONSUMER + childOf span(1) + attributes { + "$SemanticAttributes.MESSAGING_SYSTEM" "pulsar" + "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" + "$SemanticAttributes.NET_SOCK_PEER_ADDR" brokerUrl + "$SemanticAttributes.MESSAGING_DESTINATION_NAME" { it.startsWith(topic) } + "$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString() + "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long + "$SemanticAttributes.MESSAGING_OPERATION" "receive" + } + } + span(3) { + name "CONSUMER/PROCESS" + kind INTERNAL + childOf span(2) + attributes { + "$SemanticAttributes.MESSAGING_SYSTEM" "pulsar" + "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" + "$SemanticAttributes.MESSAGING_DESTINATION_NAME" { it.startsWith(topic) } + "$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString() + "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long + "$SemanticAttributes.MESSAGING_OPERATION" "process" + } } - "$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString() - "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long - "$SemanticAttributes.MESSAGING_OPERATION" "process" } } } - def "test consume multi-topics"() { setup: - def topic = "persistent://public/default/testNonPartitionedTopic_" + UUID.randomUUID() - def topic1 = "persistent://public/default/testNonPartitionedTopic_" + UUID.randomUUID() + def topicNamePrefix = "persistent://public/default/testConsumeMulti_" + def topic = topicNamePrefix + "1" + def topic1 = topicNamePrefix + "2" def latch = new CountDownLatch(2) producer = client.newProducer(Schema.STRING) @@ -414,9 +344,12 @@ class PulsarClientTest extends AgentInstrumentationSpecification { .enableBatching(false) .create() - runWithSpan("parent") { - producer.send(UUID.randomUUID().toString()) - producer1.send(UUID.randomUUID().toString()) + when: + runWithSpan("parent1") { + producer.send("test1") + } + runWithSpan("parent2") { + producer1.send("test2") } consumer = client.newConsumer(Schema.STRING) @@ -433,74 +366,74 @@ class PulsarClientTest extends AgentInstrumentationSpecification { .subscribe() latch.await(1, TimeUnit.MINUTES) - // Wait until all the spans finished. - Thread.sleep(TimeUnit.SECONDS.toMillis(20)) - def traces = waitForTraces(1) - Assert.assertEquals(traces.size(), 1) - def spans = traces[0] - Assert.assertEquals(spans.size(), 7) - - def parent = spans.find { - it0 -> - it0.name.equalsIgnoreCase("parent") - } - - SpanAssert.assertSpan(parent) { - hasNoParent() - kind(INTERNAL) - } - - - def sendSpans = spans.findAll { - it0 -> - it0.name.equalsIgnoreCase("PRODUCER/SEND") - } - - sendSpans.forEach { - it0 -> - SpanAssert.assertSpan(it0) { - kind(PRODUCER) - childOf(parent) - } - } - - def receiveSpans = spans.findAll { - it0 -> - it0.name.equalsIgnoreCase("CONSUMER/RECEIVE") - } - - def processSpans = spans.findAll { - it0 -> - it0.name.equalsIgnoreCase("CONSUMER/PROCESS") - } - - receiveSpans.forEach { - it0 -> - def parentSpanId = it0.getParentSpanId() - def parent0 = sendSpans.find { - v -> - (v.spanId == parentSpanId) - } - - SpanAssert.assertSpan(it0) { - kind(CONSUMER) - childOf(parent0) - } - } - - processSpans.forEach { - it0 -> - def parentSpanId = it0.getParentSpanId() - def parent0 = receiveSpans.find { - v -> - (v.spanId == parentSpanId) - } - - SpanAssert.assertSpan(it0) { - kind(INTERNAL) - childOf(parent0) + then: + assertTraces(2) { + traces.sort(orderByRootSpanName("parent1", "parent2")) + for (int i in 1..2) { + trace(i - 1, 4) { + span(0) { + name "parent" + i + kind INTERNAL + hasNoParent() + } + span(1) { + name "PRODUCER/SEND" + kind PRODUCER + childOf span(0) + attributes { + "$SemanticAttributes.MESSAGING_SYSTEM" "pulsar" + "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" + "$SemanticAttributes.NET_SOCK_PEER_ADDR" brokerUrl + "$SemanticAttributes.MESSAGING_DESTINATION_NAME" { it.startsWith(topicNamePrefix) } + "$SemanticAttributes.MESSAGING_MESSAGE_ID" String + "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long + "$SemanticAttributes.MESSAGE_TYPE" "NORMAL" + } + } + span(1) { + name "PRODUCER/SEND" + kind PRODUCER + childOf span(0) + attributes { + "$SemanticAttributes.MESSAGING_SYSTEM" "pulsar" + "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" + "$SemanticAttributes.NET_SOCK_PEER_ADDR" brokerUrl + "$SemanticAttributes.MESSAGING_DESTINATION_NAME" { it.startsWith(topicNamePrefix) } + "$SemanticAttributes.MESSAGING_MESSAGE_ID" String + "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long + "$SemanticAttributes.MESSAGE_TYPE" "NORMAL" + } + } + span(2) { + name "CONSUMER/RECEIVE" + kind CONSUMER + childOf span(1) + attributes { + "$SemanticAttributes.MESSAGING_SYSTEM" "pulsar" + "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" + "$SemanticAttributes.NET_SOCK_PEER_ADDR" brokerUrl + "$SemanticAttributes.MESSAGING_DESTINATION_NAME" { it.startsWith(topicNamePrefix) } + "$SemanticAttributes.MESSAGING_MESSAGE_ID" String + "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long + "$SemanticAttributes.MESSAGING_OPERATION" "receive" + } + } + span(3) { + name "CONSUMER/PROCESS" + kind INTERNAL + childOf span(2) + attributes { + "$SemanticAttributes.MESSAGING_SYSTEM" "pulsar" + "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" + "$SemanticAttributes.MESSAGING_DESTINATION_NAME" { it.startsWith(topicNamePrefix) } + "$SemanticAttributes.MESSAGING_MESSAGE_ID" String + "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long + "$SemanticAttributes.MESSAGING_OPERATION" "process" + } + } } + } } } }