Add support for spring-pulsar 1.0 (#13320)
Co-authored-by: Lauri Tulmin <ltulmin@splunk.com>
This commit is contained in:
parent
df111a4381
commit
4ebaf70c81
|
@ -895,6 +895,9 @@ targets:
|
|||
- type: gradle
|
||||
path: ./
|
||||
target: ':instrumentation:spring:spring-kafka-2.7:library'
|
||||
- type: gradle
|
||||
path: ./
|
||||
target: ':instrumentation:spring:spring-pulsar-1.0:javaagent'
|
||||
- type: gradle
|
||||
path: ./
|
||||
target: ':instrumentation:spring:spring-rabbit-1.0:javaagent'
|
||||
|
|
|
@ -138,6 +138,7 @@ These are the supported libraries and frameworks:
|
|||
| [Spring Integration](https://spring.io/projects/spring-integration) | 4.1+ (not including 6.0+ yet) | [opentelemetry-spring-integration-4.1](../instrumentation/spring/spring-integration-4.1/library) | [Messaging Spans] |
|
||||
| [Spring JMS](https://docs.spring.io/spring-framework/docs/current/reference/html/integration.html#jms) | 2.0+ | N/A | [Messaging Spans] |
|
||||
| [Spring Kafka](https://spring.io/projects/spring-kafka) | 2.7+ | [opentelemetry-spring-kafka-2.7](../instrumentation/spring/spring-kafka-2.7/library) | [Messaging Spans] |
|
||||
| [Spring Pulsar](https://spring.io/projects/spring-pulsar) | 1.0+ | | [Messaging Spans] |
|
||||
| [Spring RabbitMQ](https://spring.io/projects/spring-amqp) | 1.0+ | N/A | [Messaging Spans] |
|
||||
| [Spring RestTemplate](https://docs.spring.io/spring-framework/docs/current/javadoc-api/org/springframework/web/client/package-summary.html) | 3.1+ | [opentelemetry-spring-web-3.1](../instrumentation/spring/spring-web/spring-web-3.1/library) | [HTTP Client Spans], [HTTP Client Metrics] |
|
||||
| [Spring RMI](https://docs.spring.io/spring-framework/docs/4.0.x/javadoc-api/org/springframework/remoting/rmi/package-summary.html) | 4.0+ | N/A | [RPC Client Spans], [RPC Server Spans] |
|
||||
|
|
|
@ -263,6 +263,8 @@ public final class PulsarSingletons {
|
|||
(messages, throwable) -> {
|
||||
Context context =
|
||||
startAndEndConsumerReceive(parent, messages, timer, consumer, throwable);
|
||||
// injected context is used in the spring-pulsar instrumentation
|
||||
messages.forEach(message -> VirtualFieldStore.inject(message, context));
|
||||
runWithContext(
|
||||
context,
|
||||
() -> {
|
||||
|
|
|
@ -0,0 +1,76 @@
|
|||
plugins {
|
||||
id("otel.javaagent-instrumentation")
|
||||
}
|
||||
|
||||
muzzle {
|
||||
pass {
|
||||
group.set("org.springframework.pulsar")
|
||||
module.set("spring-pulsar")
|
||||
versions.set("[1.0.0,)")
|
||||
assertInverse.set(true)
|
||||
excludeInstrumentationName("pulsar-2.8")
|
||||
}
|
||||
}
|
||||
|
||||
dependencies {
|
||||
library("org.springframework.pulsar:spring-pulsar:1.0.0")
|
||||
implementation(project(":instrumentation:pulsar:pulsar-2.8:javaagent"))
|
||||
|
||||
testInstrumentation(project(":instrumentation:pulsar:pulsar-2.8:javaagent"))
|
||||
|
||||
testImplementation(project(":instrumentation:spring:spring-pulsar-1.0:testing"))
|
||||
|
||||
testLibrary("org.springframework.boot:spring-boot-starter-test:3.2.4")
|
||||
testLibrary("org.springframework.boot:spring-boot-starter:3.2.4")
|
||||
}
|
||||
|
||||
val latestDepTest = findProperty("testLatestDeps") as Boolean
|
||||
|
||||
testing {
|
||||
suites {
|
||||
val testReceiveSpansDisabled by registering(JvmTestSuite::class) {
|
||||
dependencies {
|
||||
implementation(project(":instrumentation:spring:spring-pulsar-1.0:testing"))
|
||||
|
||||
if (latestDepTest) {
|
||||
implementation("org.springframework.pulsar:spring-pulsar:latest.release")
|
||||
implementation("org.springframework.boot:spring-boot-starter-test:latest.release")
|
||||
implementation("org.springframework.boot:spring-boot-starter:latest.release")
|
||||
} else {
|
||||
implementation("org.springframework.pulsar:spring-pulsar:1.0.0")
|
||||
implementation("org.springframework.boot:spring-boot-starter-test:3.2.4")
|
||||
implementation("org.springframework.boot:spring-boot-starter:3.2.4")
|
||||
}
|
||||
}
|
||||
|
||||
targets {
|
||||
all {
|
||||
testTask.configure {
|
||||
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
|
||||
|
||||
jvmArgs("-Dotel.instrumentation.pulsar.experimental-span-attributes=true")
|
||||
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=false")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
tasks {
|
||||
test {
|
||||
usesService(gradle.sharedServices.registrations["testcontainersBuildService"].service)
|
||||
|
||||
jvmArgs("-Dotel.instrumentation.pulsar.experimental-span-attributes=true")
|
||||
jvmArgs("-Dotel.instrumentation.messaging.experimental.receive-telemetry.enabled=true")
|
||||
}
|
||||
|
||||
check {
|
||||
dependsOn(testing.suites)
|
||||
}
|
||||
}
|
||||
|
||||
// spring 6 requires java 17
|
||||
otelJava {
|
||||
minJavaVersionSupported.set(JavaVersion.VERSION_17)
|
||||
}
|
|
@ -0,0 +1,66 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.spring.pulsar.v1_0;
|
||||
|
||||
import static io.opentelemetry.javaagent.instrumentation.spring.pulsar.v1_0.SpringPulsarSingletons.instrumenter;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.named;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
|
||||
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
|
||||
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.context.Scope;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
|
||||
import io.opentelemetry.javaagent.instrumentation.pulsar.v2_8.VirtualFieldStore;
|
||||
import net.bytebuddy.asm.Advice;
|
||||
import net.bytebuddy.description.type.TypeDescription;
|
||||
import net.bytebuddy.matcher.ElementMatcher;
|
||||
import org.apache.pulsar.client.api.Message;
|
||||
|
||||
public class DefaultPulsarMessageListenerContainerInstrumentation implements TypeInstrumentation {
|
||||
@Override
|
||||
public ElementMatcher<TypeDescription> typeMatcher() {
|
||||
return named(
|
||||
"org.springframework.pulsar.listener.DefaultPulsarMessageListenerContainer$Listener");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void transform(TypeTransformer transformer) {
|
||||
transformer.applyAdviceToMethod(
|
||||
named("dispatchMessageToListener")
|
||||
.and(takesArguments(3).or(takesArguments(2)))
|
||||
.and(takesArgument(0, named("org.apache.pulsar.client.api.Message"))),
|
||||
getClass().getName() + "$DispatchMessageToListenerAdvice");
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
public static class DispatchMessageToListenerAdvice {
|
||||
@Advice.OnMethodEnter(suppress = Throwable.class)
|
||||
public static void onEnter(
|
||||
@Advice.Argument(0) Message<?> message,
|
||||
@Advice.Local("otelContext") Context context,
|
||||
@Advice.Local("otelScope") Scope scope) {
|
||||
Context parentContext = VirtualFieldStore.extract(message);
|
||||
if (instrumenter().shouldStart(parentContext, message)) {
|
||||
context = instrumenter().start(parentContext, message);
|
||||
scope = context.makeCurrent();
|
||||
}
|
||||
}
|
||||
|
||||
@Advice.OnMethodExit(suppress = Throwable.class, onThrowable = Throwable.class)
|
||||
public static void onExit(
|
||||
@Advice.Argument(0) Message<?> message,
|
||||
@Advice.Local("otelContext") Context context,
|
||||
@Advice.Local("otelScope") Scope scope,
|
||||
@Advice.Thrown Throwable throwable) {
|
||||
if (scope == null) {
|
||||
return;
|
||||
}
|
||||
scope.close();
|
||||
instrumenter().end(context, message, null, throwable);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,25 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.spring.pulsar.v1_0;
|
||||
|
||||
import io.opentelemetry.context.propagation.TextMapGetter;
|
||||
import javax.annotation.Nullable;
|
||||
import org.apache.pulsar.client.api.Message;
|
||||
|
||||
enum MessageHeaderGetter implements TextMapGetter<Message<?>> {
|
||||
INSTANCE;
|
||||
|
||||
@Override
|
||||
public Iterable<String> keys(Message<?> carrier) {
|
||||
return carrier.getProperties().keySet();
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public String get(@Nullable Message<?> carrier, String key) {
|
||||
return carrier == null ? null : carrier.getProperties().get(key);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,34 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.spring.pulsar.v1_0;
|
||||
|
||||
import static io.opentelemetry.javaagent.extension.matcher.AgentElementMatchers.hasClassesNamed;
|
||||
import static java.util.Collections.singletonList;
|
||||
|
||||
import com.google.auto.service.AutoService;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
|
||||
import java.util.List;
|
||||
import net.bytebuddy.matcher.ElementMatcher;
|
||||
|
||||
@AutoService(InstrumentationModule.class)
|
||||
public class SpringPulsarInstrumentationModule extends InstrumentationModule {
|
||||
public SpringPulsarInstrumentationModule() {
|
||||
super("spring-pulsar", "spring-pulsar-1.0");
|
||||
}
|
||||
|
||||
@Override
|
||||
public ElementMatcher.Junction<ClassLoader> classLoaderMatcher() {
|
||||
// added in 1.0.0
|
||||
return hasClassesNamed(
|
||||
"org.springframework.pulsar.annotation.PulsarListenerConsumerBuilderCustomizer");
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<TypeInstrumentation> typeInstrumentations() {
|
||||
return singletonList(new DefaultPulsarMessageListenerContainerInstrumentation());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,90 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.spring.pulsar.v1_0;
|
||||
|
||||
import static java.util.Collections.emptyList;
|
||||
import static java.util.Collections.singletonList;
|
||||
|
||||
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesGetter;
|
||||
import java.util.List;
|
||||
import javax.annotation.Nullable;
|
||||
import org.apache.pulsar.client.api.Message;
|
||||
|
||||
enum SpringPulsarMessageAttributesGetter implements MessagingAttributesGetter<Message<?>, Void> {
|
||||
INSTANCE;
|
||||
|
||||
@Override
|
||||
public String getSystem(Message<?> message) {
|
||||
return "pulsar";
|
||||
}
|
||||
|
||||
@Override
|
||||
@Nullable
|
||||
public String getDestination(Message<?> message) {
|
||||
return message.getTopicName();
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public String getDestinationTemplate(Message<?> message) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isTemporaryDestination(Message<?> message) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isAnonymousDestination(Message<?> message) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Nullable
|
||||
public String getConversationId(Message<?> message) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Long getMessageBodySize(Message<?> message) {
|
||||
return (long) message.size();
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public Long getMessageEnvelopeSize(Message<?> message) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
@Nullable
|
||||
public String getMessageId(Message<?> message, @Nullable Void unused) {
|
||||
if (message.getMessageId() != null) {
|
||||
return message.getMessageId().toString();
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public String getClientId(Message<?> message) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
public Long getBatchMessageCount(Message<?> message, @Nullable Void unused) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<String> getMessageHeader(Message<?> message, String name) {
|
||||
String value = message.getProperty(name);
|
||||
return value != null ? singletonList(value) : emptyList();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,55 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.spring.pulsar.v1_0;
|
||||
|
||||
import io.opentelemetry.api.GlobalOpenTelemetry;
|
||||
import io.opentelemetry.api.OpenTelemetry;
|
||||
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessageOperation;
|
||||
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingAttributesExtractor;
|
||||
import io.opentelemetry.instrumentation.api.incubator.semconv.messaging.MessagingSpanNameExtractor;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
|
||||
import io.opentelemetry.instrumentation.api.internal.PropagatorBasedSpanLinksExtractor;
|
||||
import io.opentelemetry.javaagent.bootstrap.internal.ExperimentalConfig;
|
||||
import org.apache.pulsar.client.api.Message;
|
||||
|
||||
public final class SpringPulsarSingletons {
|
||||
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.spring-pulsar-1.0";
|
||||
private static final Instrumenter<Message<?>, Void> INSTRUMENTER;
|
||||
|
||||
static {
|
||||
OpenTelemetry openTelemetry = GlobalOpenTelemetry.get();
|
||||
SpringPulsarMessageAttributesGetter getter = SpringPulsarMessageAttributesGetter.INSTANCE;
|
||||
MessageOperation operation = MessageOperation.PROCESS;
|
||||
boolean messagingReceiveInstrumentationEnabled =
|
||||
ExperimentalConfig.get().messagingReceiveInstrumentationEnabled();
|
||||
|
||||
InstrumenterBuilder<Message<?>, Void> builder =
|
||||
Instrumenter.<Message<?>, Void>builder(
|
||||
openTelemetry,
|
||||
INSTRUMENTATION_NAME,
|
||||
MessagingSpanNameExtractor.create(getter, operation))
|
||||
.addAttributesExtractor(
|
||||
MessagingAttributesExtractor.builder(getter, operation)
|
||||
.setCapturedHeaders(ExperimentalConfig.get().getMessagingHeaders())
|
||||
.build());
|
||||
if (messagingReceiveInstrumentationEnabled) {
|
||||
builder.addSpanLinksExtractor(
|
||||
new PropagatorBasedSpanLinksExtractor<>(
|
||||
openTelemetry.getPropagators().getTextMapPropagator(), MessageHeaderGetter.INSTANCE));
|
||||
INSTRUMENTER = builder.buildInstrumenter(SpanKindExtractor.alwaysConsumer());
|
||||
} else {
|
||||
INSTRUMENTER = builder.buildConsumerInstrumenter(MessageHeaderGetter.INSTANCE);
|
||||
}
|
||||
}
|
||||
|
||||
public static Instrumenter<Message<?>, Void> instrumenter() {
|
||||
return INSTRUMENTER;
|
||||
}
|
||||
|
||||
private SpringPulsarSingletons() {}
|
||||
}
|
|
@ -0,0 +1,52 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.spring.pulsar.v1_0;
|
||||
|
||||
import static io.opentelemetry.api.trace.SpanKind.CONSUMER;
|
||||
import static io.opentelemetry.api.trace.SpanKind.INTERNAL;
|
||||
import static io.opentelemetry.api.trace.SpanKind.PRODUCER;
|
||||
import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanKind;
|
||||
|
||||
import io.opentelemetry.instrumentation.spring.pulsar.v1_0.AbstractSpringPulsarTest;
|
||||
import io.opentelemetry.sdk.trace.data.LinkData;
|
||||
import io.opentelemetry.sdk.trace.data.SpanData;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
class SpringPulsarTest extends AbstractSpringPulsarTest {
|
||||
|
||||
@Override
|
||||
protected void assertSpringPulsar() {
|
||||
AtomicReference<SpanData> producer = new AtomicReference<>();
|
||||
|
||||
testing.waitAndAssertSortedTraces(
|
||||
orderByRootSpanKind(INTERNAL, CONSUMER),
|
||||
trace ->
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span -> span.hasName("parent").hasNoParent(),
|
||||
span -> {
|
||||
span.hasName(OTEL_TOPIC + " publish")
|
||||
.hasKind(PRODUCER)
|
||||
.hasParent(trace.getSpan(0))
|
||||
.hasAttributesSatisfyingExactly(publishAttributes());
|
||||
|
||||
producer.set(trace.getSpan(1));
|
||||
}),
|
||||
trace ->
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span ->
|
||||
span.hasName(String.format("%s receive", OTEL_TOPIC))
|
||||
.hasKind(CONSUMER)
|
||||
.hasNoParent()
|
||||
.hasAttributesSatisfyingExactly(receiveAttributes()),
|
||||
span ->
|
||||
span.hasName(String.format("%s process", OTEL_TOPIC))
|
||||
.hasKind(CONSUMER)
|
||||
.hasParent(trace.getSpan(0))
|
||||
.hasLinks(LinkData.create(producer.get().getSpanContext()))
|
||||
.hasAttributesSatisfyingExactly(processAttributes()),
|
||||
span -> span.hasName("consumer").hasParent(trace.getSpan(1))));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,37 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.javaagent.instrumentation.spring.pulsar.v1_0;
|
||||
|
||||
import static io.opentelemetry.api.trace.SpanKind.CONSUMER;
|
||||
import static io.opentelemetry.api.trace.SpanKind.PRODUCER;
|
||||
|
||||
import io.opentelemetry.instrumentation.spring.pulsar.v1_0.AbstractSpringPulsarTest;
|
||||
|
||||
class SpringPulsarSuppressReceiveSpansTest extends AbstractSpringPulsarTest {
|
||||
|
||||
@Override
|
||||
protected void assertSpringPulsar() {
|
||||
testing.waitAndAssertTraces(
|
||||
trace ->
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span -> span.hasName("parent").hasNoParent(),
|
||||
span ->
|
||||
span.hasName(OTEL_TOPIC + " publish")
|
||||
.hasKind(PRODUCER)
|
||||
.hasParent(trace.getSpan(0))
|
||||
.hasAttributesSatisfyingExactly(publishAttributes()),
|
||||
span ->
|
||||
span.hasName(String.format("%s process", OTEL_TOPIC))
|
||||
.hasKind(CONSUMER)
|
||||
.hasParent(trace.getSpan(1))
|
||||
.hasTotalRecordedLinks(0)
|
||||
.hasAttributesSatisfyingExactly(processAttributes()),
|
||||
span -> span.hasName("consumer").hasParent(trace.getSpan(2))),
|
||||
trace ->
|
||||
trace.hasSpansSatisfyingExactly(
|
||||
span -> span.hasName(String.format("%s receive", OTEL_TOPIC)).hasKind(CONSUMER)));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,12 @@
|
|||
plugins {
|
||||
id("otel.java-conventions")
|
||||
}
|
||||
|
||||
dependencies {
|
||||
implementation(project(":testing-common"))
|
||||
implementation("org.testcontainers:pulsar")
|
||||
|
||||
compileOnly("org.springframework.pulsar:spring-pulsar:1.0.0")
|
||||
compileOnly("org.springframework.boot:spring-boot-starter-test:3.2.4")
|
||||
compileOnly("org.springframework.boot:spring-boot-starter:3.2.4")
|
||||
}
|
|
@ -0,0 +1,156 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.instrumentation.spring.pulsar.v1_0;
|
||||
|
||||
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.equalTo;
|
||||
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.satisfies;
|
||||
import static io.opentelemetry.semconv.ServerAttributes.SERVER_ADDRESS;
|
||||
import static io.opentelemetry.semconv.ServerAttributes.SERVER_PORT;
|
||||
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_BATCH_MESSAGE_COUNT;
|
||||
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME;
|
||||
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE;
|
||||
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_MESSAGE_ID;
|
||||
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_OPERATION;
|
||||
import static io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes.MESSAGING_SYSTEM;
|
||||
import static java.util.Arrays.asList;
|
||||
|
||||
import io.opentelemetry.api.common.AttributeKey;
|
||||
import io.opentelemetry.instrumentation.testing.GlobalTraceUtil;
|
||||
import io.opentelemetry.instrumentation.testing.junit.AgentInstrumentationExtension;
|
||||
import io.opentelemetry.instrumentation.testing.junit.InstrumentationExtension;
|
||||
import io.opentelemetry.sdk.testing.assertj.AttributeAssertion;
|
||||
import java.time.Duration;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import org.apache.pulsar.client.api.PulsarClient;
|
||||
import org.apache.pulsar.client.api.PulsarClientException;
|
||||
import org.assertj.core.api.AbstractLongAssert;
|
||||
import org.assertj.core.api.AbstractStringAssert;
|
||||
import org.junit.jupiter.api.AfterAll;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.extension.RegisterExtension;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.SpringBootConfiguration;
|
||||
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
|
||||
import org.springframework.context.ConfigurableApplicationContext;
|
||||
import org.springframework.pulsar.annotation.PulsarListener;
|
||||
import org.springframework.pulsar.core.PulsarTemplate;
|
||||
import org.testcontainers.containers.PulsarContainer;
|
||||
import org.testcontainers.utility.DockerImageName;
|
||||
|
||||
@SuppressWarnings("deprecation") // using deprecated semconv
|
||||
public abstract class AbstractSpringPulsarTest {
|
||||
|
||||
@RegisterExtension
|
||||
protected static final InstrumentationExtension testing = AgentInstrumentationExtension.create();
|
||||
|
||||
static final DockerImageName DEFAULT_IMAGE_NAME =
|
||||
DockerImageName.parse("apachepulsar/pulsar:4.0.2");
|
||||
static PulsarContainer pulsarContainer;
|
||||
static ConfigurableApplicationContext applicationContext;
|
||||
static PulsarTemplate<String> pulsarTemplate;
|
||||
static PulsarClient client;
|
||||
static CountDownLatch latch = new CountDownLatch(1);
|
||||
static final String OTEL_SUBSCRIPTION = "otel-subscription";
|
||||
protected static String brokerHost;
|
||||
protected static int brokerPort;
|
||||
protected static final String OTEL_TOPIC = "persistent://public/default/otel-topic";
|
||||
|
||||
@BeforeAll
|
||||
@SuppressWarnings("unchecked")
|
||||
static void setUp() throws PulsarClientException {
|
||||
pulsarContainer =
|
||||
new PulsarContainer(DEFAULT_IMAGE_NAME)
|
||||
.withEnv("PULSAR_MEM", "-Xmx128m")
|
||||
.withStartupTimeout(Duration.ofMinutes(2));
|
||||
pulsarContainer.start();
|
||||
brokerHost = pulsarContainer.getHost();
|
||||
brokerPort = pulsarContainer.getMappedPort(6650);
|
||||
|
||||
SpringApplication app = new SpringApplication(ConsumerConfig.class);
|
||||
Map<String, Object> props = new HashMap<>();
|
||||
props.put("spring.main.web-application-type", "none");
|
||||
props.put("spring.pulsar.client.service-url", pulsarContainer.getPulsarBrokerUrl());
|
||||
props.put("spring.pulsar.consumer.subscription.initial-position", "earliest");
|
||||
app.setDefaultProperties(props);
|
||||
applicationContext = app.run();
|
||||
pulsarTemplate = applicationContext.getBean(PulsarTemplate.class);
|
||||
|
||||
client = PulsarClient.builder().serviceUrl(pulsarContainer.getPulsarBrokerUrl()).build();
|
||||
}
|
||||
|
||||
@Test
|
||||
void testSpringPulsar() throws PulsarClientException, InterruptedException {
|
||||
testing.runWithSpan(
|
||||
"parent",
|
||||
() -> {
|
||||
pulsarTemplate.send(OTEL_TOPIC, "test");
|
||||
});
|
||||
latch.await(10, TimeUnit.SECONDS);
|
||||
assertSpringPulsar();
|
||||
}
|
||||
|
||||
@AfterAll
|
||||
static void teardown() {
|
||||
if (applicationContext != null) {
|
||||
applicationContext.close();
|
||||
}
|
||||
if (pulsarContainer != null) {
|
||||
pulsarContainer.stop();
|
||||
}
|
||||
}
|
||||
|
||||
protected abstract void assertSpringPulsar();
|
||||
|
||||
static final AttributeKey<String> MESSAGE_TYPE =
|
||||
AttributeKey.stringKey("messaging.pulsar.message.type");
|
||||
|
||||
protected List<AttributeAssertion> publishAttributes() {
|
||||
return asList(
|
||||
equalTo(MESSAGING_SYSTEM, "pulsar"),
|
||||
equalTo(MESSAGING_OPERATION, "publish"),
|
||||
equalTo(MESSAGING_DESTINATION_NAME, OTEL_TOPIC),
|
||||
satisfies(MESSAGING_MESSAGE_BODY_SIZE, AbstractLongAssert::isNotNegative),
|
||||
satisfies(MESSAGING_MESSAGE_ID, AbstractStringAssert::isNotEmpty),
|
||||
equalTo(SERVER_ADDRESS, brokerHost),
|
||||
equalTo(SERVER_PORT, brokerPort),
|
||||
equalTo(MESSAGE_TYPE, "normal"));
|
||||
}
|
||||
|
||||
protected List<AttributeAssertion> processAttributes() {
|
||||
return asList(
|
||||
equalTo(MESSAGING_SYSTEM, "pulsar"),
|
||||
equalTo(MESSAGING_OPERATION, "process"),
|
||||
satisfies(MESSAGING_MESSAGE_BODY_SIZE, AbstractLongAssert::isNotNegative),
|
||||
satisfies(MESSAGING_MESSAGE_ID, AbstractStringAssert::isNotEmpty),
|
||||
equalTo(MESSAGING_DESTINATION_NAME, OTEL_TOPIC));
|
||||
}
|
||||
|
||||
protected List<AttributeAssertion> receiveAttributes() {
|
||||
return asList(
|
||||
equalTo(MESSAGING_SYSTEM, "pulsar"),
|
||||
equalTo(MESSAGING_OPERATION, "receive"),
|
||||
equalTo(MESSAGING_DESTINATION_NAME, OTEL_TOPIC),
|
||||
satisfies(MESSAGING_MESSAGE_BODY_SIZE, AbstractLongAssert::isNotNegative),
|
||||
satisfies(MESSAGING_BATCH_MESSAGE_COUNT, AbstractLongAssert::isNotNegative),
|
||||
equalTo(SERVER_ADDRESS, brokerHost),
|
||||
equalTo(SERVER_PORT, brokerPort));
|
||||
}
|
||||
|
||||
@SpringBootConfiguration
|
||||
@EnableAutoConfiguration
|
||||
static class ConsumerConfig {
|
||||
@PulsarListener(subscriptionName = OTEL_SUBSCRIPTION, topics = OTEL_TOPIC)
|
||||
void consumer(String ignored) {
|
||||
GlobalTraceUtil.runWithSpan("consumer", () -> {});
|
||||
latch.countDown();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -554,6 +554,8 @@ include(":instrumentation:spring:spring-jms:spring-jms-6.0:javaagent")
|
|||
include(":instrumentation:spring:spring-kafka-2.7:javaagent")
|
||||
include(":instrumentation:spring:spring-kafka-2.7:library")
|
||||
include(":instrumentation:spring:spring-kafka-2.7:testing")
|
||||
include(":instrumentation:spring:spring-pulsar-1.0:javaagent")
|
||||
include(":instrumentation:spring:spring-pulsar-1.0:testing")
|
||||
include(":instrumentation:spring:spring-rabbit-1.0:javaagent")
|
||||
include(":instrumentation:spring:spring-rmi-4.0:javaagent")
|
||||
include(":instrumentation:spring:spring-scheduling-3.1:bootstrap")
|
||||
|
|
Loading…
Reference in New Issue