Pulsar batch receive instrumentation (#8173)

This commit is contained in:
Lauri Tulmin 2023-04-04 20:19:08 +03:00 committed by GitHub
parent 4d21d45f3d
commit 34bca4ba10
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 440 additions and 67 deletions

View File

@ -22,5 +22,6 @@ dependencies {
tasks.withType<Test>().configureEach {
// TODO run tests both with and without experimental span attributes
jvmArgs("-Dotel.instrumentation.pulsar.experimental-span-attributes=true")
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
}

View File

@ -7,6 +7,7 @@ package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8;
import static io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarSingletons.startAndEndConsumerReceive;
import static io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarSingletons.wrap;
import static io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarSingletons.wrapBatch;
import static net.bytebuddy.matcher.ElementMatchers.isConstructor;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isProtected;
@ -25,6 +26,7 @@ import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.impl.PulsarClientImpl;
@ -41,7 +43,7 @@ public class ConsumerImplInstrumentation implements TypeInstrumentation {
public void transform(TypeTransformer transformer) {
String className = ConsumerImplInstrumentation.class.getName();
transformer.applyAdviceToMethod(isConstructor(), className + "$ConsumerConstructorAdviser");
transformer.applyAdviceToMethod(isConstructor(), className + "$ConsumerConstructorAdvice");
// internalReceive will apply to Consumer#receive(long,TimeUnit)
// and called before MessageListener#receive.
@ -51,22 +53,27 @@ public class ConsumerImplInstrumentation implements TypeInstrumentation {
.and(named("internalReceive"))
.and(takesArguments(2))
.and(takesArgument(1, named("java.util.concurrent.TimeUnit"))),
className + "$ConsumerInternalReceiveAdviser");
className + "$ConsumerInternalReceiveAdvice");
// internalReceive will apply to Consumer#receive()
transformer.applyAdviceToMethod(
isMethod().and(isProtected()).and(named("internalReceive")).and(takesArguments(0)),
className + "$ConsumerSyncReceiveAdviser");
className + "$ConsumerSyncReceiveAdvice");
// internalReceiveAsync will apply to Consumer#receiveAsync()
transformer.applyAdviceToMethod(
isMethod().and(isProtected()).and(named("internalReceiveAsync")).and(takesArguments(0)),
className + "$ConsumerAsyncReceiveAdviser");
// TODO batch receiving not implemented (Consumer#batchReceive() and
// Consumer#batchReceiveAsync())
className + "$ConsumerAsyncReceiveAdvice");
// internalBatchReceiveAsync will apply to Consumer#batchReceive() and
// Consumer#batchReceiveAsync()
transformer.applyAdviceToMethod(
isMethod()
.and(isProtected())
.and(named("internalBatchReceiveAsync"))
.and(takesArguments(0)),
className + "$ConsumerBatchAsyncReceiveAdvice");
}
@SuppressWarnings("unused")
public static class ConsumerConstructorAdviser {
private ConsumerConstructorAdviser() {}
public static class ConsumerConstructorAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void after(
@ -79,8 +86,7 @@ public class ConsumerImplInstrumentation implements TypeInstrumentation {
}
@SuppressWarnings("unused")
public static class ConsumerInternalReceiveAdviser {
private ConsumerInternalReceiveAdviser() {}
public static class ConsumerInternalReceiveAdvice {
@Advice.OnMethodEnter
public static Timer before() {
@ -104,8 +110,7 @@ public class ConsumerImplInstrumentation implements TypeInstrumentation {
}
@SuppressWarnings("unused")
public static class ConsumerSyncReceiveAdviser {
private ConsumerSyncReceiveAdviser() {}
public static class ConsumerSyncReceiveAdvice {
@Advice.OnMethodEnter
public static Timer before() {
@ -125,8 +130,7 @@ public class ConsumerImplInstrumentation implements TypeInstrumentation {
}
@SuppressWarnings("unused")
public static class ConsumerAsyncReceiveAdviser {
private ConsumerAsyncReceiveAdviser() {}
public static class ConsumerAsyncReceiveAdvice {
@Advice.OnMethodEnter
public static Timer before() {
@ -141,4 +145,21 @@ public class ConsumerImplInstrumentation implements TypeInstrumentation {
future = wrap(future, timer, consumer);
}
}
@SuppressWarnings("unused")
public static class ConsumerBatchAsyncReceiveAdvice {
@Advice.OnMethodEnter
public static Timer before() {
return Timer.start();
}
@Advice.OnMethodExit(onThrowable = Throwable.class)
public static void after(
@Advice.Enter Timer timer,
@Advice.This Consumer<?> consumer,
@Advice.Return(readOnly = false) CompletableFuture<Messages<?>> future) {
future = wrapBatch(future, timer, consumer);
}
}
}

View File

@ -32,7 +32,6 @@ public class MessageInstrumentation implements TypeInstrumentation {
@SuppressWarnings("unused")
public static class MessageRecycleAdvice {
private MessageRecycleAdvice() {}
@Advice.OnMethodExit
public static void after(@Advice.This Message<?> message) {

View File

@ -38,12 +38,11 @@ public class MessageListenerInstrumentation implements TypeInstrumentation {
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isMethod().and(isPublic()).and(named("getMessageListener")),
MessageListenerInstrumentation.class.getName() + "$ConsumerConfigurationDataMethodAdviser");
MessageListenerInstrumentation.class.getName() + "$ConsumerConfigurationDataMethodAdvice");
}
@SuppressWarnings("unused")
public static class ConsumerConfigurationDataMethodAdviser {
private ConsumerConfigurationDataMethodAdviser() {}
public static class ConsumerConfigurationDataMethodAdvice {
@Advice.OnMethodExit
public static void after(

View File

@ -44,18 +44,17 @@ public class ProducerImplInstrumentation implements TypeInstrumentation {
.and(isPublic())
.and(
takesArgument(0, hasSuperType(named("org.apache.pulsar.client.api.PulsarClient")))),
ProducerImplInstrumentation.class.getName() + "$ProducerImplConstructorAdviser");
ProducerImplInstrumentation.class.getName() + "$ProducerImplConstructorAdvice");
transformer.applyAdviceToMethod(
isMethod()
.and(named("sendAsync"))
.and(takesArgument(1, named("org.apache.pulsar.client.impl.SendCallback"))),
ProducerImplInstrumentation.class.getName() + "$ProducerSendAsyncMethodAdviser");
ProducerImplInstrumentation.class.getName() + "$ProducerSendAsyncMethodAdvice");
}
@SuppressWarnings("unused")
public static class ProducerImplConstructorAdviser {
private ProducerImplConstructorAdviser() {}
public static class ProducerImplConstructorAdvice {
@Advice.OnMethodExit
public static void intercept(
@ -68,8 +67,7 @@ public class ProducerImplInstrumentation implements TypeInstrumentation {
}
@SuppressWarnings("unused")
public static class ProducerSendAsyncMethodAdviser {
private ProducerSendAsyncMethodAdviser() {}
public static class ProducerSendAsyncMethodAdvice {
@Advice.OnMethodEnter
public static void before(

View File

@ -0,0 +1,27 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry;
import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.UrlParser.UrlData;
public class BasePulsarRequest {
private final String destination;
private final UrlData urlData;
protected BasePulsarRequest(String destination, UrlData urlData) {
this.destination = destination;
this.urlData = urlData;
}
public String getDestination() {
return destination;
}
public UrlData getUrlData() {
return urlData;
}
}

View File

@ -0,0 +1,75 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
enum PulsarBatchMessagingAttributesGetter
implements MessagingAttributesGetter<PulsarBatchRequest, Void> {
INSTANCE;
@Override
public String getSystem(PulsarBatchRequest request) {
return "pulsar";
}
@Override
public String getDestinationKind(PulsarBatchRequest request) {
return SemanticAttributes.MessagingDestinationKindValues.TOPIC;
}
@Nullable
@Override
public String getDestination(PulsarBatchRequest request) {
return request.getDestination();
}
@Override
public boolean isTemporaryDestination(PulsarBatchRequest request) {
return false;
}
@Nullable
@Override
public String getConversationId(PulsarBatchRequest message) {
return null;
}
@Nullable
@Override
public Long getMessagePayloadSize(PulsarBatchRequest request) {
return StreamSupport.stream(request.getMessages().spliterator(), false)
.map(message -> (long) message.size())
.reduce(Long::sum)
.orElse(null);
}
@Nullable
@Override
public Long getMessagePayloadCompressedSize(PulsarBatchRequest request) {
return null;
}
@Nullable
@Override
public String getMessageId(PulsarBatchRequest request, @Nullable Void response) {
return null;
}
@Override
public List<String> getMessageHeader(PulsarBatchRequest request, String name) {
return StreamSupport.stream(request.getMessages().spliterator(), false)
.map(message -> message.getProperty(name))
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
}

View File

@ -0,0 +1,42 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry;
import static io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.UrlParser.parseUrl;
import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.UrlParser.UrlData;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Messages;
public final class PulsarBatchRequest extends BasePulsarRequest {
private final Messages<?> messages;
private PulsarBatchRequest(Messages<?> messages, String destination, UrlData urlData) {
super(destination, urlData);
this.messages = messages;
}
public static PulsarBatchRequest create(Messages<?> messages, String url) {
return new PulsarBatchRequest(messages, getTopicName(messages), parseUrl(url));
}
private static String getTopicName(Messages<?> messages) {
String topicName = null;
for (Message<?> message : messages) {
String name = message.getTopicName();
if (topicName == null) {
topicName = name;
} else if (!topicName.equals(name)) {
return null;
}
}
return topicName;
}
public Messages<?> getMessages() {
return messages;
}
}

View File

@ -0,0 +1,32 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor;
import io.opentelemetry.instrumentation.api.internal.PropagatorBasedSpanLinksExtractor;
import org.apache.pulsar.client.api.Message;
final class PulsarBatchRequestSpanLinksExtractor implements SpanLinksExtractor<PulsarBatchRequest> {
private final SpanLinksExtractor<PulsarRequest> singleRecordLinkExtractor;
PulsarBatchRequestSpanLinksExtractor(TextMapPropagator propagator) {
this.singleRecordLinkExtractor =
new PropagatorBasedSpanLinksExtractor<>(propagator, MessageTextMapGetter.INSTANCE);
}
@Override
public void extract(
SpanLinksBuilder spanLinks, Context parentContext, PulsarBatchRequest request) {
for (Message<?> message : request.getMessages()) {
singleRecordLinkExtractor.extract(
spanLinks, Context.root(), PulsarRequest.create(message, request.getUrlData()));
}
}
}

View File

@ -9,22 +9,22 @@ import io.opentelemetry.instrumentation.api.instrumenter.net.NetClientAttributes
import javax.annotation.Nullable;
public final class PulsarNetClientAttributesGetter
implements NetClientAttributesGetter<PulsarRequest, Void> {
implements NetClientAttributesGetter<BasePulsarRequest, Void> {
@Nullable
@Override
public String getTransport(PulsarRequest request, @Nullable Void unused) {
public String getTransport(BasePulsarRequest request, @Nullable Void unused) {
return null;
}
@Nullable
@Override
public String getPeerName(PulsarRequest request) {
public String getPeerName(BasePulsarRequest request) {
return request.getUrlData() != null ? request.getUrlData().getHost() : null;
}
@Nullable
@Override
public Integer getPeerPort(PulsarRequest request) {
public Integer getPeerPort(BasePulsarRequest request) {
return request.getUrlData() != null ? request.getUrlData().getPort() : null;
}
}

View File

@ -11,15 +11,12 @@ import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.ProducerData;
import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.UrlParser.UrlData;
import org.apache.pulsar.client.api.Message;
public final class PulsarRequest {
public final class PulsarRequest extends BasePulsarRequest {
private final Message<?> message;
private final String destination;
private final UrlData urlData;
private PulsarRequest(Message<?> message, String destination, UrlData urlData) {
super(destination, urlData);
this.message = message;
this.destination = destination;
this.urlData = urlData;
}
public static PulsarRequest create(Message<?> message) {
@ -30,6 +27,10 @@ public final class PulsarRequest {
return new PulsarRequest(message, message.getTopicName(), parseUrl(url));
}
public static PulsarRequest create(Message<?> message, UrlData urlData) {
return new PulsarRequest(message, message.getTopicName(), urlData);
}
public static PulsarRequest create(Message<?> message, ProducerData producerData) {
return new PulsarRequest(message, producerData.topic, parseUrl(producerData.url));
}
@ -37,12 +38,4 @@ public final class PulsarRequest {
public Message<?> getMessage() {
return message;
}
public String getDestination() {
return destination;
}
public UrlData getUrlData() {
return urlData;
}
}

View File

@ -13,6 +13,7 @@ import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesGetter;
@ -27,6 +28,7 @@ import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Messages;
public final class PulsarSingletons {
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.pulsar-2.8";
@ -41,6 +43,8 @@ public final class PulsarSingletons {
createConsumerProcessInstrumenter();
private static final Instrumenter<PulsarRequest, Void> CONSUMER_RECEIVE_INSTRUMENTER =
createConsumerReceiveInstrumenter();
private static final Instrumenter<PulsarBatchRequest, Void> CONSUMER_BATCH_RECEIVE_INSTRUMENTER =
createConsumerBatchReceiveInstrumenter();
private static final Instrumenter<PulsarRequest, Void> PRODUCER_INSTRUMENTER =
createProducerInstrumenter();
@ -64,12 +68,32 @@ public final class PulsarSingletons {
TELEMETRY,
INSTRUMENTATION_NAME,
MessagingSpanNameExtractor.create(getter, MessageOperation.RECEIVE))
.addAttributesExtractor(createMessagingAttributesExtractor(MessageOperation.RECEIVE))
.addAttributesExtractor(
createMessagingAttributesExtractor(getter, MessageOperation.RECEIVE))
.addAttributesExtractor(
NetClientAttributesExtractor.create(new PulsarNetClientAttributesGetter()))
.buildConsumerInstrumenter(MessageTextMapGetter.INSTANCE);
}
private static Instrumenter<PulsarBatchRequest, Void> createConsumerBatchReceiveInstrumenter() {
MessagingAttributesGetter<PulsarBatchRequest, Void> getter =
PulsarBatchMessagingAttributesGetter.INSTANCE;
return Instrumenter.<PulsarBatchRequest, Void>builder(
TELEMETRY,
INSTRUMENTATION_NAME,
MessagingSpanNameExtractor.create(getter, MessageOperation.RECEIVE))
.addAttributesExtractor(
createMessagingAttributesExtractor(getter, MessageOperation.RECEIVE))
.addAttributesExtractor(
NetClientAttributesExtractor.create(new PulsarNetClientAttributesGetter()))
.setEnabled(ExperimentalConfig.get().messagingReceiveInstrumentationEnabled())
.addSpanLinksExtractor(
new PulsarBatchRequestSpanLinksExtractor(
GlobalOpenTelemetry.getPropagators().getTextMapPropagator()))
.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
}
private static Instrumenter<PulsarRequest, Void> createConsumerProcessInstrumenter() {
MessagingAttributesGetter<PulsarRequest, Void> getter =
PulsarMessagingAttributesGetter.INSTANCE;
@ -78,7 +102,8 @@ public final class PulsarSingletons {
TELEMETRY,
INSTRUMENTATION_NAME,
MessagingSpanNameExtractor.create(getter, MessageOperation.PROCESS))
.addAttributesExtractor(createMessagingAttributesExtractor(MessageOperation.PROCESS))
.addAttributesExtractor(
createMessagingAttributesExtractor(getter, MessageOperation.PROCESS))
.buildInstrumenter();
}
@ -91,7 +116,8 @@ public final class PulsarSingletons {
TELEMETRY,
INSTRUMENTATION_NAME,
MessagingSpanNameExtractor.create(getter, MessageOperation.SEND))
.addAttributesExtractor(createMessagingAttributesExtractor(MessageOperation.SEND))
.addAttributesExtractor(
createMessagingAttributesExtractor(getter, MessageOperation.SEND))
.addAttributesExtractor(
NetClientAttributesExtractor.create(new PulsarNetClientAttributesGetter()));
@ -103,9 +129,9 @@ public final class PulsarSingletons {
return builder.buildProducerInstrumenter(MessageTextMapSetter.INSTANCE);
}
private static AttributesExtractor<PulsarRequest, Void> createMessagingAttributesExtractor(
MessageOperation operation) {
return MessagingAttributesExtractor.builder(PulsarMessagingAttributesGetter.INSTANCE, operation)
private static <T> AttributesExtractor<T, Void> createMessagingAttributesExtractor(
MessagingAttributesGetter<T, Void> getter, MessageOperation operation) {
return MessagingAttributesExtractor.builder(getter, operation)
.setCapturedHeaders(capturedHeaders)
.build();
}
@ -133,6 +159,30 @@ public final class PulsarSingletons {
timer.now());
}
private static Context startAndEndConsumerReceive(
Context parent,
Messages<?> messages,
Timer timer,
Consumer<?> consumer,
Throwable throwable) {
if (messages == null) {
return null;
}
String brokerUrl = VirtualFieldStore.extract(consumer);
PulsarBatchRequest request = PulsarBatchRequest.create(messages, brokerUrl);
if (!CONSUMER_BATCH_RECEIVE_INSTRUMENTER.shouldStart(parent, request)) {
return null;
}
return InstrumenterUtil.startAndEnd(
CONSUMER_BATCH_RECEIVE_INSTRUMENTER,
parent,
request,
null,
throwable,
timer.startTime(),
timer.now());
}
public static CompletableFuture<Message<?>> wrap(
CompletableFuture<Message<?>> future, Timer timer, Consumer<?> consumer) {
Context parent = Context.current();
@ -154,6 +204,28 @@ public final class PulsarSingletons {
return result;
}
public static CompletableFuture<Messages<?>> wrapBatch(
CompletableFuture<Messages<?>> future, Timer timer, Consumer<?> consumer) {
Context parent = Context.current();
CompletableFuture<Messages<?>> result = new CompletableFuture<>();
future.whenComplete(
(messages, throwable) -> {
Context context =
startAndEndConsumerReceive(parent, messages, timer, consumer, throwable);
runWithContext(
context,
() -> {
if (throwable != null) {
result.completeExceptionally(throwable);
} else {
result.complete(messages);
}
});
});
return result;
}
private static void runWithContext(Context context, Runnable runnable) {
if (context != null) {
try (Scope ignored = context.makeCurrent()) {

View File

@ -12,6 +12,7 @@ import org.apache.pulsar.client.admin.PulsarAdmin
import org.apache.pulsar.client.api.Consumer
import org.apache.pulsar.client.api.Message
import org.apache.pulsar.client.api.MessageListener
import org.apache.pulsar.client.api.Messages
import org.apache.pulsar.client.api.Producer
import org.apache.pulsar.client.api.PulsarClient
import org.apache.pulsar.client.api.Schema
@ -24,6 +25,7 @@ import org.testcontainers.utility.DockerImageName
import spock.lang.Shared
import java.time.Duration
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.regex.Pattern
@ -194,7 +196,6 @@ class PulsarClientTest extends AgentInstrumentationSpecification {
def "test consume non-partitioned topic using receiveAsync"() {
setup:
def topic = "persistent://public/default/testConsumeNonPartitionedTopicCallReceiveAsync"
def latch = new CountDownLatch(1)
admin.topics().createNonPartitionedTopic(topic)
consumer = client.newConsumer(Schema.STRING)
.subscriptionName("test_sub")
@ -208,10 +209,9 @@ class PulsarClientTest extends AgentInstrumentationSpecification {
.create()
when:
consumer.receiveAsync().whenComplete { receivedMsg, throwable ->
CompletableFuture<Message<String>> result = consumer.receiveAsync().whenComplete { receivedMsg, throwable ->
runWithSpan("callback") {
consumer.acknowledge(receivedMsg)
latch.countDown()
}
}
@ -220,7 +220,7 @@ class PulsarClientTest extends AgentInstrumentationSpecification {
producer.send(msg)
}
latch.await(1, TimeUnit.MINUTES)
result.get(1, TimeUnit.MINUTES)
then:
assertTraces(1) {
@ -281,6 +281,115 @@ class PulsarClientTest extends AgentInstrumentationSpecification {
}
}
def "test consume non-partitioned topic using batchReceive"() {
setup:
def topic = "persistent://public/default/testConsumeNonPartitionedTopicCallBatchReceive"
admin.topics().createNonPartitionedTopic(topic)
consumer = client.newConsumer(Schema.STRING)
.subscriptionName("test_sub")
.topic(topic)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe()
producer = client.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(false)
.create()
when:
def msg = "test"
def msgId = runWithSpan("parent") {
producer.send(msg)
}
runWithSpan("receive-parent") {
def receivedMsg = consumer.batchReceive()
consumer.acknowledge(receivedMsg)
}
then:
def producer
assertTraces(2) {
trace(0, 2) {
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
}
producerSpan(it, 1, span(0), topic, msgId)
producer = span(1)
}
trace(1, 2) {
span(0) {
name "receive-parent"
kind INTERNAL
hasNoParent()
}
receiveSpan(it, 1, span(0), topic, null, producer)
}
}
}
def "test consume non-partitioned topic using batchReceiveAsync"() {
setup:
def topic = "persistent://public/default/testConsumeNonPartitionedTopicCallBatchReceiveAsync"
admin.topics().createNonPartitionedTopic(topic)
consumer = client.newConsumer(Schema.STRING)
.subscriptionName("test_sub")
.topic(topic)
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe()
producer = client.newProducer(Schema.STRING)
.topic(topic)
.enableBatching(false)
.create()
when:
def msg = "test"
def msgId = runWithSpan("parent") {
producer.send(msg)
}
CompletableFuture<Messages<String>> result = runWithSpan("receive-parent") {
consumer.batchReceiveAsync().whenComplete { receivedMsg, throwable ->
runWithSpan("callback") {
consumer.acknowledge(receivedMsg)
}
}
}
result.get(1, TimeUnit.MINUTES).size() == 1
then:
def producer
assertTraces(2) {
trace(0, 2) {
span(0) {
name "parent"
kind INTERNAL
hasNoParent()
}
producerSpan(it, 1, span(0), topic, msgId)
producer = span(1)
}
trace(1, 3) {
span(0) {
name "receive-parent"
kind INTERNAL
hasNoParent()
}
receiveSpan(it, 1, span(0), topic, null, producer)
span(2) {
name "callback"
kind INTERNAL
childOf span(1)
attributes {
}
}
}
}
}
def "capture message header as span attribute"() {
setup:
def topic = "persistent://public/default/testCaptureMessageHeaderTopic"
@ -321,7 +430,7 @@ class PulsarClientTest extends AgentInstrumentationSpecification {
hasNoParent()
}
producerSpan(it, 1, span(0), topic, msgId, true)
receiveSpan(it, 2, span(1), topic, msgId, true)
receiveSpan(it, 2, span(1), topic, msgId, null, true)
processSpan(it, 3, span(2), topic, msgId, true)
}
}
@ -452,9 +561,9 @@ class PulsarClientTest extends AgentInstrumentationSpecification {
kind INTERNAL
hasNoParent()
}
producerSpan(it, 1, span(0), topic, null, { it.startsWith(topicNamePrefix) }, null)
receiveSpan(it, 2, span(1), topic, null, { it.startsWith(topicNamePrefix) }, null)
processSpan(it, 3, span(2), topic, null, { it.startsWith(topicNamePrefix) }, null)
producerSpan(it, 1, span(0), topic, null, { it.startsWith(topicNamePrefix) }, String)
receiveSpan(it, 2, span(1), topic, null, { it.startsWith(topicNamePrefix) }, String)
processSpan(it, 3, span(2), topic, null, { it.startsWith(topicNamePrefix) }, String)
}
}
}
@ -479,10 +588,10 @@ class PulsarClientTest extends AgentInstrumentationSpecification {
"$SemanticAttributes.NET_PEER_PORT" brokerPort
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" destination
if (msgId != null) {
"$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString()
} else {
if (msgId == String) {
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
} else if (msgId != null) {
"$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString()
}
"$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
"messaging.pulsar.message.type" "normal"
@ -493,11 +602,11 @@ class PulsarClientTest extends AgentInstrumentationSpecification {
}
}
def receiveSpan(TraceAssert trace, int index, Object parentSpan, String topic, Object msgId, boolean headers = false) {
receiveSpan(trace, index, parentSpan, topic, null, { it == topic }, msgId, headers)
def receiveSpan(TraceAssert trace, int index, Object parentSpan, String topic, Object msgId, Object linkedSpan = null, boolean headers = false) {
receiveSpan(trace, index, parentSpan, topic, null, { it == topic }, msgId, linkedSpan, headers)
}
def receiveSpan(TraceAssert trace, int index, Object parentSpan, String topic, Pattern namePattern, Closure destination, Object msgId, boolean headers = false) {
def receiveSpan(TraceAssert trace, int index, Object parentSpan, String topic, Pattern namePattern, Closure destination, Object msgId, Object linkedSpan = null, boolean headers = false) {
trace.span(index) {
if (namePattern != null) {
name namePattern
@ -506,16 +615,21 @@ class PulsarClientTest extends AgentInstrumentationSpecification {
}
kind CONSUMER
childOf parentSpan
if (linkedSpan == null) {
hasNoLinks()
} else {
hasLink linkedSpan
}
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "pulsar"
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.NET_PEER_NAME" brokerHost
"$SemanticAttributes.NET_PEER_PORT" brokerPort
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" destination
if (msgId != null) {
"$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString()
} else {
if (msgId == String) {
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
} else if (msgId != null) {
"$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString()
}
"$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
"$SemanticAttributes.MESSAGING_OPERATION" "receive"
@ -543,10 +657,10 @@ class PulsarClientTest extends AgentInstrumentationSpecification {
"$SemanticAttributes.MESSAGING_SYSTEM" "pulsar"
"$SemanticAttributes.MESSAGING_DESTINATION_KIND" "topic"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" destination
if (msgId != null) {
"$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString()
} else {
if (msgId == String) {
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
} else if (msgId != null) {
"$SemanticAttributes.MESSAGING_MESSAGE_ID" msgId.toString()
}
"$SemanticAttributes.MESSAGING_OPERATION" "process"
"$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long