[Pulsar] Support Pulsar Client send message with transaction. (#12731)

This commit is contained in:
道君 2024-11-19 23:31:05 +08:00 committed by GitHub
parent e27a76afca
commit e2742051f3
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 99 additions and 3 deletions

View File

@ -25,6 +25,7 @@ public class PulsarInstrumentationModule extends InstrumentationModule {
new ProducerImplInstrumentation(),
new MessageInstrumentation(),
new MessageListenerInstrumentation(),
new SendCallbackInstrumentation());
new SendCallbackInstrumentation(),
new TransactionImplInstrumentation());
}
}

View File

@ -0,0 +1,45 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.pulsar.v2_8;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.telemetry.PulsarSingletons;
import java.util.concurrent.CompletableFuture;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
public class TransactionImplInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.apache.pulsar.client.impl.transaction.TransactionImpl");
}
@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
named("registerProducedTopic")
.and(isPublic())
.and(takesArguments(1))
.and(takesArgument(0, String.class)),
TransactionImplInstrumentation.class.getName() + "$RegisterProducedTopicAdvice");
}
@SuppressWarnings("unused")
public static class RegisterProducedTopicAdvice {
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void after(@Advice.Return(readOnly = false) CompletableFuture<Void> future) {
future = PulsarSingletons.wrap(future);
}
}
}

View File

@ -210,6 +210,24 @@ public final class PulsarSingletons {
timer.now());
}
public static CompletableFuture<Void> wrap(CompletableFuture<Void> future) {
Context parent = Context.current();
CompletableFuture<Void> result = new CompletableFuture<>();
future.whenComplete(
(unused, t) ->
runWithContext(
parent,
() -> {
if (t != null) {
result.completeExceptionally(t);
} else {
result.complete(null);
}
}));
return result;
}
public static CompletableFuture<Message<?>> wrap(
CompletableFuture<Message<?>> future, Timer timer, Consumer<?> consumer) {
boolean listenerContextActive = MessageListenerContext.isProcessing();

View File

@ -89,12 +89,17 @@ abstract class AbstractPulsarClientTest {
new PulsarContainer(DEFAULT_IMAGE_NAME)
.withEnv("PULSAR_MEM", "-Xmx128m")
.withLogConsumer(new Slf4jLogConsumer(logger))
.withStartupTimeout(Duration.ofMinutes(2));
.withStartupTimeout(Duration.ofMinutes(2))
.withTransactions();
pulsar.start();
brokerHost = pulsar.getHost();
brokerPort = pulsar.getMappedPort(6650);
client = PulsarClient.builder().serviceUrl(pulsar.getPulsarBrokerUrl()).build();
client =
PulsarClient.builder()
.serviceUrl(pulsar.getPulsarBrokerUrl())
.enableTransaction(true)
.build();
admin = PulsarAdmin.builder().serviceHttpUrl(pulsar.getHttpServiceUrl()).build();
}

View File

@ -27,6 +27,7 @@ import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.junit.jupiter.api.Test;
class PulsarClientTest extends AbstractPulsarClientTest {
@ -671,4 +672,30 @@ class PulsarClientTest extends AbstractPulsarClientTest {
});
}));
}
@Test
void testSendMessageWithTxn() throws Exception {
String topic = "persistent://public/default/testSendMessageWithTxn";
admin.topics().createNonPartitionedTopic(topic);
producer =
client
.newProducer(Schema.STRING)
.topic(topic)
.sendTimeout(0, TimeUnit.SECONDS)
.enableBatching(false)
.create();
Transaction transaction =
client.newTransaction().withTransactionTimeout(15, TimeUnit.SECONDS).build().get();
testing.runWithSpan("parent1", () -> producer.newMessage(transaction).value("test1").send());
transaction.commit();
testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent1").hasKind(SpanKind.INTERNAL).hasNoParent(),
span ->
span.hasName(topic + " publish")
.hasKind(SpanKind.PRODUCER)
.hasParent(trace.getSpan(0))));
}
}