Rework pulsar tests (#8000)

This commit is contained in:
Lauri Tulmin 2023-03-08 07:48:59 +02:00 committed by GitHub
parent a51535d08e
commit b394bab2ce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 246 additions and 312 deletions

View File

@ -24,7 +24,8 @@ import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message; import org.apache.pulsar.client.api.Message;
public final class PulsarSingletons { public final class PulsarSingletons {
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.pulsar-client-2.8"; private static final String INSTRUMENTATION_NAME = "io.opentelemetry.apache-pulsar-2.8";
private static final OpenTelemetry TELEMETRY = GlobalOpenTelemetry.get(); private static final OpenTelemetry TELEMETRY = GlobalOpenTelemetry.get();
private static final TextMapPropagator PROPAGATOR = private static final TextMapPropagator PROPAGATOR =
TELEMETRY.getPropagators().getTextMapPropagator(); TELEMETRY.getPropagators().getTextMapPropagator();

View File

@ -6,7 +6,6 @@
package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8 package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import io.opentelemetry.instrumentation.test.asserts.SpanAssert
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import org.apache.pulsar.client.admin.PulsarAdmin import org.apache.pulsar.client.admin.PulsarAdmin
import org.apache.pulsar.client.api.Consumer import org.apache.pulsar.client.api.Consumer
@ -16,11 +15,14 @@ import org.apache.pulsar.client.api.Producer
import org.apache.pulsar.client.api.PulsarClient import org.apache.pulsar.client.api.PulsarClient
import org.apache.pulsar.client.api.Schema import org.apache.pulsar.client.api.Schema
import org.apache.pulsar.client.api.SubscriptionInitialPosition import org.apache.pulsar.client.api.SubscriptionInitialPosition
import org.junit.Assert import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.testcontainers.containers.PulsarContainer import org.testcontainers.containers.PulsarContainer
import org.testcontainers.containers.output.Slf4jLogConsumer
import org.testcontainers.utility.DockerImageName import org.testcontainers.utility.DockerImageName
import spock.lang.Shared import spock.lang.Shared
import java.time.Duration
import java.util.concurrent.CountDownLatch import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
@ -29,6 +31,7 @@ import static io.opentelemetry.api.trace.SpanKind.INTERNAL
import static io.opentelemetry.api.trace.SpanKind.PRODUCER import static io.opentelemetry.api.trace.SpanKind.PRODUCER
class PulsarClientTest extends AgentInstrumentationSpecification { class PulsarClientTest extends AgentInstrumentationSpecification {
private static final Logger logger = LoggerFactory.getLogger(PulsarClientTest)
private static final DockerImageName DEFAULT_IMAGE_NAME = private static final DockerImageName DEFAULT_IMAGE_NAME =
DockerImageName.parse("apachepulsar/pulsar:2.8.0") DockerImageName.parse("apachepulsar/pulsar:2.8.0")
@ -52,6 +55,9 @@ class PulsarClientTest extends AgentInstrumentationSpecification {
@Override @Override
def setupSpec() { def setupSpec() {
pulsar = new PulsarContainer(DEFAULT_IMAGE_NAME) pulsar = new PulsarContainer(DEFAULT_IMAGE_NAME)
.withEnv("PULSAR_MEM", "-Xmx128m")
.withLogConsumer(new Slf4jLogConsumer(logger))
.withStartupTimeout(Duration.ofMinutes(2))
pulsar.start() pulsar.start()
brokerUrl = pulsar.pulsarBrokerUrl brokerUrl = pulsar.pulsarBrokerUrl
@ -71,59 +77,47 @@ class PulsarClientTest extends AgentInstrumentationSpecification {
def "test send non-partitioned topic"() { def "test send non-partitioned topic"() {
setup: setup:
def topic = "persistent://public/default/testNonPartitionedTopic_" + UUID.randomUUID() def topic = "persistent://public/default/testSendNonPartitionedTopic"
admin.topics().createNonPartitionedTopic(topic) admin.topics().createNonPartitionedTopic(topic)
producer = producer =
client.newProducer(Schema.STRING).topic(topic) client.newProducer(Schema.STRING).topic(topic)
.enableBatching(false).create() .enableBatching(false).create()
String msg = UUID.randomUUID().toString() when:
String msg = "test"
def msgId def msgId = runWithSpan("parent") {
runWithSpan("parent") { producer.send(msg)
msgId = producer.send(msg)
} }
def traces = waitForTraces(1) then:
Assert.assertEquals(traces.size(), 1) assertTraces(1) {
def spans = traces[0] trace(0, 2) {
Assert.assertEquals(spans.size(), 2) span(0) {
def parent = spans.find { name "parent"
it0 -> kind INTERNAL
it0.name.equalsIgnoreCase("parent") hasNoParent()
} }
def producer = spans.find { span(1) {
it0 -> name "PRODUCER/SEND"
it0.name.equalsIgnoreCase("PRODUCER/SEND") kind PRODUCER
} childOf span(0)
Assert.assertNotNull(parent) attributes {
Assert.assertNotNull(producer) "$SemanticAttributes.MESSAGING_SYSTEM" "pulsar"
"$SemanticAttributes.NET_SOCK_PEER_ADDR" brokerUrl
SpanAssert.assertSpan(parent) { "$SemanticAttributes.MESSAGE_TYPE" "NORMAL"
name("parent") "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
kind(INTERNAL) "$SemanticAttributes.MESSAGING_DESTINATION_NAME" topic
hasNoParent() "$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString()
} "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
}
SpanAssert.assertSpan(producer) { }
name("PRODUCER/SEND")
kind(PRODUCER)
childOf parent
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "pulsar"
"$SemanticAttributes.NET_SOCK_PEER_ADDR" brokerUrl
"$SemanticAttributes.MESSAGE_TYPE" "NORMAL"
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" topic
"$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString()
"$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
} }
} }
} }
def "test consume non-partitioned topic"() { def "test consume non-partitioned topic"() {
setup: setup:
def topic = "persistent://public/default/testNonPartitionedTopic_" + UUID.randomUUID() def topic = "persistent://public/default/testConsumeNonPartitionedTopic"
def latch = new CountDownLatch(1) def latch = new CountDownLatch(1)
admin.topics().createNonPartitionedTopic(topic) admin.topics().createNonPartitionedTopic(topic)
consumer = client.newConsumer(Schema.STRING) consumer = client.newConsumer(Schema.STRING)
@ -144,145 +138,113 @@ class PulsarClientTest extends AgentInstrumentationSpecification {
.enableBatching(false) .enableBatching(false)
.create() .create()
def msgId when:
def msg = UUID.randomUUID().toString() def msg = "test"
runWithSpan("parent") { def msgId = runWithSpan("parent") {
msgId = producer.send(msg) producer.send(msg)
} }
latch.await(1, TimeUnit.MINUTES) latch.await(1, TimeUnit.MINUTES)
// Wait until all the spans finished.
Thread.sleep(TimeUnit.SECONDS.toMillis(20))
def traces = waitForTraces(1) then:
def spans = traces[0] assertTraces(1) {
Assert.assertEquals(spans.size(), 4) trace(0, 4) {
def parent = spans.find { span(0) {
it0 -> name "parent"
it0.name.equalsIgnoreCase("parent") kind INTERNAL
} hasNoParent()
def send = spans.find { }
it0 -> span(1) {
it0.name.equalsIgnoreCase("PRODUCER/SEND") name "PRODUCER/SEND"
} kind PRODUCER
def receive = spans.find { childOf span(0)
it0 -> attributes {
it0.name.equalsIgnoreCase("CONSUMER/RECEIVE") "$SemanticAttributes.MESSAGING_SYSTEM" "pulsar"
} "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.NET_SOCK_PEER_ADDR" brokerUrl
def process = spans.find { "$SemanticAttributes.MESSAGING_DESTINATION_NAME" topic
it0 -> "$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString()
it0.name.equalsIgnoreCase("CONSUMER/PROCESS") "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
} "$SemanticAttributes.MESSAGE_TYPE" "NORMAL"
}
SpanAssert.assertSpan(parent) { }
name("parent") span(2) {
kind(INTERNAL) name "CONSUMER/RECEIVE"
hasNoParent() kind CONSUMER
} childOf span(1)
attributes {
SpanAssert.assertSpan(send) { "$SemanticAttributes.MESSAGING_SYSTEM" "pulsar"
name("PRODUCER/SEND") "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
kind(PRODUCER) "$SemanticAttributes.NET_SOCK_PEER_ADDR" brokerUrl
childOf parent "$SemanticAttributes.MESSAGING_DESTINATION_NAME" topic
attributes { "$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString()
"$SemanticAttributes.MESSAGING_SYSTEM" "pulsar" "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" "$SemanticAttributes.MESSAGING_OPERATION" "receive"
"$SemanticAttributes.NET_SOCK_PEER_ADDR" brokerUrl }
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" topic }
"$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString() span(3) {
"$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long name "CONSUMER/PROCESS"
"$SemanticAttributes.MESSAGE_TYPE" "NORMAL" kind INTERNAL
} childOf span(2)
} attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "pulsar"
SpanAssert.assertSpan(receive) { "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
name("CONSUMER/RECEIVE") "$SemanticAttributes.MESSAGING_DESTINATION_NAME" topic
kind(CONSUMER) "$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString()
childOf(send) "$SemanticAttributes.MESSAGING_OPERATION" "process"
attributes { "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
"$SemanticAttributes.MESSAGING_SYSTEM" "pulsar" }
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" }
"$SemanticAttributes.NET_SOCK_PEER_ADDR" brokerUrl
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" topic
"$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString()
"$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
"$SemanticAttributes.MESSAGING_OPERATION" "receive"
}
}
SpanAssert.assertSpan(process) {
name("CONSUMER/PROCESS")
kind(INTERNAL)
childOf(receive)
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "pulsar"
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" topic
"$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString()
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
} }
} }
} }
def "test send partitioned topic"() { def "test send partitioned topic"() {
setup: setup:
def topic = "persistent://public/default/testPartitionedTopic_" + UUID.randomUUID() def topic = "persistent://public/default/testSendPartitionedTopic"
admin.topics().createPartitionedTopic(topic, 2) admin.topics().createPartitionedTopic(topic, 2)
producer = producer =
client.newProducer(Schema.STRING).topic(topic) client.newProducer(Schema.STRING).topic(topic)
.enableBatching(false).create() .enableBatching(false).create()
String msg = UUID.randomUUID().toString() when:
String msg = "test"
def msgId def msgId = runWithSpan("parent") {
runWithSpan("parent") { producer.send(msg)
msgId = producer.send(msg)
} }
def traces = waitForTraces(1) then:
def spans = traces[0] assertTraces(1) {
Assert.assertEquals(spans.size(), 2) trace(0, 2) {
span(0) {
def parent = spans.find { name "parent"
it0 -> kind INTERNAL
it0.name.equalsIgnoreCase("parent") hasNoParent()
} }
def send = spans.find { span(1) {
it0 -> name "PRODUCER/SEND"
it0.name.equalsIgnoreCase("PRODUCER/SEND") kind PRODUCER
} childOf span(0)
attributes {
SpanAssert.assertSpan(parent) { "$SemanticAttributes.MESSAGING_SYSTEM" "pulsar"
name("parent") "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
kind(INTERNAL) "$SemanticAttributes.NET_SOCK_PEER_ADDR" brokerUrl
hasNoParent() "$SemanticAttributes.MESSAGING_DESTINATION_NAME" {
} t ->
return t.toString().contains(topic)
SpanAssert.assertSpan(send) { }
name("PRODUCER/SEND") "$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString()
kind(PRODUCER) "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
childOf parent "$SemanticAttributes.MESSAGE_TYPE" "NORMAL"
attributes { }
"$SemanticAttributes.MESSAGING_SYSTEM" "pulsar"
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.NET_SOCK_PEER_ADDR" brokerUrl
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" {
t ->
return t.toString().contains(topic)
} }
"$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString()
"$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
"$SemanticAttributes.MESSAGE_TYPE" "NORMAL"
} }
} }
} }
def "test consume partitioned topic"() { def "test consume partitioned topic"() {
setup: setup:
def topic = "persistent://public/default/testPartitionedTopic_" + UUID.randomUUID() def topic = "persistent://public/default/testConsumePartitionedTopic"
admin.topics().createPartitionedTopic(topic, 2) admin.topics().createPartitionedTopic(topic, 2)
def latch = new CountDownLatch(1) def latch = new CountDownLatch(1)
@ -304,105 +266,73 @@ class PulsarClientTest extends AgentInstrumentationSpecification {
.enableBatching(false) .enableBatching(false)
.create() .create()
def msgId when:
def msg = UUID.randomUUID().toString() def msg = "test"
runWithSpan("parent") { def msgId = runWithSpan("parent") {
msgId = producer.send(msg) producer.send(msg)
} }
latch.await(1, TimeUnit.MINUTES) latch.await(1, TimeUnit.MINUTES)
// Wait until all the spans finished.
Thread.sleep(TimeUnit.SECONDS.toMillis(20))
def traces = waitForTraces(1) then:
Assert.assertEquals(traces.size(), 1) assertTraces(1) {
def spans = traces[0] trace(0, 4) {
Assert.assertEquals(spans.size(), 4) span(0) {
name "parent"
def parent = spans.find { kind INTERNAL
it0 -> hasNoParent()
it0.name.equalsIgnoreCase("parent")
}
def send = spans.find {
it0 ->
it0.name.equalsIgnoreCase("PRODUCER/SEND")
}
def receive = spans.find {
it0 ->
it0.name.equalsIgnoreCase("CONSUMER/RECEIVE")
}
def process = spans.find {
it0 ->
it0.name.equalsIgnoreCase("CONSUMER/PROCESS")
}
SpanAssert.assertSpan(parent) {
name("parent")
kind(INTERNAL)
hasNoParent()
}
SpanAssert.assertSpan(send) {
name("PRODUCER/SEND")
kind(PRODUCER)
childOf parent
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "pulsar"
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.NET_SOCK_PEER_ADDR" brokerUrl
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" {
v ->
return v.toString().contains(topic)
} }
"$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString() span(1) {
"$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long name "PRODUCER/SEND"
"$SemanticAttributes.MESSAGE_TYPE" "NORMAL" kind PRODUCER
} childOf span(0)
} attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "pulsar"
SpanAssert.assertSpan(receive) { "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
name("CONSUMER/RECEIVE") "$SemanticAttributes.NET_SOCK_PEER_ADDR" brokerUrl
kind(CONSUMER) "$SemanticAttributes.MESSAGING_DESTINATION_NAME" { it.startsWith(topic) }
childOf(send) "$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString()
attributes { "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
"$SemanticAttributes.MESSAGING_SYSTEM" "pulsar" "$SemanticAttributes.MESSAGE_TYPE" "NORMAL"
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" }
"$SemanticAttributes.NET_SOCK_PEER_ADDR" brokerUrl
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" {
v ->
return v.toString().contains(topic)
} }
"$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString() span(2) {
"$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long name "CONSUMER/RECEIVE"
"$SemanticAttributes.MESSAGING_OPERATION" "receive" kind CONSUMER
} childOf span(1)
} attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "pulsar"
SpanAssert.assertSpan(process) { "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
name("CONSUMER/PROCESS") "$SemanticAttributes.NET_SOCK_PEER_ADDR" brokerUrl
kind(INTERNAL) "$SemanticAttributes.MESSAGING_DESTINATION_NAME" { it.startsWith(topic) }
childOf(receive) "$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString()
attributes { "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
"$SemanticAttributes.MESSAGING_SYSTEM" "pulsar" "$SemanticAttributes.MESSAGING_OPERATION" "receive"
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic" }
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" { }
v -> span(3) {
return v.toString().contains(topic) name "CONSUMER/PROCESS"
kind INTERNAL
childOf span(2)
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "pulsar"
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" { it.startsWith(topic) }
"$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString()
"$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
"$SemanticAttributes.MESSAGING_OPERATION" "process"
}
} }
"$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString()
"$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
"$SemanticAttributes.MESSAGING_OPERATION" "process"
} }
} }
} }
def "test consume multi-topics"() { def "test consume multi-topics"() {
setup: setup:
def topic = "persistent://public/default/testNonPartitionedTopic_" + UUID.randomUUID() def topicNamePrefix = "persistent://public/default/testConsumeMulti_"
def topic1 = "persistent://public/default/testNonPartitionedTopic_" + UUID.randomUUID() def topic = topicNamePrefix + "1"
def topic1 = topicNamePrefix + "2"
def latch = new CountDownLatch(2) def latch = new CountDownLatch(2)
producer = client.newProducer(Schema.STRING) producer = client.newProducer(Schema.STRING)
@ -414,9 +344,12 @@ class PulsarClientTest extends AgentInstrumentationSpecification {
.enableBatching(false) .enableBatching(false)
.create() .create()
runWithSpan("parent") { when:
producer.send(UUID.randomUUID().toString()) runWithSpan("parent1") {
producer1.send(UUID.randomUUID().toString()) producer.send("test1")
}
runWithSpan("parent2") {
producer1.send("test2")
} }
consumer = client.newConsumer(Schema.STRING) consumer = client.newConsumer(Schema.STRING)
@ -433,74 +366,74 @@ class PulsarClientTest extends AgentInstrumentationSpecification {
.subscribe() .subscribe()
latch.await(1, TimeUnit.MINUTES) latch.await(1, TimeUnit.MINUTES)
// Wait until all the spans finished.
Thread.sleep(TimeUnit.SECONDS.toMillis(20))
def traces = waitForTraces(1) then:
Assert.assertEquals(traces.size(), 1) assertTraces(2) {
def spans = traces[0] traces.sort(orderByRootSpanName("parent1", "parent2"))
Assert.assertEquals(spans.size(), 7) for (int i in 1..2) {
trace(i - 1, 4) {
def parent = spans.find { span(0) {
it0 -> name "parent" + i
it0.name.equalsIgnoreCase("parent") kind INTERNAL
} hasNoParent()
}
SpanAssert.assertSpan(parent) { span(1) {
hasNoParent() name "PRODUCER/SEND"
kind(INTERNAL) kind PRODUCER
} childOf span(0)
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "pulsar"
def sendSpans = spans.findAll { "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
it0 -> "$SemanticAttributes.NET_SOCK_PEER_ADDR" brokerUrl
it0.name.equalsIgnoreCase("PRODUCER/SEND") "$SemanticAttributes.MESSAGING_DESTINATION_NAME" { it.startsWith(topicNamePrefix) }
} "$SemanticAttributes.MESSAGING_MESSAGE_ID" String
"$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
sendSpans.forEach { "$SemanticAttributes.MESSAGE_TYPE" "NORMAL"
it0 -> }
SpanAssert.assertSpan(it0) { }
kind(PRODUCER) span(1) {
childOf(parent) name "PRODUCER/SEND"
} kind PRODUCER
} childOf span(0)
attributes {
def receiveSpans = spans.findAll { "$SemanticAttributes.MESSAGING_SYSTEM" "pulsar"
it0 -> "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
it0.name.equalsIgnoreCase("CONSUMER/RECEIVE") "$SemanticAttributes.NET_SOCK_PEER_ADDR" brokerUrl
} "$SemanticAttributes.MESSAGING_DESTINATION_NAME" { it.startsWith(topicNamePrefix) }
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
def processSpans = spans.findAll { "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
it0 -> "$SemanticAttributes.MESSAGE_TYPE" "NORMAL"
it0.name.equalsIgnoreCase("CONSUMER/PROCESS") }
} }
span(2) {
receiveSpans.forEach { name "CONSUMER/RECEIVE"
it0 -> kind CONSUMER
def parentSpanId = it0.getParentSpanId() childOf span(1)
def parent0 = sendSpans.find { attributes {
v -> "$SemanticAttributes.MESSAGING_SYSTEM" "pulsar"
(v.spanId == parentSpanId) "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
} "$SemanticAttributes.NET_SOCK_PEER_ADDR" brokerUrl
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" { it.startsWith(topicNamePrefix) }
SpanAssert.assertSpan(it0) { "$SemanticAttributes.MESSAGING_MESSAGE_ID" String
kind(CONSUMER) "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
childOf(parent0) "$SemanticAttributes.MESSAGING_OPERATION" "receive"
} }
} }
span(3) {
processSpans.forEach { name "CONSUMER/PROCESS"
it0 -> kind INTERNAL
def parentSpanId = it0.getParentSpanId() childOf span(2)
def parent0 = receiveSpans.find { attributes {
v -> "$SemanticAttributes.MESSAGING_SYSTEM" "pulsar"
(v.spanId == parentSpanId) "$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
} "$SemanticAttributes.MESSAGING_DESTINATION_NAME" { it.startsWith(topicNamePrefix) }
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
SpanAssert.assertSpan(it0) { "$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
kind(INTERNAL) "$SemanticAttributes.MESSAGING_OPERATION" "process"
childOf(parent0) }
}
} }
}
} }
} }
} }