From 4094f4a92597a3c1251331562fe004dbc772d98c Mon Sep 17 00:00:00 2001 From: addname Date: Sun, 28 Feb 2021 18:10:26 +0800 Subject: [PATCH] Use hooks to register in the iavaagent instrumentation --- .../rocketmq-client-4.8-javaagent.gradle | 5 + .../RocketMqClientApiImplInstrumentation.java | 104 ----------- ...tMqConcurrentlyConsumeInstrumentation.java | 70 -------- .../RocketMqConsumerInstrumentation.java | 50 ++++++ .../RocketMqInstrumentationModule.java | 8 +- ...RocketMqOrderlyConsumeInstrumentation.java | 70 -------- .../RocketMqProducerInstrumentation.java | 50 ++++++ .../rocketmq/RocketMqClientTest.groovy | 1 - .../rocketmq-client-4.8-library.gradle | 5 +- .../rocketmq/RocketMqClientConfig.java | 4 + .../rocketmq/RocketMqConsumerTracer.java | 82 +++++---- .../rocketmq/RocketMqProducerTracer.java | 40 +++-- .../rocketmq/SendCallbackWrapper.java | 35 ---- .../rocketmq/TextMapExtractAdapter.java | 7 +- .../rocketmq/TextMapInjectAdapter.java | 15 +- .../TracingConsumeMessageHookImpl.java | 29 ++++ .../rocketmq/TracingMessageInterceptor.java | 34 ---- .../rocketmq/TracingSendMessageHookImpl.java | 40 +++++ .../rocketmq/RocketMqClientTest.groovy | 91 ++++++++-- .../rocketmq-client-4.8-testing.gradle | 2 - .../AbstractRocketMqClientLibraryTest.groovy | 161 ------------------ .../AbstractRocketMqClientTest.groovy | 60 +++---- .../testing/src/main/java/base/BaseConf.java | 135 +++------------ .../main/java/base/IntegrationTestBase.java | 59 ++++--- .../dledger/DLedgerProduceAndConsumeIT.java | 39 ----- 25 files changed, 434 insertions(+), 762 deletions(-) delete mode 100644 instrumentation/rocketmq-client-4.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmq/RocketMqClientApiImplInstrumentation.java delete mode 100644 instrumentation/rocketmq-client-4.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmq/RocketMqConcurrentlyConsumeInstrumentation.java create mode 100644 instrumentation/rocketmq-client-4.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmq/RocketMqConsumerInstrumentation.java delete mode 100644 instrumentation/rocketmq-client-4.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmq/RocketMqOrderlyConsumeInstrumentation.java create mode 100644 instrumentation/rocketmq-client-4.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmq/RocketMqProducerInstrumentation.java delete mode 100644 instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/SendCallbackWrapper.java create mode 100644 instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TracingConsumeMessageHookImpl.java delete mode 100644 instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TracingMessageInterceptor.java create mode 100644 instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TracingSendMessageHookImpl.java delete mode 100644 instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetery/instrumentation/rocketmq/AbstractRocketMqClientLibraryTest.groovy delete mode 100644 instrumentation/rocketmq-client-4.8/testing/src/main/java/base/dledger/DLedgerProduceAndConsumeIT.java diff --git a/instrumentation/rocketmq-client-4.8/javaagent/rocketmq-client-4.8-javaagent.gradle b/instrumentation/rocketmq-client-4.8/javaagent/rocketmq-client-4.8-javaagent.gradle index b3f29e425e..d4cd5efb69 100644 --- a/instrumentation/rocketmq-client-4.8/javaagent/rocketmq-client-4.8-javaagent.gradle +++ b/instrumentation/rocketmq-client-4.8/javaagent/rocketmq-client-4.8-javaagent.gradle @@ -5,8 +5,10 @@ muzzle { group = "org.apache.rocketmq" module = 'rocketmq-client' versions = "[4.8.0,)" + assertInverse = true } } + dependencies { library group: 'org.apache.rocketmq', name: 'rocketmq-client', version: '4.8.0' implementation project(':instrumentation:rocketmq-client-4.8:library') @@ -14,3 +16,6 @@ dependencies { } +tasks.withType(Test) { + jvmArgs "-Dotel.instrumentation.rocketmq.client.experimental-span-attributes=true" +} diff --git a/instrumentation/rocketmq-client-4.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmq/RocketMqClientApiImplInstrumentation.java b/instrumentation/rocketmq-client-4.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmq/RocketMqClientApiImplInstrumentation.java deleted file mode 100644 index 0f08c840ae..0000000000 --- a/instrumentation/rocketmq-client-4.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmq/RocketMqClientApiImplInstrumentation.java +++ /dev/null @@ -1,104 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.rocketmq; - -import static io.opentelemetry.instrumentation.rocketmq.RocketMqProducerTracer.tracer; -import static io.opentelemetry.instrumentation.rocketmq.TextMapInjectAdapter.SETTER; -import static java.util.Collections.singletonMap; -import static net.bytebuddy.matcher.ElementMatchers.isMethod; -import static net.bytebuddy.matcher.ElementMatchers.named; -import static net.bytebuddy.matcher.ElementMatchers.takesArguments; - -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.context.Context; -import io.opentelemetry.context.Scope; -import io.opentelemetry.instrumentation.rocketmq.SendCallbackWrapper; -import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge; -import io.opentelemetry.javaagent.tooling.TypeInstrumentation; -import java.util.Map; -import net.bytebuddy.asm.Advice; -import net.bytebuddy.description.method.MethodDescription; -import net.bytebuddy.description.type.TypeDescription; -import net.bytebuddy.matcher.ElementMatcher; -import org.apache.rocketmq.client.producer.SendCallback; -import org.apache.rocketmq.common.message.Message; -import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; - -public class RocketMqClientApiImplInstrumentation implements TypeInstrumentation { - - @Override - public ElementMatcher typeMatcher() { - return named("org.apache.rocketmq.client.impl.MQClientAPIImpl"); - } - - @Override - public Map, String> transformers() { - return singletonMap( - isMethod().and(named("sendMessage")).and(takesArguments(12)), - RocketMqClientApiImplInstrumentation.class.getName() + "$SendMessageAdvice"); - } - - public static class SendMessageAdvice { - @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter( - @Advice.Argument(value = 0, readOnly = false) String addr, - @Advice.Argument(value = 2, readOnly = false) Message msg, - @Advice.Argument(value = 3, readOnly = false) SendMessageRequestHeader requestHeader, - @Advice.Argument(value = 6, readOnly = false) SendCallback sendCallback, - @Advice.Local("otelSpan") Span span, - @Advice.Local("otelScope") Scope scope) { - - Context parent = Java8BytecodeBridge.currentContext(); - span = tracer().startProducerSpan(addr, msg); - Context newContext = parent.with(span); - try { - Java8BytecodeBridge.getGlobalPropagators() - .getTextMapPropagator() - .inject(newContext, requestHeader, SETTER); - } catch (IllegalStateException e) { - requestHeader = new SendMessageRequestHeader(); - requestHeader.getBornTimestamp(); - requestHeader.getDefaultTopic(); - requestHeader.getDefaultTopicQueueNums(); - requestHeader.getFlag(); - requestHeader.getProducerGroup(); - requestHeader.getMaxReconsumeTimes(); - requestHeader.getProperties(); - requestHeader.getSysFlag(); - requestHeader.getTopic(); - requestHeader.getQueueId(); - requestHeader.getReconsumeTimes(); - Java8BytecodeBridge.getGlobalPropagators() - .getTextMapPropagator() - .inject(newContext, requestHeader, SETTER); - } - - scope = newContext.makeCurrent(); - if (sendCallback != null) { - sendCallback = new SendCallbackWrapper(sendCallback, span); - } - } - - @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) - public static void stopSpan( - @Advice.Thrown Throwable throwable, - @Advice.Argument(value = 6, readOnly = false) SendCallback sendCallback, - @Advice.Local("otelSpan") Span span, - @Advice.Local("otelScope") Scope scope) { - if (scope == null) { - return; - } - scope.close(); - if (sendCallback == null) { - if (throwable == null) { - tracer().end(span); - } else { - tracer().endExceptionally(span, throwable); - } - } - } - } -} diff --git a/instrumentation/rocketmq-client-4.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmq/RocketMqConcurrentlyConsumeInstrumentation.java b/instrumentation/rocketmq-client-4.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmq/RocketMqConcurrentlyConsumeInstrumentation.java deleted file mode 100644 index 3b731009e5..0000000000 --- a/instrumentation/rocketmq-client-4.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmq/RocketMqConcurrentlyConsumeInstrumentation.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.rocketmq; - -import static io.opentelemetry.instrumentation.rocketmq.RocketMqConsumerTracer.tracer; -import static io.opentelemetry.javaagent.tooling.bytebuddy.matcher.AgentElementMatchers.implementsInterface; -import static java.util.Collections.singletonMap; -import static net.bytebuddy.matcher.ElementMatchers.isMethod; -import static net.bytebuddy.matcher.ElementMatchers.named; - -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.context.Scope; -import io.opentelemetry.javaagent.tooling.TypeInstrumentation; -import java.util.List; -import java.util.Map; -import net.bytebuddy.asm.Advice; -import net.bytebuddy.description.method.MethodDescription; -import net.bytebuddy.description.type.TypeDescription; -import net.bytebuddy.matcher.ElementMatcher; -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; -import org.apache.rocketmq.common.message.MessageExt; - -public class RocketMqConcurrentlyConsumeInstrumentation implements TypeInstrumentation { - - @Override - public ElementMatcher typeMatcher() { - return implementsInterface( - named("org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently")); - } - - @Override - public Map, String> transformers() { - return singletonMap( - isMethod().and(named("consumeMessage")), - RocketMqConcurrentlyConsumeInstrumentation.class.getName() + "$ConcurrentlyConsumeAdvice"); - } - - public static class ConcurrentlyConsumeAdvice { - @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter( - @Advice.Argument(0) List msgs, - @Advice.Local("otelSpan") Span span, - @Advice.Local("otelScope") Scope scope) { - - span = tracer().startSpan(msgs); - scope = span.makeCurrent(); - } - - @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) - public static void stopSpan( - @Advice.Return ConsumeConcurrentlyStatus status, - @Advice.Thrown Throwable throwable, - @Advice.Local("otelSpan") Span span, - @Advice.Local("otelScope") Scope scope) { - if (scope == null) { - return; - } - tracer().endConcurrentlySpan(span, status); - scope.close(); - if (throwable == null) { - tracer().end(span); - } else { - tracer().endExceptionally(span, throwable); - } - } - } -} diff --git a/instrumentation/rocketmq-client-4.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmq/RocketMqConsumerInstrumentation.java b/instrumentation/rocketmq-client-4.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmq/RocketMqConsumerInstrumentation.java new file mode 100644 index 0000000000..9d0ec63769 --- /dev/null +++ b/instrumentation/rocketmq-client-4.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmq/RocketMqConsumerInstrumentation.java @@ -0,0 +1,50 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rocketmq; + +import io.opentelemetry.instrumentation.rocketmq.TracingConsumeMessageHookImpl; +import io.opentelemetry.javaagent.tooling.TypeInstrumentation; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl; +import java.util.Map; + +import static io.opentelemetry.javaagent.tooling.bytebuddy.matcher.ClassLoaderMatcher.hasClassesNamed; +import static java.util.Collections.singletonMap; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +public class RocketMqConsumerInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher classLoaderOptimization() { + return hasClassesNamed("org.apache.rocketmq.client.consumer.DefaultMQPushConsumer"); + } + + @Override + public ElementMatcher typeMatcher() { + return named("org.apache.rocketmq.client.consumer.DefaultMQPushConsumer"); + } + + @Override + public Map, String> transformers() { + return singletonMap( + isMethod().and(named("start")).and(takesArguments(0)), + RocketMqConsumerInstrumentation.class.getName() + "$AdviceStart"); + } + + public static class AdviceStart { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter( + @Advice.FieldValue(value = "defaultMQPushConsumerImpl",declaringType = DefaultMQPushConsumer.class) DefaultMQPushConsumerImpl defaultMQPushConsumerImpl){ + defaultMQPushConsumerImpl.registerConsumeMessageHook(new TracingConsumeMessageHookImpl()); + } + } +} diff --git a/instrumentation/rocketmq-client-4.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmq/RocketMqInstrumentationModule.java b/instrumentation/rocketmq-client-4.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmq/RocketMqInstrumentationModule.java index 6f9332e74f..dc532b7985 100644 --- a/instrumentation/rocketmq-client-4.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmq/RocketMqInstrumentationModule.java +++ b/instrumentation/rocketmq-client-4.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmq/RocketMqInstrumentationModule.java @@ -15,14 +15,14 @@ import java.util.List; @AutoService(InstrumentationModule.class) public class RocketMqInstrumentationModule extends InstrumentationModule { public RocketMqInstrumentationModule() { - super("rocketmq", "rockemq-client-4.3"); + super("rocketmq-client", "rocketmq-client-4.8"); } @Override public List typeInstrumentations() { return asList( - new RocketMqClientApiImplInstrumentation(), - new RocketMqConcurrentlyConsumeInstrumentation(), - new RocketMqOrderlyConsumeInstrumentation()); + new RocketMqProducerInstrumentation(), + new RocketMqConsumerInstrumentation() + ); } } diff --git a/instrumentation/rocketmq-client-4.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmq/RocketMqOrderlyConsumeInstrumentation.java b/instrumentation/rocketmq-client-4.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmq/RocketMqOrderlyConsumeInstrumentation.java deleted file mode 100644 index e2d3d455e5..0000000000 --- a/instrumentation/rocketmq-client-4.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmq/RocketMqOrderlyConsumeInstrumentation.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.javaagent.instrumentation.rocketmq; - -import static io.opentelemetry.instrumentation.rocketmq.RocketMqConsumerTracer.tracer; -import static io.opentelemetry.javaagent.tooling.bytebuddy.matcher.AgentElementMatchers.implementsInterface; -import static java.util.Collections.singletonMap; -import static net.bytebuddy.matcher.ElementMatchers.nameStartsWith; -import static net.bytebuddy.matcher.ElementMatchers.named; - -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.context.Scope; -import io.opentelemetry.javaagent.tooling.TypeInstrumentation; -import java.util.List; -import java.util.Map; -import net.bytebuddy.asm.Advice; -import net.bytebuddy.description.method.MethodDescription; -import net.bytebuddy.description.type.TypeDescription; -import net.bytebuddy.matcher.ElementMatcher; -import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; -import org.apache.rocketmq.common.message.MessageExt; - -public class RocketMqOrderlyConsumeInstrumentation implements TypeInstrumentation { - - @Override - public ElementMatcher typeMatcher() { - return implementsInterface( - named("org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly")); - } - - @Override - public Map, String> transformers() { - return singletonMap( - nameStartsWith("consumeMessage"), - RocketMqOrderlyConsumeInstrumentation.class.getName() + "$OrderlyConsumeAdvice"); - } - - public static class OrderlyConsumeAdvice { - @Advice.OnMethodEnter(suppress = Throwable.class) - public static void onEnter( - @Advice.Argument(0) List msgs, - @Advice.Local("otelSpan") Span span, - @Advice.Local("otelScope") Scope scope) { - - span = tracer().startSpan(msgs); - scope = span.makeCurrent(); - } - - @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) - public static void stopSpan( - @Advice.Return ConsumeOrderlyStatus status, - @Advice.Thrown Throwable throwable, - @Advice.Local("otelSpan") Span span, - @Advice.Local("otelScope") Scope scope) { - if (scope == null) { - return; - } - tracer().endOrderlySpan(span, status); - scope.close(); - if (throwable == null) { - tracer().end(span); - } else { - tracer().endExceptionally(span, throwable); - } - } - } -} diff --git a/instrumentation/rocketmq-client-4.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmq/RocketMqProducerInstrumentation.java b/instrumentation/rocketmq-client-4.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmq/RocketMqProducerInstrumentation.java new file mode 100644 index 0000000000..90cbe3632a --- /dev/null +++ b/instrumentation/rocketmq-client-4.8/javaagent/src/main/java/io/opentelemetry/javaagent/instrumentation/rocketmq/RocketMqProducerInstrumentation.java @@ -0,0 +1,50 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.javaagent.instrumentation.rocketmq; + +import static io.opentelemetry.javaagent.tooling.bytebuddy.matcher.ClassLoaderMatcher.hasClassesNamed; +import static java.util.Collections.singletonMap; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import io.opentelemetry.instrumentation.rocketmq.TracingSendMessageHookImpl; +import io.opentelemetry.javaagent.tooling.TypeInstrumentation; +import java.util.Map; +import net.bytebuddy.asm.Advice; +import net.bytebuddy.description.method.MethodDescription; +import net.bytebuddy.description.type.TypeDescription; +import net.bytebuddy.matcher.ElementMatcher; +import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl; +import org.apache.rocketmq.client.producer.DefaultMQProducer; + +public class RocketMqProducerInstrumentation implements TypeInstrumentation { + + @Override + public ElementMatcher classLoaderOptimization() { + return hasClassesNamed("org.apache.rocketmq.client.producer.DefaultMQProducer"); + } + + @Override + public ElementMatcher typeMatcher() { + return named("org.apache.rocketmq.client.producer.DefaultMQProducer"); + } + + @Override + public Map, String> transformers() { + return singletonMap( + isMethod().and(named("start")).and(takesArguments(0)), + RocketMqProducerInstrumentation.class.getName() + "$AdviceStart"); + } + + public static class AdviceStart { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static void onEnter( + @Advice.FieldValue(value = "defaultMQProducerImpl",declaringType = DefaultMQProducer.class) DefaultMQProducerImpl defaultMQProducerImpl){ + defaultMQProducerImpl.registerSendMessageHook(new TracingSendMessageHookImpl()); + } + } +} diff --git a/instrumentation/rocketmq-client-4.8/javaagent/src/test/groovy/io/opentelemetry/instrumentation/rocketmq/RocketMqClientTest.groovy b/instrumentation/rocketmq-client-4.8/javaagent/src/test/groovy/io/opentelemetry/instrumentation/rocketmq/RocketMqClientTest.groovy index 4014df9ed6..2dc2e21add 100644 --- a/instrumentation/rocketmq-client-4.8/javaagent/src/test/groovy/io/opentelemetry/instrumentation/rocketmq/RocketMqClientTest.groovy +++ b/instrumentation/rocketmq-client-4.8/javaagent/src/test/groovy/io/opentelemetry/instrumentation/rocketmq/RocketMqClientTest.groovy @@ -10,5 +10,4 @@ import io.opentelemetry.instrumentation.test.AgentTestTrait class RocketMqClientTest extends AbstractRocketMqClientTest implements AgentTestTrait { - } \ No newline at end of file diff --git a/instrumentation/rocketmq-client-4.8/library/rocketmq-client-4.8-library.gradle b/instrumentation/rocketmq-client-4.8/library/rocketmq-client-4.8-library.gradle index 4357ed4f2d..e2ac77c73d 100644 --- a/instrumentation/rocketmq-client-4.8/library/rocketmq-client-4.8-library.gradle +++ b/instrumentation/rocketmq-client-4.8/library/rocketmq-client-4.8-library.gradle @@ -1,8 +1,11 @@ apply from: "$rootDir/gradle/instrumentation-library.gradle" - dependencies { library group: 'org.apache.rocketmq', name: 'rocketmq-client', version: '4.8.0' testImplementation project(':instrumentation:rocketmq-client-4.8:testing') } + +tasks.withType(Test) { + jvmArgs "-Dotel.instrumentation.rocketmq.client.experimental-span-attributes=true" +} \ No newline at end of file diff --git a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqClientConfig.java b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqClientConfig.java index ba0b631846..c9ba3789d7 100644 --- a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqClientConfig.java +++ b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqClientConfig.java @@ -14,5 +14,9 @@ public final class RocketMqClientConfig { .getBooleanProperty("otel.instrumentation.rocketmq.client-propagation", true); } + public static final boolean CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES = + Config.get() + .getBooleanProperty("otel.instrumentation.rocketmq.client.experimental-span-attributes", false); + private RocketMqClientConfig() {} } diff --git a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqConsumerTracer.java b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqConsumerTracer.java index f53d83c16b..696cde1162 100644 --- a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqConsumerTracer.java +++ b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqConsumerTracer.java @@ -9,12 +9,11 @@ import static io.opentelemetry.api.trace.SpanKind.CONSUMER; import static io.opentelemetry.instrumentation.rocketmq.TextMapExtractAdapter.GETTER; import io.opentelemetry.api.trace.Span; +import io.opentelemetry.api.trace.SpanBuilder; import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.api.tracer.BaseTracer; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; import java.util.List; -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; -import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import org.apache.rocketmq.common.message.MessageExt; public class RocketMqConsumerTracer extends BaseTracer { @@ -30,22 +29,46 @@ public class RocketMqConsumerTracer extends BaseTracer { return "io.opentelemetry.javaagent.rocketmq-client"; } - public Span startSpan(List msgs) { + public Context startSpan(Context parentContext, List msgs) { MessageExt msg = msgs.get(0); - Span span = - tracer - .spanBuilder(spanNameOnConsume(msg)) - .setSpanKind(CONSUMER) - .setParent(extractParent(msg)) - .setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq") - .setAttribute(SemanticAttributes.MESSAGING_DESTINATION, msg.getTopic()) - .setAttribute(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic") - .setAttribute(SemanticAttributes.MESSAGING_OPERATION, "process") - .setAttribute( - SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, getStoreSize(msgs)) - .startSpan(); - onConsume(span, msg); - return span; + if (msgs.size() == 1) { + SpanBuilder spanBuilder = startSpanBuilder(msg) + .setParent(extractParent(msg)); + return withClientSpan(parentContext, spanBuilder.startSpan()); + } else { + SpanBuilder spanBuilder = + tracer + .spanBuilder(msg.getTopic() + " receive") + .setSpanKind(CONSUMER) + .setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq") + .setAttribute(SemanticAttributes.MESSAGING_OPERATION, "receive"); + Context rootContext = withClientSpan(parentContext, spanBuilder.startSpan()); + for (MessageExt message : msgs) { + createChildSpan(rootContext, message); + } + return rootContext; + } + } + + public void createChildSpan(Context parentContext, MessageExt msg) { + SpanBuilder childSpanBuilder = startSpanBuilder(msg) + .setParent(parentContext) + .addLink(Span.fromContext(extractParent(msg)).getSpanContext()); + end(withClientSpan(parentContext, childSpanBuilder.startSpan())); + } + + public SpanBuilder startSpanBuilder(MessageExt msg) { + SpanBuilder spanBuilder = tracer.spanBuilder(spanNameOnConsume(msg)) + .setSpanKind(CONSUMER) + .setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq") + .setAttribute(SemanticAttributes.MESSAGING_DESTINATION, msg.getTopic()) + .setAttribute(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic") + .setAttribute(SemanticAttributes.MESSAGING_OPERATION, "process") + .setAttribute(SemanticAttributes.MESSAGING_MESSAGE_ID, msg.getMsgId()) + .setAttribute( + SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) msg.getBody().length); + onConsume(spanBuilder, msg); + return spanBuilder; } private Context extractParent(MessageExt msg) { @@ -56,16 +79,13 @@ public class RocketMqConsumerTracer extends BaseTracer { } } - void onConsume(Span span, MessageExt msg) { - span.setAttribute("messaging.rocketmq.tags", msg.getTags()); - span.setAttribute("messaging.rocketmq.queue_id", msg.getQueueId()); - span.setAttribute("messaging.rocketmq.queue_offset", msg.getQueueOffset()); - span.setAttribute("messaging.rocketmq.broker_address", getBrokerHost(msg)); - } - - long getStoreSize(List msgs) { - long storeSize = msgs.stream().mapToInt(item -> item.getStoreSize()).sum(); - return storeSize; + void onConsume(SpanBuilder spanBuilder, MessageExt msg) { + if (RocketMqClientConfig.CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) { + spanBuilder.setAttribute("messaging.rocketmq.tags", msg.getTags()); + spanBuilder.setAttribute("messaging.rocketmq.queue_id", msg.getQueueId()); + spanBuilder.setAttribute("messaging.rocketmq.queue_offset", msg.getQueueOffset()); + spanBuilder.setAttribute("messaging.rocketmq.broker_address", getBrokerHost(msg)); + } } String spanNameOnConsume(MessageExt msg) { @@ -79,12 +99,4 @@ public class RocketMqConsumerTracer extends BaseTracer { return null; } } - - public void endConcurrentlySpan(Span span, ConsumeConcurrentlyStatus status) { - span.setAttribute("messaging.rocketmq.consume_concurrently_status", status.name()); - } - - public void endOrderlySpan(Span span, ConsumeOrderlyStatus status) { - span.setAttribute("messaging.rocketmq.consume_orderly_status", status.name()); - } } diff --git a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqProducerTracer.java b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqProducerTracer.java index ad344db98d..c12b2e7bd2 100644 --- a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqProducerTracer.java +++ b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqProducerTracer.java @@ -9,6 +9,7 @@ import static io.opentelemetry.api.trace.SpanKind.PRODUCER; import io.opentelemetry.api.trace.Span; import io.opentelemetry.api.trace.SpanBuilder; +import io.opentelemetry.context.Context; import io.opentelemetry.instrumentation.api.tracer.BaseTracer; import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; import org.apache.rocketmq.client.producer.SendResult; @@ -27,25 +28,38 @@ public class RocketMqProducerTracer extends BaseTracer { return "io.opentelemetry.javaagent.rocketmq-client"; } - public Span startProducerSpan(String addr, Message msg) { - SpanBuilder span = spanBuilder(spanNameOnProduce(msg), PRODUCER); - onProduce(span, msg, addr); - return span.startSpan(); + public Context startProducerSpan(String addr, Message msg, Context parentContext) { + SpanBuilder spanBuilder = spanBuilder(spanNameOnProduce(msg), PRODUCER); + onProduce(spanBuilder, msg, addr); + return withClientSpan(parentContext, spanBuilder.startSpan()); } - public void onCallback(Span span, SendResult sendResult) { - span.setAttribute("messaging.rocketmq.callback_result", sendResult.getSendStatus().name()); + public void onCallback(Context context, SendResult sendResult) { + if (RocketMqClientConfig.CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) { + Span span = Span.fromContext(context); + span.setAttribute("messaging.rocketmq.callback_result", sendResult.getSendStatus().name()); + } } - public void onProduce(SpanBuilder span, Message msg, String addr) { - span.setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"); - span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"); - span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION, msg.getTopic()); - span.setAttribute("messaging.rocketmq.tags", msg.getTags()); - span.setAttribute("messaging.rocketmq.broker_address", addr); + private void onProduce(SpanBuilder spanBuilder, Message msg, String addr) { + spanBuilder.setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"); + spanBuilder.setAttribute(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"); + spanBuilder.setAttribute(SemanticAttributes.MESSAGING_DESTINATION, msg.getTopic()); + if (RocketMqClientConfig.CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) { + spanBuilder.setAttribute("messaging.rocketmq.tags", msg.getTags()); + spanBuilder.setAttribute("messaging.rocketmq.broker_address", addr); + } } - public String spanNameOnProduce(Message msg) { + public void afterProduce(Context context, SendResult sendResult) { + Span span = Span.fromContext(context); + span.setAttribute(SemanticAttributes.MESSAGING_MESSAGE_ID, sendResult.getMsgId()); + if (RocketMqClientConfig.CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) { + span.setAttribute("messaging.rocketmq.send_result", sendResult.getSendStatus().name()); + } + } + + private String spanNameOnProduce(Message msg) { return msg.getTopic() + " send"; } } diff --git a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/SendCallbackWrapper.java b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/SendCallbackWrapper.java deleted file mode 100644 index fb3bd39bf8..0000000000 --- a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/SendCallbackWrapper.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.instrumentation.rocketmq; - -import static io.opentelemetry.instrumentation.rocketmq.RocketMqProducerTracer.tracer; - -import io.opentelemetry.api.trace.Span; -import org.apache.rocketmq.client.producer.SendCallback; -import org.apache.rocketmq.client.producer.SendResult; - -public class SendCallbackWrapper implements SendCallback { - private final SendCallback sendCallback; - private final Span span; - - public SendCallbackWrapper(SendCallback sendCallback, Span span) { - this.sendCallback = sendCallback; - this.span = span; - } - - @Override - public void onSuccess(SendResult sendResult) { - tracer().onCallback(span, sendResult); - tracer().end(span); - sendCallback.onSuccess(sendResult); - } - - @Override - public void onException(Throwable e) { - tracer().endExceptionally(span, e); - sendCallback.onException(e); - } -} diff --git a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TextMapExtractAdapter.java b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TextMapExtractAdapter.java index c343e84b7a..94ae5beef5 100644 --- a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TextMapExtractAdapter.java +++ b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TextMapExtractAdapter.java @@ -5,10 +5,10 @@ package io.opentelemetry.instrumentation.rocketmq; -import io.opentelemetry.context.propagation.TextMapPropagator; +import io.opentelemetry.context.propagation.TextMapGetter; import java.util.Map; -public class TextMapExtractAdapter implements TextMapPropagator.Getter> { +public class TextMapExtractAdapter implements TextMapGetter> { public static final TextMapExtractAdapter GETTER = new TextMapExtractAdapter(); @@ -19,7 +19,6 @@ public class TextMapExtractAdapter implements TextMapPropagator.Getter carrier, String key) { - String obj = carrier.get(key); - return obj; + return carrier.get(key); } } diff --git a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TextMapInjectAdapter.java b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TextMapInjectAdapter.java index 22ec6733bd..40ba41a351 100644 --- a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TextMapInjectAdapter.java +++ b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TextMapInjectAdapter.java @@ -5,20 +5,15 @@ package io.opentelemetry.instrumentation.rocketmq; -import io.opentelemetry.context.propagation.TextMapPropagator; -import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader; +import io.opentelemetry.context.propagation.TextMapSetter; +import java.util.Map; -public class TextMapInjectAdapter implements TextMapPropagator.Setter { +public class TextMapInjectAdapter implements TextMapSetter> { public static final TextMapInjectAdapter SETTER = new TextMapInjectAdapter(); @Override - public void set(SendMessageRequestHeader header, String key, String value) { - StringBuilder properties = new StringBuilder(header.getProperties()); - properties.append(key); - properties.append('\u0001'); - properties.append(value); - properties.append('\u0002'); - header.setProperties(properties.toString()); + public void set(Map carrier, String key, String value) { + carrier.put(key,value); } } diff --git a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TracingConsumeMessageHookImpl.java b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TracingConsumeMessageHookImpl.java new file mode 100644 index 0000000000..ca079db21f --- /dev/null +++ b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TracingConsumeMessageHookImpl.java @@ -0,0 +1,29 @@ +package io.opentelemetry.instrumentation.rocketmq; + +import io.opentelemetry.context.Context; +import org.apache.rocketmq.client.hook.ConsumeMessageContext; +import org.apache.rocketmq.client.hook.ConsumeMessageHook; + +import static io.opentelemetry.instrumentation.rocketmq.RocketMqConsumerTracer.tracer; + +public class TracingConsumeMessageHookImpl implements ConsumeMessageHook { + + @Override + public String hookName() { + return "OpenTelemetryConsumeMessageTraceHook"; + } + + @Override + public void consumeMessageBefore(ConsumeMessageContext context) { + } + + @Override + public void consumeMessageAfter(ConsumeMessageContext context) { + if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) { + return; + } + Context traceContext =tracer().startSpan(Context.current(),context.getMsgList()); + tracer().end(traceContext); + } + +} diff --git a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TracingMessageInterceptor.java b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TracingMessageInterceptor.java deleted file mode 100644 index af1ac2df8d..0000000000 --- a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TracingMessageInterceptor.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.instrumentation.rocketmq; - -import io.opentelemetry.api.trace.Span; -import java.util.List; -import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; -import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; -import org.apache.rocketmq.common.message.Message; -import org.apache.rocketmq.common.message.MessageExt; - -public class TracingMessageInterceptor { - - public void producerIntercept(String addr, Message msg) { - Span span = RocketMqProducerTracer.tracer().startProducerSpan(addr, msg); - RocketMqProducerTracer.tracer().end(span); - } - - public void consumerConcurrentlyIntercept(List msgs) { - Span span = RocketMqConsumerTracer.tracer().startSpan(msgs); - RocketMqConsumerTracer.tracer() - .endConcurrentlySpan(span, ConsumeConcurrentlyStatus.CONSUME_SUCCESS); - RocketMqConsumerTracer.tracer().tracer().end(span); - } - - public void consumerOrderlyIntercept(List msgs) { - Span span = RocketMqConsumerTracer.tracer().startSpan(msgs); - RocketMqConsumerTracer.tracer().endOrderlySpan(span, ConsumeOrderlyStatus.SUCCESS); - RocketMqConsumerTracer.tracer().tracer().end(span); - } -} diff --git a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TracingSendMessageHookImpl.java b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TracingSendMessageHookImpl.java new file mode 100644 index 0000000000..5177d69c78 --- /dev/null +++ b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TracingSendMessageHookImpl.java @@ -0,0 +1,40 @@ +package io.opentelemetry.instrumentation.rocketmq; + +import static io.opentelemetry.instrumentation.rocketmq.RocketMqProducerTracer.tracer; +import static io.opentelemetry.instrumentation.rocketmq.TextMapInjectAdapter.SETTER; + +import io.opentelemetry.api.GlobalOpenTelemetry; +import io.opentelemetry.context.Context; +import org.apache.rocketmq.client.hook.SendMessageContext; +import org.apache.rocketmq.client.hook.SendMessageHook; + +public class TracingSendMessageHookImpl implements SendMessageHook { + + @Override + public String hookName() { + return "OpenTelemetrySendMessageTraceHook"; + } + + @Override + public void sendMessageBefore(SendMessageContext context) { + if (context == null) { + return; + } + Context traceContext = tracer().startProducerSpan(context.getBrokerAddr(), context.getMessage(), Context.current()); + if (RocketMqClientConfig.isPropagationEnabled()){ + GlobalOpenTelemetry.getPropagators() + .getTextMapPropagator() + .inject(traceContext, context.getMessage().getProperties(), SETTER); + } + context.setMqTraceContext(traceContext); + } + + @Override + public void sendMessageAfter(SendMessageContext context) { + if (context == null || context.getMqTraceContext() == null || context.getSendResult() == null) { + return; + } + tracer().afterProduce((Context)context.getMqTraceContext(), context.getSendResult()); + tracer().end((Context)context.getMqTraceContext()); + } +} diff --git a/instrumentation/rocketmq-client-4.8/library/src/test/groovy/io/opentelemetry/instrumentation/rocketmq/RocketMqClientTest.groovy b/instrumentation/rocketmq-client-4.8/library/src/test/groovy/io/opentelemetry/instrumentation/rocketmq/RocketMqClientTest.groovy index bc08fa7306..989b80c375 100644 --- a/instrumentation/rocketmq-client-4.8/library/src/test/groovy/io/opentelemetry/instrumentation/rocketmq/RocketMqClientTest.groovy +++ b/instrumentation/rocketmq-client-4.8/library/src/test/groovy/io/opentelemetry/instrumentation/rocketmq/RocketMqClientTest.groovy @@ -4,25 +4,90 @@ */ package io.opentelemetry.instrumentation.rocketmq -import io.opentelemetery.instrumentation.rocketmq.AbstractRocketMqClientLibraryTest + +import base.BaseConf +import io.opentelemetry.instrumentation.test.InstrumentationSpecification import io.opentelemetry.instrumentation.test.LibraryTestTrait +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer +import org.apache.rocketmq.client.producer.DefaultMQProducer import org.apache.rocketmq.common.message.Message +import org.apache.rocketmq.remoting.common.RemotingHelper +import org.apache.rocketmq.test.listener.rmq.order.RMQOrderListener +import spock.lang.Shared -class RocketMqClientTest extends AbstractRocketMqClientLibraryTest implements LibraryTestTrait { +import static io.opentelemetry.api.trace.SpanKind.CONSUMER +import static io.opentelemetry.api.trace.SpanKind.PRODUCER +import static io.opentelemetry.instrumentation.test.utils.TraceUtils.basicSpan +import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace - @Override - void producerIntercept(String addr,Message msg) { - TracingMessageInterceptor tracingProducerInterceptor= new TracingMessageInterceptor() - tracingProducerInterceptor.producerIntercept(addr,msg) +class RocketMqClientTest extends InstrumentationSpecification implements LibraryTestTrait { + + @Shared + DefaultMQPushConsumer consumer + + @Shared + DefaultMQProducer producer + + @Shared + String sharedTopic + + @Shared + String brokerAddr + + @Shared + Message msg + + def setup() { + sharedTopic = BaseConf.initTopic() + brokerAddr = BaseConf.getBrokerAddr() + msg = new Message(sharedTopic, "TagA", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET)) } - @Override - void consumerIntercept(List msg, String type) { - TracingMessageInterceptor tracingProducerInterceptor= new TracingMessageInterceptor() - if("concurrent".equals(type)){ - tracingProducerInterceptor.consumerConcurrentlyIntercept(msg) - } else { - tracingProducerInterceptor.consumerOrderlyIntercept(msg) + def "test rocketmq produce and consume"() { + setup: + producer = BaseConf.getRMQProducer(BaseConf.nsAddr) + producer.getDefaultMQProducerImpl().registerSendMessageHook(new TracingSendMessageHookImpl()) + + consumer = BaseConf.getConsumer(BaseConf.nsAddr, sharedTopic, "*", new RMQOrderListener()) + consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(new TracingConsumeMessageHookImpl()) + when: + runUnderTrace("parent") { + producer.send(msg) + } + then: + assertTraces(1) { + trace(0, 3) { + basicSpan(it, 0, "parent") + span(1) { + name sharedTopic + " send" + kind PRODUCER + attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "rocketmq" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" sharedTopic + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + "${SemanticAttributes.MESSAGING_MESSAGE_ID.key}" String + } + } + span(2) { + name sharedTopic + " process" + kind CONSUMER + attributes { + "${SemanticAttributes.MESSAGING_SYSTEM.key}" "rocketmq" + "${SemanticAttributes.MESSAGING_DESTINATION.key}" sharedTopic + "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + "${SemanticAttributes.MESSAGING_OPERATION.key}" "process" + "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long + "${SemanticAttributes.MESSAGING_MESSAGE_ID.key}" String + } + } + } + cleanup: + producer.shutdown() + consumer.shutdown() + BaseConf.deleteTempDir() } } + } + diff --git a/instrumentation/rocketmq-client-4.8/testing/rocketmq-client-4.8-testing.gradle b/instrumentation/rocketmq-client-4.8/testing/rocketmq-client-4.8-testing.gradle index 6773527770..ec6d24d3ce 100644 --- a/instrumentation/rocketmq-client-4.8/testing/rocketmq-client-4.8-testing.gradle +++ b/instrumentation/rocketmq-client-4.8/testing/rocketmq-client-4.8-testing.gradle @@ -3,8 +3,6 @@ apply from: "$rootDir/gradle/java.gradle" dependencies { api project(':testing-common') api group: 'org.apache.rocketmq', name: 'rocketmq-test', version: '4.8.0' - api group: 'io.openmessaging', name: 'openmessaging-api', version: '0.3.1-alpha' - api group: 'org.apache.rocketmq', name: 'rocketmq-openmessaging', version: '4.8.0' implementation deps.guava implementation deps.groovy diff --git a/instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetery/instrumentation/rocketmq/AbstractRocketMqClientLibraryTest.groovy b/instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetery/instrumentation/rocketmq/AbstractRocketMqClientLibraryTest.groovy deleted file mode 100644 index 54d8d47d72..0000000000 --- a/instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetery/instrumentation/rocketmq/AbstractRocketMqClientLibraryTest.groovy +++ /dev/null @@ -1,161 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetery.instrumentation.rocketmq - -import base.BaseConf -import io.opentelemetry.instrumentation.test.InstrumentationSpecification -import io.opentelemetry.semconv.trace.attributes.SemanticAttributes -import org.apache.rocketmq.common.message.Message -import org.apache.rocketmq.remoting.common.RemotingHelper -import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer -import org.apache.rocketmq.test.client.rmq.RMQNormalProducer -import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener -import org.apache.rocketmq.test.listener.rmq.order.RMQOrderListener -import spock.lang.Shared -import spock.lang.Unroll - -import static io.opentelemetry.api.trace.SpanKind.CONSUMER -import static io.opentelemetry.api.trace.SpanKind.PRODUCER -import static io.opentelemetry.instrumentation.test.utils.TraceUtils.basicSpan -import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace - -@Unroll -abstract class AbstractRocketMqClientLibraryTest extends InstrumentationSpecification{ - - @Shared - RMQNormalConsumer consumer - - @Shared - RMQNormalProducer producer - - @Shared - String sharedTopic - - @Shared - String brokerAddr - - @Shared - Message msg - - @Shared - int consumeTime = 5000 - - @Shared - def baseConf =new BaseConf() - - abstract void producerIntercept(String addr,Message msg) - - abstract void consumerIntercept(List msgs,String type) - - def setup() { - sharedTopic =baseConf.initTopic() - brokerAddr =baseConf.getBrokerAddr() - msg = new Message(sharedTopic, "TagA", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET)) - } - - def "test rocketmq produce"() { - setup: - producer = baseConf.getProducer(baseConf.nsAddr, sharedTopic) - when: - runUnderTrace("parent") { - producerIntercept(brokerAddr,msg) - producer.send(msg) - } - then: - assertTraces(1) { - trace(0, 2) { - basicSpan(it, 0, "parent") - span(1) { - name sharedTopic + " send" - kind PRODUCER - attributes { - "${SemanticAttributes.MESSAGING_SYSTEM.key}" "rocketmq" - "${SemanticAttributes.MESSAGING_DESTINATION.key}" sharedTopic - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" - "messaging.rocketmq.tags" "TagA" - "messaging.rocketmq.broker_address" brokerAddr - } - } - } - cleanup: - producer.shutdown() - } - } - - def "test rocketmq concurrently consume"() { - setup: - producer = baseConf.getProducer(baseConf.nsAddr, sharedTopic) - consumer = baseConf.getConsumer(baseConf.nsAddr, sharedTopic, "*", new RMQNormalListener()) - when: - producer.send(msg) - consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime) - consumerIntercept(consumer.getListener().getAllOriginMsg(),"concurrent") - then: - assertTraces(1) { - trace(0, 1) { - span(0) { - name sharedTopic + " process" - kind CONSUMER - attributes { - "${SemanticAttributes.MESSAGING_SYSTEM.key}" "rocketmq" - "${SemanticAttributes.MESSAGING_DESTINATION.key}" sharedTopic - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" - "${SemanticAttributes.MESSAGING_OPERATION.key}" "process" - "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long - "messaging.rocketmq.tags" "TagA" - "messaging.rocketmq.broker_address" brokerAddr - "messaging.rocketmq.consume_concurrently_status" "CONSUME_SUCCESS" - "messaging.rocketmq.queue_id" Long - "messaging.rocketmq.queue_offset" Long - - } - } - } - cleanup: - producer.shutdown() - consumer.shutdown() - - } - } - - def "test rocketmq orderly consume"() { - setup: - producer = baseConf.getProducer(baseConf.nsAddr, sharedTopic) - consumer = baseConf.getConsumer(baseConf.nsAddr, sharedTopic, "*", new RMQOrderListener()) - when: - producer.send(msg) - consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime) - consumerIntercept(consumer.getListener().getAllOriginMsg(),"order") - - then: - assertTraces(1) { - trace(0, 1) { - span(0) { - name sharedTopic + " process" - kind CONSUMER - attributes { - "${SemanticAttributes.MESSAGING_SYSTEM.key}" "rocketmq" - "${SemanticAttributes.MESSAGING_DESTINATION.key}" sharedTopic - "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" - "${SemanticAttributes.MESSAGING_OPERATION.key}" "process" - "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long - "messaging.rocketmq.tags" "TagA" - "messaging.rocketmq.broker_address" brokerAddr - "messaging.rocketmq.consume_orderly_status" "SUCCESS" - "messaging.rocketmq.queue_id" Long - "messaging.rocketmq.queue_offset" Long - - } - } - } - cleanup: - producer.shutdown() - consumer.shutdown() - - } - } -} - diff --git a/instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetery/instrumentation/rocketmq/AbstractRocketMqClientTest.groovy b/instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetery/instrumentation/rocketmq/AbstractRocketMqClientTest.groovy index f241b2f6ab..efeaa38aed 100644 --- a/instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetery/instrumentation/rocketmq/AbstractRocketMqClientTest.groovy +++ b/instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetery/instrumentation/rocketmq/AbstractRocketMqClientTest.groovy @@ -8,14 +8,12 @@ package io.opentelemetery.instrumentation.rocketmq import base.BaseConf import io.opentelemetry.instrumentation.test.InstrumentationSpecification import io.opentelemetry.semconv.trace.attributes.SemanticAttributes +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer import org.apache.rocketmq.client.producer.DefaultMQProducer import org.apache.rocketmq.client.producer.SendCallback import org.apache.rocketmq.client.producer.SendResult import org.apache.rocketmq.common.message.Message import org.apache.rocketmq.remoting.common.RemotingHelper -import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer -import org.apache.rocketmq.test.client.rmq.RMQNormalProducer -import org.apache.rocketmq.test.factory.ProducerFactory import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener import org.apache.rocketmq.test.listener.rmq.order.RMQOrderListener import spock.lang.Shared @@ -29,41 +27,32 @@ import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTra abstract class AbstractRocketMqClientTest extends InstrumentationSpecification{ @Shared - private RMQNormalConsumer consumer + DefaultMQPushConsumer consumer @Shared - private RMQNormalProducer producer + DefaultMQProducer producer @Shared - DefaultMQProducer defaultMQProducer + String sharedTopic @Shared - private String sharedTopic - - @Shared - private String brokerAddr + String brokerAddr @Shared Message msg - @Shared - int consumeTime = 1000 - - @Shared - BaseConf baseConf =new BaseConf() - def setup() { - sharedTopic =baseConf.initTopic() - brokerAddr =baseConf.getBrokerAddr() + sharedTopic = BaseConf.initTopic() + brokerAddr = BaseConf.getBrokerAddr() msg = new Message(sharedTopic, "TagA", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET)) } - def "test rocketmq produce callback1"() { + def "test rocketmq produce callback"() { setup: - defaultMQProducer = ProducerFactory.getRMQProducer(baseConf.nsAddr) + producer = BaseConf.getRMQProducer(BaseConf.nsAddr) when: runUnderTrace("parent") { - defaultMQProducer.send(msg, new SendCallback() { + producer.send(msg, new SendCallback() { @Override void onSuccess(SendResult sendResult) { } @@ -85,23 +74,26 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification{ "${SemanticAttributes.MESSAGING_SYSTEM.key}" "rocketmq" "${SemanticAttributes.MESSAGING_DESTINATION.key}" sharedTopic "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + "${SemanticAttributes.MESSAGING_MESSAGE_ID.key}" String "messaging.rocketmq.tags" "TagA" "messaging.rocketmq.broker_address" brokerAddr - "messaging.rocketmq.callback_result" "SEND_OK" + "messaging.rocketmq.send_result" "SEND_OK" + } } } + cleanup: + BaseConf.deleteTempDir() } } def "test rocketmq produce and concurrently consume"() { setup: - producer = baseConf.getProducer(baseConf.nsAddr, sharedTopic) - consumer = baseConf.getConsumer(baseConf.nsAddr, sharedTopic, "*", new RMQNormalListener()) + producer = BaseConf.getRMQProducer(BaseConf.nsAddr) + consumer = BaseConf.getConsumer(BaseConf.nsAddr, sharedTopic, "*", new RMQNormalListener()) when: runUnderTrace("parent") { producer.send(msg) - consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime) } then: assertTraces(1) { @@ -114,8 +106,10 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification{ "${SemanticAttributes.MESSAGING_SYSTEM.key}" "rocketmq" "${SemanticAttributes.MESSAGING_DESTINATION.key}" sharedTopic "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + "${SemanticAttributes.MESSAGING_MESSAGE_ID.key}" String "messaging.rocketmq.tags" "TagA" "messaging.rocketmq.broker_address" brokerAddr + "messaging.rocketmq.send_result" "SEND_OK" } } span(2) { @@ -127,9 +121,9 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification{ "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" "${SemanticAttributes.MESSAGING_OPERATION.key}" "process" "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long + "${SemanticAttributes.MESSAGING_MESSAGE_ID.key}" String "messaging.rocketmq.tags" "TagA" "messaging.rocketmq.broker_address" brokerAddr - "messaging.rocketmq.consume_concurrently_status" "CONSUME_SUCCESS" "messaging.rocketmq.queue_id" Long "messaging.rocketmq.queue_offset" Long @@ -139,19 +133,18 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification{ cleanup: producer.shutdown() consumer.shutdown() - + BaseConf.deleteTempDir() } } def "test rocketmq produce and orderly consume"() { setup: - producer = baseConf.getProducer(baseConf.nsAddr, sharedTopic) - consumer = baseConf.getConsumer(baseConf.nsAddr, sharedTopic, "*", new RMQOrderListener()) + producer = BaseConf.getRMQProducer(BaseConf.nsAddr) + consumer = BaseConf.getConsumer(BaseConf.nsAddr, sharedTopic, "*", new RMQOrderListener()) when: runUnderTrace("parent") { producer.send(msg) - consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime) } then: assertTraces(1) { @@ -164,8 +157,10 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification{ "${SemanticAttributes.MESSAGING_SYSTEM.key}" "rocketmq" "${SemanticAttributes.MESSAGING_DESTINATION.key}" sharedTopic "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" + "${SemanticAttributes.MESSAGING_MESSAGE_ID.key}" String "messaging.rocketmq.tags" "TagA" "messaging.rocketmq.broker_address" brokerAddr + "messaging.rocketmq.send_result" "SEND_OK" } } span(2) { @@ -177,19 +172,18 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification{ "${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic" "${SemanticAttributes.MESSAGING_OPERATION.key}" "process" "${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long + "${SemanticAttributes.MESSAGING_MESSAGE_ID.key}" String "messaging.rocketmq.tags" "TagA" "messaging.rocketmq.broker_address" brokerAddr - "messaging.rocketmq.consume_orderly_status" "SUCCESS" "messaging.rocketmq.queue_id" Long "messaging.rocketmq.queue_offset" Long - } } } cleanup: producer.shutdown() consumer.shutdown() - + BaseConf.deleteTempDir() } } } diff --git a/instrumentation/rocketmq-client-4.8/testing/src/main/java/base/BaseConf.java b/instrumentation/rocketmq-client-4.8/testing/src/main/java/base/BaseConf.java index 6f6e5e61da..296e8c0707 100644 --- a/instrumentation/rocketmq-client-4.8/testing/src/main/java/base/BaseConf.java +++ b/instrumentation/rocketmq-client-4.8/testing/src/main/java/base/BaseConf.java @@ -7,24 +7,21 @@ package base; import java.util.ArrayList; import java.util.List; +import java.util.UUID; import org.apache.log4j.Logger; import org.apache.rocketmq.broker.BrokerController; -import org.apache.rocketmq.client.producer.TransactionListener; +import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; +import org.apache.rocketmq.client.exception.MQClientException; +import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.MQVersion; import org.apache.rocketmq.namesrv.NamesrvController; import org.apache.rocketmq.remoting.protocol.RemotingCommand; -import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer; -import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer; -import org.apache.rocketmq.test.client.rmq.RMQNormalProducer; -import org.apache.rocketmq.test.client.rmq.RMQTransactionalProducer; -import org.apache.rocketmq.test.clientinterface.AbstractMQConsumer; -import org.apache.rocketmq.test.clientinterface.AbstractMQProducer; -import org.apache.rocketmq.test.factory.ConsumerFactory; import org.apache.rocketmq.test.listener.AbstractListener; import org.apache.rocketmq.test.util.MQAdmin; import org.apache.rocketmq.test.util.MQRandomUtils; +import org.apache.rocketmq.test.util.RandomUtil; -public class BaseConf { +final class BaseConf { public static final String nsAddr; public static final String broker1Addr; protected static String broker1Name; @@ -48,12 +45,13 @@ public class BaseConf { brokerNum = 2; } - public BaseConf() {} + private BaseConf() {} public static String initTopic() { String topic = MQRandomUtils.getRandomTopic(); - IntegrationTestBase.initTopic(topic, nsAddr, clusterName); - + if(!IntegrationTestBase.initTopic(topic, nsAddr, clusterName)){ + log.error("Topic init failed"); + } return topic; } @@ -71,105 +69,28 @@ public class BaseConf { return group; } - public static RMQNormalProducer getProducer(String nsAddr, String topic) { - return getProducer(nsAddr, topic, false); - } - - public static RMQNormalProducer getProducer(String nsAddr, String topic, boolean useTls) { - RMQNormalProducer producer = new RMQNormalProducer(nsAddr, topic, useTls); - if (debug) { - producer.setDebug(); - } - mqClients.add(producer); - return producer; - } - - public static RMQNormalProducer getProducer( - String nsAddr, String topic, String producerGoup, String instanceName) { - RMQNormalProducer producer = new RMQNormalProducer(nsAddr, topic, producerGoup, instanceName); - if (debug) { - producer.setDebug(); - } - mqClients.add(producer); - return producer; - } - - public static RMQTransactionalProducer getTransactionalProducer( - String nsAddr, String topic, TransactionListener transactionListener) { - RMQTransactionalProducer producer = - new RMQTransactionalProducer(nsAddr, topic, false, transactionListener); - if (debug) { - producer.setDebug(); - } - mqClients.add(producer); - return producer; - } - - public static RMQAsyncSendProducer getAsyncProducer(String nsAddr, String topic) { - RMQAsyncSendProducer producer = new RMQAsyncSendProducer(nsAddr, topic); - if (debug) { - producer.setDebug(); - } - mqClients.add(producer); - return producer; - } - - public static RMQNormalConsumer getConsumer( - String nsAddr, String topic, String subExpression, AbstractListener listener) { - return getConsumer(nsAddr, topic, subExpression, listener, false); - } - - public static RMQNormalConsumer getConsumer( - String nsAddr, - String topic, - String subExpression, - AbstractListener listener, - boolean useTls) { + public static DefaultMQPushConsumer getConsumer(String nsAddr, String topic, String subExpression, + AbstractListener listener) + throws MQClientException { String consumerGroup = initConsumerGroup(); - return getConsumer(nsAddr, consumerGroup, topic, subExpression, listener, useTls); - } - - public static RMQNormalConsumer getConsumer( - String nsAddr, - String consumerGroup, - String topic, - String subExpression, - AbstractListener listener) { - return getConsumer(nsAddr, consumerGroup, topic, subExpression, listener, false); - } - - public static RMQNormalConsumer getConsumer( - String nsAddr, - String consumerGroup, - String topic, - String subExpression, - AbstractListener listener, - boolean useTls) { - RMQNormalConsumer consumer = - ConsumerFactory.getRMQNormalConsumer( - nsAddr, consumerGroup, topic, subExpression, listener, useTls); - if (debug) { - consumer.setDebug(); - } - mqClients.add(consumer); - log.info( - String.format( - "consumer[%s] start,topic[%s],subExpression[%s]", consumerGroup, topic, subExpression)); + DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup); + consumer.setInstanceName(RandomUtil.getStringByUUID()); + consumer.setNamesrvAddr(nsAddr); + consumer.subscribe(topic, subExpression); + consumer.setMessageListener(listener); + consumer.start(); return consumer; } - public static void shutdown() { - try { - for (Object mqClient : mqClients) { - if (mqClient instanceof AbstractMQProducer) { - ((AbstractMQProducer) mqClient).shutdown(); + public static DefaultMQProducer getRMQProducer(String ns) throws MQClientException { + DefaultMQProducer producer = new DefaultMQProducer(RandomUtil.getStringByUUID()); + producer.setInstanceName(UUID.randomUUID().toString()); + producer.setNamesrvAddr(ns); + producer.start(); + return producer; + } - } else { - ((AbstractMQConsumer) mqClient).shutdown(); - } - } - } catch (Exception e) { - e.printStackTrace(); - } + private static void deleteTempDir() { + IntegrationTestBase.deleteTempDir(); } } diff --git a/instrumentation/rocketmq-client-4.8/testing/src/main/java/base/IntegrationTestBase.java b/instrumentation/rocketmq-client-4.8/testing/src/main/java/base/IntegrationTestBase.java index fbbbfd2246..e8055d7bbc 100644 --- a/instrumentation/rocketmq-client-4.8/testing/src/main/java/base/IntegrationTestBase.java +++ b/instrumentation/rocketmq-client-4.8/testing/src/main/java/base/IntegrationTestBase.java @@ -6,14 +6,16 @@ package base; import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.List; import java.util.Random; -import java.util.UUID; import java.util.concurrent.atomic.AtomicInteger; import org.apache.rocketmq.broker.BrokerController; import org.apache.rocketmq.common.BrokerConfig; -import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.namesrv.NamesrvConfig; import org.apache.rocketmq.logging.InternalLogger; import org.apache.rocketmq.logging.InternalLoggerFactory; @@ -29,7 +31,6 @@ public class IntegrationTestBase { public static final InternalLogger logger = InternalLoggerFactory.getLogger(IntegrationTestBase.class); - protected static final String SEP = File.separator; protected static final String BROKER_NAME_PREFIX = "TestBrokerName_"; protected static final AtomicInteger BROKER_INDEX = new AtomicInteger(0); protected static final List TMPE_FILES = new ArrayList<>(); @@ -46,26 +47,37 @@ public class IntegrationTestBase { protected static final Random random = new Random(); - public static String createBaseDir() { - String baseDir = System.getProperty("user.home") + SEP + "unitteststore-" + UUID.randomUUID(); - final File file = new File(baseDir); - if (file.exists()) { - logger.info( - String.format( - "[%s] has already existed, please back up and remove it for integration tests", - baseDir)); - System.exit(1); + private static String createTempDir() { + String path = null; + try { + File file = Files.createTempDirectory("opentelemetry-rocketmq-client-temp").toFile(); + TMPE_FILES.add(file); + path= file.getCanonicalPath(); + } catch (IOException e) { + e.printStackTrace(); + } + return path; + } + + public static void deleteTempDir() { + for(File file:TMPE_FILES){ + boolean deleted = file.delete(); + if (!deleted) { + file.deleteOnExit(); + } } - TMPE_FILES.add(file); - return baseDir; } public static NamesrvController createAndStartNamesrv() { - String baseDir = createBaseDir(); + String baseDir = createTempDir(); + Path kvConfigPath = Paths.get(baseDir,"namesrv","kvConfig.json"); + Path namesrvPath = Paths.get(baseDir,"namesrv","namesrv.properties"); + NamesrvConfig namesrvConfig = new NamesrvConfig(); NettyServerConfig nameServerNettyServerConfig = new NettyServerConfig(); - namesrvConfig.setKvConfigPath(baseDir + SEP + "namesrv" + SEP + "kvConfig.json"); - namesrvConfig.setConfigStorePath(baseDir + SEP + "namesrv" + SEP + "namesrv.properties"); + + namesrvConfig.setKvConfigPath(kvConfigPath.toString()); + namesrvConfig.setConfigStorePath(namesrvPath.toString()); nameServerNettyServerConfig.setListenPort(nextPort()); NamesrvController namesrvController = @@ -76,14 +88,15 @@ public class IntegrationTestBase { namesrvController.start(); } catch (Exception e) { logger.info("Name Server start failed", e); - System.exit(1); } NAMESRV_CONTROLLERS.add(namesrvController); return namesrvController; } public static BrokerController createAndStartBroker(String nsAddr) { - String baseDir = createBaseDir(); + String baseDir = createTempDir(); + Path commitLogPath = Paths.get(baseDir,"commitlog"); + BrokerConfig brokerConfig = new BrokerConfig(); MessageStoreConfig storeConfig = new MessageStoreConfig(); brokerConfig.setBrokerName(BROKER_NAME_PREFIX + BROKER_INDEX.getAndIncrement()); @@ -91,7 +104,7 @@ public class IntegrationTestBase { brokerConfig.setNamesrvAddr(nsAddr); brokerConfig.setEnablePropertyFilter(true); storeConfig.setStorePathRootDir(baseDir); - storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog"); + storeConfig.setStorePathCommitLog(commitLogPath.toString()); storeConfig.setMappedFileSizeCommitLog(COMMIT_LOG_SIZE); storeConfig.setMaxIndexNum(INDEX_NUM); storeConfig.setMaxHashSlotNum(INDEX_NUM * 4); @@ -149,10 +162,4 @@ public class IntegrationTestBase { return initTopic(topic, nsAddr, clusterName, 8); } - public static void deleteFile(File file) { - if (!file.exists()) { - return; - } - UtilAll.deleteFile(file); - } } diff --git a/instrumentation/rocketmq-client-4.8/testing/src/main/java/base/dledger/DLedgerProduceAndConsumeIT.java b/instrumentation/rocketmq-client-4.8/testing/src/main/java/base/dledger/DLedgerProduceAndConsumeIT.java deleted file mode 100644 index 25f133ac86..0000000000 --- a/instrumentation/rocketmq-client-4.8/testing/src/main/java/base/dledger/DLedgerProduceAndConsumeIT.java +++ /dev/null @@ -1,39 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package base.dledger; - -import static base.IntegrationTestBase.nextPort; - -import base.BaseConf; -import base.IntegrationTestBase; -import org.apache.rocketmq.common.BrokerConfig; -import org.apache.rocketmq.store.config.MessageStoreConfig; - -public class DLedgerProduceAndConsumeIT { - - public BrokerConfig buildBrokerConfig(String cluster, String brokerName) { - BrokerConfig brokerConfig = new BrokerConfig(); - brokerConfig.setBrokerClusterName(cluster); - brokerConfig.setBrokerName(brokerName); - brokerConfig.setBrokerIP1("127.0.0.1"); - brokerConfig.setNamesrvAddr(BaseConf.nsAddr); - return brokerConfig; - } - - public MessageStoreConfig buildStoreConfig(String brokerName, String peers, String selfId) { - MessageStoreConfig storeConfig = new MessageStoreConfig(); - String baseDir = IntegrationTestBase.createBaseDir(); - storeConfig.setStorePathRootDir(baseDir); - storeConfig.setStorePathCommitLog(baseDir + "_" + "commitlog"); - storeConfig.setHaListenPort(nextPort()); - storeConfig.setMappedFileSizeCommitLog(10 * 1024 * 1024); - storeConfig.setEnableDLegerCommitLog(true); - storeConfig.setdLegerGroup(brokerName); - storeConfig.setdLegerSelfId(selfId); - storeConfig.setdLegerPeers(peers); - return storeConfig; - } -}