Use hooks to register in the iavaagent instrumentation

This commit is contained in:
addname 2021-02-28 18:10:26 +08:00
parent 66617dc9bf
commit 4094f4a925
25 changed files with 434 additions and 762 deletions

View File

@ -5,8 +5,10 @@ muzzle {
group = "org.apache.rocketmq"
module = 'rocketmq-client'
versions = "[4.8.0,)"
assertInverse = true
}
}
dependencies {
library group: 'org.apache.rocketmq', name: 'rocketmq-client', version: '4.8.0'
implementation project(':instrumentation:rocketmq-client-4.8:library')
@ -14,3 +16,6 @@ dependencies {
}
tasks.withType(Test) {
jvmArgs "-Dotel.instrumentation.rocketmq.client.experimental-span-attributes=true"
}

View File

@ -1,104 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.rocketmq;
import static io.opentelemetry.instrumentation.rocketmq.RocketMqProducerTracer.tracer;
import static io.opentelemetry.instrumentation.rocketmq.TextMapInjectAdapter.SETTER;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.rocketmq.SendCallbackWrapper;
import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge;
import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
public class RocketMqClientApiImplInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.apache.rocketmq.client.impl.MQClientAPIImpl");
}
@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
return singletonMap(
isMethod().and(named("sendMessage")).and(takesArguments(12)),
RocketMqClientApiImplInstrumentation.class.getName() + "$SendMessageAdvice");
}
public static class SendMessageAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(value = 0, readOnly = false) String addr,
@Advice.Argument(value = 2, readOnly = false) Message msg,
@Advice.Argument(value = 3, readOnly = false) SendMessageRequestHeader requestHeader,
@Advice.Argument(value = 6, readOnly = false) SendCallback sendCallback,
@Advice.Local("otelSpan") Span span,
@Advice.Local("otelScope") Scope scope) {
Context parent = Java8BytecodeBridge.currentContext();
span = tracer().startProducerSpan(addr, msg);
Context newContext = parent.with(span);
try {
Java8BytecodeBridge.getGlobalPropagators()
.getTextMapPropagator()
.inject(newContext, requestHeader, SETTER);
} catch (IllegalStateException e) {
requestHeader = new SendMessageRequestHeader();
requestHeader.getBornTimestamp();
requestHeader.getDefaultTopic();
requestHeader.getDefaultTopicQueueNums();
requestHeader.getFlag();
requestHeader.getProducerGroup();
requestHeader.getMaxReconsumeTimes();
requestHeader.getProperties();
requestHeader.getSysFlag();
requestHeader.getTopic();
requestHeader.getQueueId();
requestHeader.getReconsumeTimes();
Java8BytecodeBridge.getGlobalPropagators()
.getTextMapPropagator()
.inject(newContext, requestHeader, SETTER);
}
scope = newContext.makeCurrent();
if (sendCallback != null) {
sendCallback = new SendCallbackWrapper(sendCallback, span);
}
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.Thrown Throwable throwable,
@Advice.Argument(value = 6, readOnly = false) SendCallback sendCallback,
@Advice.Local("otelSpan") Span span,
@Advice.Local("otelScope") Scope scope) {
if (scope == null) {
return;
}
scope.close();
if (sendCallback == null) {
if (throwable == null) {
tracer().end(span);
} else {
tracer().endExceptionally(span, throwable);
}
}
}
}
}

View File

@ -1,70 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.rocketmq;
import static io.opentelemetry.instrumentation.rocketmq.RocketMqConsumerTracer.tracer;
import static io.opentelemetry.javaagent.tooling.bytebuddy.matcher.AgentElementMatchers.implementsInterface;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
import java.util.List;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.common.message.MessageExt;
public class RocketMqConcurrentlyConsumeInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return implementsInterface(
named("org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently"));
}
@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
return singletonMap(
isMethod().and(named("consumeMessage")),
RocketMqConcurrentlyConsumeInstrumentation.class.getName() + "$ConcurrentlyConsumeAdvice");
}
public static class ConcurrentlyConsumeAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(0) List<MessageExt> msgs,
@Advice.Local("otelSpan") Span span,
@Advice.Local("otelScope") Scope scope) {
span = tracer().startSpan(msgs);
scope = span.makeCurrent();
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.Return ConsumeConcurrentlyStatus status,
@Advice.Thrown Throwable throwable,
@Advice.Local("otelSpan") Span span,
@Advice.Local("otelScope") Scope scope) {
if (scope == null) {
return;
}
tracer().endConcurrentlySpan(span, status);
scope.close();
if (throwable == null) {
tracer().end(span);
} else {
tracer().endExceptionally(span, throwable);
}
}
}
}

View File

@ -0,0 +1,50 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.rocketmq;
import io.opentelemetry.instrumentation.rocketmq.TracingConsumeMessageHookImpl;
import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl;
import java.util.Map;
import static io.opentelemetry.javaagent.tooling.bytebuddy.matcher.ClassLoaderMatcher.hasClassesNamed;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
public class RocketMqConsumerInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<ClassLoader> classLoaderOptimization() {
return hasClassesNamed("org.apache.rocketmq.client.consumer.DefaultMQPushConsumer");
}
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.apache.rocketmq.client.consumer.DefaultMQPushConsumer");
}
@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
return singletonMap(
isMethod().and(named("start")).and(takesArguments(0)),
RocketMqConsumerInstrumentation.class.getName() + "$AdviceStart");
}
public static class AdviceStart {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.FieldValue(value = "defaultMQPushConsumerImpl",declaringType = DefaultMQPushConsumer.class) DefaultMQPushConsumerImpl defaultMQPushConsumerImpl){
defaultMQPushConsumerImpl.registerConsumeMessageHook(new TracingConsumeMessageHookImpl());
}
}
}

View File

@ -15,14 +15,14 @@ import java.util.List;
@AutoService(InstrumentationModule.class)
public class RocketMqInstrumentationModule extends InstrumentationModule {
public RocketMqInstrumentationModule() {
super("rocketmq", "rockemq-client-4.3");
super("rocketmq-client", "rocketmq-client-4.8");
}
@Override
public List<TypeInstrumentation> typeInstrumentations() {
return asList(
new RocketMqClientApiImplInstrumentation(),
new RocketMqConcurrentlyConsumeInstrumentation(),
new RocketMqOrderlyConsumeInstrumentation());
new RocketMqProducerInstrumentation(),
new RocketMqConsumerInstrumentation()
);
}
}

View File

@ -1,70 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.rocketmq;
import static io.opentelemetry.instrumentation.rocketmq.RocketMqConsumerTracer.tracer;
import static io.opentelemetry.javaagent.tooling.bytebuddy.matcher.AgentElementMatchers.implementsInterface;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.nameStartsWith;
import static net.bytebuddy.matcher.ElementMatchers.named;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
import java.util.List;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.common.message.MessageExt;
public class RocketMqOrderlyConsumeInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return implementsInterface(
named("org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly"));
}
@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
return singletonMap(
nameStartsWith("consumeMessage"),
RocketMqOrderlyConsumeInstrumentation.class.getName() + "$OrderlyConsumeAdvice");
}
public static class OrderlyConsumeAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.Argument(0) List<MessageExt> msgs,
@Advice.Local("otelSpan") Span span,
@Advice.Local("otelScope") Scope scope) {
span = tracer().startSpan(msgs);
scope = span.makeCurrent();
}
@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
public static void stopSpan(
@Advice.Return ConsumeOrderlyStatus status,
@Advice.Thrown Throwable throwable,
@Advice.Local("otelSpan") Span span,
@Advice.Local("otelScope") Scope scope) {
if (scope == null) {
return;
}
tracer().endOrderlySpan(span, status);
scope.close();
if (throwable == null) {
tracer().end(span);
} else {
tracer().endExceptionally(span, throwable);
}
}
}
}

View File

@ -0,0 +1,50 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.javaagent.instrumentation.rocketmq;
import static io.opentelemetry.javaagent.tooling.bytebuddy.matcher.ClassLoaderMatcher.hasClassesNamed;
import static java.util.Collections.singletonMap;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import io.opentelemetry.instrumentation.rocketmq.TracingSendMessageHookImpl;
import io.opentelemetry.javaagent.tooling.TypeInstrumentation;
import java.util.Map;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.method.MethodDescription;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;
import org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
public class RocketMqProducerInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<ClassLoader> classLoaderOptimization() {
return hasClassesNamed("org.apache.rocketmq.client.producer.DefaultMQProducer");
}
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("org.apache.rocketmq.client.producer.DefaultMQProducer");
}
@Override
public Map<? extends ElementMatcher<? super MethodDescription>, String> transformers() {
return singletonMap(
isMethod().and(named("start")).and(takesArguments(0)),
RocketMqProducerInstrumentation.class.getName() + "$AdviceStart");
}
public static class AdviceStart {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.FieldValue(value = "defaultMQProducerImpl",declaringType = DefaultMQProducer.class) DefaultMQProducerImpl defaultMQProducerImpl){
defaultMQProducerImpl.registerSendMessageHook(new TracingSendMessageHookImpl());
}
}
}

View File

@ -10,5 +10,4 @@ import io.opentelemetry.instrumentation.test.AgentTestTrait
class RocketMqClientTest extends AbstractRocketMqClientTest implements AgentTestTrait {
}

View File

@ -1,8 +1,11 @@
apply from: "$rootDir/gradle/instrumentation-library.gradle"
dependencies {
library group: 'org.apache.rocketmq', name: 'rocketmq-client', version: '4.8.0'
testImplementation project(':instrumentation:rocketmq-client-4.8:testing')
}
tasks.withType(Test) {
jvmArgs "-Dotel.instrumentation.rocketmq.client.experimental-span-attributes=true"
}

View File

@ -14,5 +14,9 @@ public final class RocketMqClientConfig {
.getBooleanProperty("otel.instrumentation.rocketmq.client-propagation", true);
}
public static final boolean CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES =
Config.get()
.getBooleanProperty("otel.instrumentation.rocketmq.client.experimental-span-attributes", false);
private RocketMqClientConfig() {}
}

View File

@ -9,12 +9,11 @@ import static io.opentelemetry.api.trace.SpanKind.CONSUMER;
import static io.opentelemetry.instrumentation.rocketmq.TextMapExtractAdapter.GETTER;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.tracer.BaseTracer;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.util.List;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.common.message.MessageExt;
public class RocketMqConsumerTracer extends BaseTracer {
@ -30,22 +29,46 @@ public class RocketMqConsumerTracer extends BaseTracer {
return "io.opentelemetry.javaagent.rocketmq-client";
}
public Span startSpan(List<MessageExt> msgs) {
public Context startSpan(Context parentContext, List<MessageExt> msgs) {
MessageExt msg = msgs.get(0);
Span span =
tracer
.spanBuilder(spanNameOnConsume(msg))
.setSpanKind(CONSUMER)
.setParent(extractParent(msg))
.setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq")
.setAttribute(SemanticAttributes.MESSAGING_DESTINATION, msg.getTopic())
.setAttribute(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic")
.setAttribute(SemanticAttributes.MESSAGING_OPERATION, "process")
.setAttribute(
SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, getStoreSize(msgs))
.startSpan();
onConsume(span, msg);
return span;
if (msgs.size() == 1) {
SpanBuilder spanBuilder = startSpanBuilder(msg)
.setParent(extractParent(msg));
return withClientSpan(parentContext, spanBuilder.startSpan());
} else {
SpanBuilder spanBuilder =
tracer
.spanBuilder(msg.getTopic() + " receive")
.setSpanKind(CONSUMER)
.setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq")
.setAttribute(SemanticAttributes.MESSAGING_OPERATION, "receive");
Context rootContext = withClientSpan(parentContext, spanBuilder.startSpan());
for (MessageExt message : msgs) {
createChildSpan(rootContext, message);
}
return rootContext;
}
}
public void createChildSpan(Context parentContext, MessageExt msg) {
SpanBuilder childSpanBuilder = startSpanBuilder(msg)
.setParent(parentContext)
.addLink(Span.fromContext(extractParent(msg)).getSpanContext());
end(withClientSpan(parentContext, childSpanBuilder.startSpan()));
}
public SpanBuilder startSpanBuilder(MessageExt msg) {
SpanBuilder spanBuilder = tracer.spanBuilder(spanNameOnConsume(msg))
.setSpanKind(CONSUMER)
.setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq")
.setAttribute(SemanticAttributes.MESSAGING_DESTINATION, msg.getTopic())
.setAttribute(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic")
.setAttribute(SemanticAttributes.MESSAGING_OPERATION, "process")
.setAttribute(SemanticAttributes.MESSAGING_MESSAGE_ID, msg.getMsgId())
.setAttribute(
SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) msg.getBody().length);
onConsume(spanBuilder, msg);
return spanBuilder;
}
private Context extractParent(MessageExt msg) {
@ -56,16 +79,13 @@ public class RocketMqConsumerTracer extends BaseTracer {
}
}
void onConsume(Span span, MessageExt msg) {
span.setAttribute("messaging.rocketmq.tags", msg.getTags());
span.setAttribute("messaging.rocketmq.queue_id", msg.getQueueId());
span.setAttribute("messaging.rocketmq.queue_offset", msg.getQueueOffset());
span.setAttribute("messaging.rocketmq.broker_address", getBrokerHost(msg));
}
long getStoreSize(List<MessageExt> msgs) {
long storeSize = msgs.stream().mapToInt(item -> item.getStoreSize()).sum();
return storeSize;
void onConsume(SpanBuilder spanBuilder, MessageExt msg) {
if (RocketMqClientConfig.CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) {
spanBuilder.setAttribute("messaging.rocketmq.tags", msg.getTags());
spanBuilder.setAttribute("messaging.rocketmq.queue_id", msg.getQueueId());
spanBuilder.setAttribute("messaging.rocketmq.queue_offset", msg.getQueueOffset());
spanBuilder.setAttribute("messaging.rocketmq.broker_address", getBrokerHost(msg));
}
}
String spanNameOnConsume(MessageExt msg) {
@ -79,12 +99,4 @@ public class RocketMqConsumerTracer extends BaseTracer {
return null;
}
}
public void endConcurrentlySpan(Span span, ConsumeConcurrentlyStatus status) {
span.setAttribute("messaging.rocketmq.consume_concurrently_status", status.name());
}
public void endOrderlySpan(Span span, ConsumeOrderlyStatus status) {
span.setAttribute("messaging.rocketmq.consume_orderly_status", status.name());
}
}

View File

@ -9,6 +9,7 @@ import static io.opentelemetry.api.trace.SpanKind.PRODUCER;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.tracer.BaseTracer;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import org.apache.rocketmq.client.producer.SendResult;
@ -27,25 +28,38 @@ public class RocketMqProducerTracer extends BaseTracer {
return "io.opentelemetry.javaagent.rocketmq-client";
}
public Span startProducerSpan(String addr, Message msg) {
SpanBuilder span = spanBuilder(spanNameOnProduce(msg), PRODUCER);
onProduce(span, msg, addr);
return span.startSpan();
public Context startProducerSpan(String addr, Message msg, Context parentContext) {
SpanBuilder spanBuilder = spanBuilder(spanNameOnProduce(msg), PRODUCER);
onProduce(spanBuilder, msg, addr);
return withClientSpan(parentContext, spanBuilder.startSpan());
}
public void onCallback(Span span, SendResult sendResult) {
span.setAttribute("messaging.rocketmq.callback_result", sendResult.getSendStatus().name());
public void onCallback(Context context, SendResult sendResult) {
if (RocketMqClientConfig.CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) {
Span span = Span.fromContext(context);
span.setAttribute("messaging.rocketmq.callback_result", sendResult.getSendStatus().name());
}
}
public void onProduce(SpanBuilder span, Message msg, String addr) {
span.setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq");
span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic");
span.setAttribute(SemanticAttributes.MESSAGING_DESTINATION, msg.getTopic());
span.setAttribute("messaging.rocketmq.tags", msg.getTags());
span.setAttribute("messaging.rocketmq.broker_address", addr);
private void onProduce(SpanBuilder spanBuilder, Message msg, String addr) {
spanBuilder.setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq");
spanBuilder.setAttribute(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic");
spanBuilder.setAttribute(SemanticAttributes.MESSAGING_DESTINATION, msg.getTopic());
if (RocketMqClientConfig.CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) {
spanBuilder.setAttribute("messaging.rocketmq.tags", msg.getTags());
spanBuilder.setAttribute("messaging.rocketmq.broker_address", addr);
}
}
public String spanNameOnProduce(Message msg) {
public void afterProduce(Context context, SendResult sendResult) {
Span span = Span.fromContext(context);
span.setAttribute(SemanticAttributes.MESSAGING_MESSAGE_ID, sendResult.getMsgId());
if (RocketMqClientConfig.CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) {
span.setAttribute("messaging.rocketmq.send_result", sendResult.getSendStatus().name());
}
}
private String spanNameOnProduce(Message msg) {
return msg.getTopic() + " send";
}
}

View File

@ -1,35 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.rocketmq;
import static io.opentelemetry.instrumentation.rocketmq.RocketMqProducerTracer.tracer;
import io.opentelemetry.api.trace.Span;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
public class SendCallbackWrapper implements SendCallback {
private final SendCallback sendCallback;
private final Span span;
public SendCallbackWrapper(SendCallback sendCallback, Span span) {
this.sendCallback = sendCallback;
this.span = span;
}
@Override
public void onSuccess(SendResult sendResult) {
tracer().onCallback(span, sendResult);
tracer().end(span);
sendCallback.onSuccess(sendResult);
}
@Override
public void onException(Throwable e) {
tracer().endExceptionally(span, e);
sendCallback.onException(e);
}
}

View File

@ -5,10 +5,10 @@
package io.opentelemetry.instrumentation.rocketmq;
import io.opentelemetry.context.propagation.TextMapPropagator;
import io.opentelemetry.context.propagation.TextMapGetter;
import java.util.Map;
public class TextMapExtractAdapter implements TextMapPropagator.Getter<Map<String, String>> {
public class TextMapExtractAdapter implements TextMapGetter<Map<String, String>> {
public static final TextMapExtractAdapter GETTER = new TextMapExtractAdapter();
@ -19,7 +19,6 @@ public class TextMapExtractAdapter implements TextMapPropagator.Getter<Map<Strin
@Override
public String get(Map<String, String> carrier, String key) {
String obj = carrier.get(key);
return obj;
return carrier.get(key);
}
}

View File

@ -5,20 +5,15 @@
package io.opentelemetry.instrumentation.rocketmq;
import io.opentelemetry.context.propagation.TextMapPropagator;
import org.apache.rocketmq.common.protocol.header.SendMessageRequestHeader;
import io.opentelemetry.context.propagation.TextMapSetter;
import java.util.Map;
public class TextMapInjectAdapter implements TextMapPropagator.Setter<SendMessageRequestHeader> {
public class TextMapInjectAdapter implements TextMapSetter<Map<String, String>> {
public static final TextMapInjectAdapter SETTER = new TextMapInjectAdapter();
@Override
public void set(SendMessageRequestHeader header, String key, String value) {
StringBuilder properties = new StringBuilder(header.getProperties());
properties.append(key);
properties.append('\u0001');
properties.append(value);
properties.append('\u0002');
header.setProperties(properties.toString());
public void set(Map<String, String> carrier, String key, String value) {
carrier.put(key,value);
}
}

View File

@ -0,0 +1,29 @@
package io.opentelemetry.instrumentation.rocketmq;
import io.opentelemetry.context.Context;
import org.apache.rocketmq.client.hook.ConsumeMessageContext;
import org.apache.rocketmq.client.hook.ConsumeMessageHook;
import static io.opentelemetry.instrumentation.rocketmq.RocketMqConsumerTracer.tracer;
public class TracingConsumeMessageHookImpl implements ConsumeMessageHook {
@Override
public String hookName() {
return "OpenTelemetryConsumeMessageTraceHook";
}
@Override
public void consumeMessageBefore(ConsumeMessageContext context) {
}
@Override
public void consumeMessageAfter(ConsumeMessageContext context) {
if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
return;
}
Context traceContext =tracer().startSpan(Context.current(),context.getMsgList());
tracer().end(traceContext);
}
}

View File

@ -1,34 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.rocketmq;
import io.opentelemetry.api.trace.Span;
import java.util.List;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
public class TracingMessageInterceptor {
public void producerIntercept(String addr, Message msg) {
Span span = RocketMqProducerTracer.tracer().startProducerSpan(addr, msg);
RocketMqProducerTracer.tracer().end(span);
}
public void consumerConcurrentlyIntercept(List<MessageExt> msgs) {
Span span = RocketMqConsumerTracer.tracer().startSpan(msgs);
RocketMqConsumerTracer.tracer()
.endConcurrentlySpan(span, ConsumeConcurrentlyStatus.CONSUME_SUCCESS);
RocketMqConsumerTracer.tracer().tracer().end(span);
}
public void consumerOrderlyIntercept(List<MessageExt> msgs) {
Span span = RocketMqConsumerTracer.tracer().startSpan(msgs);
RocketMqConsumerTracer.tracer().endOrderlySpan(span, ConsumeOrderlyStatus.SUCCESS);
RocketMqConsumerTracer.tracer().tracer().end(span);
}
}

View File

@ -0,0 +1,40 @@
package io.opentelemetry.instrumentation.rocketmq;
import static io.opentelemetry.instrumentation.rocketmq.RocketMqProducerTracer.tracer;
import static io.opentelemetry.instrumentation.rocketmq.TextMapInjectAdapter.SETTER;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.context.Context;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.hook.SendMessageHook;
public class TracingSendMessageHookImpl implements SendMessageHook {
@Override
public String hookName() {
return "OpenTelemetrySendMessageTraceHook";
}
@Override
public void sendMessageBefore(SendMessageContext context) {
if (context == null) {
return;
}
Context traceContext = tracer().startProducerSpan(context.getBrokerAddr(), context.getMessage(), Context.current());
if (RocketMqClientConfig.isPropagationEnabled()){
GlobalOpenTelemetry.getPropagators()
.getTextMapPropagator()
.inject(traceContext, context.getMessage().getProperties(), SETTER);
}
context.setMqTraceContext(traceContext);
}
@Override
public void sendMessageAfter(SendMessageContext context) {
if (context == null || context.getMqTraceContext() == null || context.getSendResult() == null) {
return;
}
tracer().afterProduce((Context)context.getMqTraceContext(), context.getSendResult());
tracer().end((Context)context.getMqTraceContext());
}
}

View File

@ -4,25 +4,90 @@
*/
package io.opentelemetry.instrumentation.rocketmq
import io.opentelemetery.instrumentation.rocketmq.AbstractRocketMqClientLibraryTest
import base.BaseConf
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
import io.opentelemetry.instrumentation.test.LibraryTestTrait
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
import org.apache.rocketmq.client.producer.DefaultMQProducer
import org.apache.rocketmq.common.message.Message
import org.apache.rocketmq.remoting.common.RemotingHelper
import org.apache.rocketmq.test.listener.rmq.order.RMQOrderListener
import spock.lang.Shared
class RocketMqClientTest extends AbstractRocketMqClientLibraryTest implements LibraryTestTrait {
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
import static io.opentelemetry.api.trace.SpanKind.PRODUCER
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.basicSpan
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace
@Override
void producerIntercept(String addr,Message msg) {
TracingMessageInterceptor tracingProducerInterceptor= new TracingMessageInterceptor()
tracingProducerInterceptor.producerIntercept(addr,msg)
class RocketMqClientTest extends InstrumentationSpecification implements LibraryTestTrait {
@Shared
DefaultMQPushConsumer consumer
@Shared
DefaultMQProducer producer
@Shared
String sharedTopic
@Shared
String brokerAddr
@Shared
Message msg
def setup() {
sharedTopic = BaseConf.initTopic()
brokerAddr = BaseConf.getBrokerAddr()
msg = new Message(sharedTopic, "TagA", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET))
}
@Override
void consumerIntercept(List<Object> msg, String type) {
TracingMessageInterceptor tracingProducerInterceptor= new TracingMessageInterceptor()
if("concurrent".equals(type)){
tracingProducerInterceptor.consumerConcurrentlyIntercept(msg)
} else {
tracingProducerInterceptor.consumerOrderlyIntercept(msg)
def "test rocketmq produce and consume"() {
setup:
producer = BaseConf.getRMQProducer(BaseConf.nsAddr)
producer.getDefaultMQProducerImpl().registerSendMessageHook(new TracingSendMessageHookImpl())
consumer = BaseConf.getConsumer(BaseConf.nsAddr, sharedTopic, "*", new RMQOrderListener())
consumer.getDefaultMQPushConsumerImpl().registerConsumeMessageHook(new TracingConsumeMessageHookImpl())
when:
runUnderTrace("parent") {
producer.send(msg)
}
then:
assertTraces(1) {
trace(0, 3) {
basicSpan(it, 0, "parent")
span(1) {
name sharedTopic + " send"
kind PRODUCER
attributes {
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "rocketmq"
"${SemanticAttributes.MESSAGING_DESTINATION.key}" sharedTopic
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
"${SemanticAttributes.MESSAGING_MESSAGE_ID.key}" String
}
}
span(2) {
name sharedTopic + " process"
kind CONSUMER
attributes {
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "rocketmq"
"${SemanticAttributes.MESSAGING_DESTINATION.key}" sharedTopic
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
"${SemanticAttributes.MESSAGING_MESSAGE_ID.key}" String
}
}
}
cleanup:
producer.shutdown()
consumer.shutdown()
BaseConf.deleteTempDir()
}
}
}

View File

@ -3,8 +3,6 @@ apply from: "$rootDir/gradle/java.gradle"
dependencies {
api project(':testing-common')
api group: 'org.apache.rocketmq', name: 'rocketmq-test', version: '4.8.0'
api group: 'io.openmessaging', name: 'openmessaging-api', version: '0.3.1-alpha'
api group: 'org.apache.rocketmq', name: 'rocketmq-openmessaging', version: '4.8.0'
implementation deps.guava
implementation deps.groovy

View File

@ -1,161 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetery.instrumentation.rocketmq
import base.BaseConf
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import org.apache.rocketmq.common.message.Message
import org.apache.rocketmq.remoting.common.RemotingHelper
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener
import org.apache.rocketmq.test.listener.rmq.order.RMQOrderListener
import spock.lang.Shared
import spock.lang.Unroll
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
import static io.opentelemetry.api.trace.SpanKind.PRODUCER
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.basicSpan
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace
@Unroll
abstract class AbstractRocketMqClientLibraryTest extends InstrumentationSpecification{
@Shared
RMQNormalConsumer consumer
@Shared
RMQNormalProducer producer
@Shared
String sharedTopic
@Shared
String brokerAddr
@Shared
Message msg
@Shared
int consumeTime = 5000
@Shared
def baseConf =new BaseConf()
abstract void producerIntercept(String addr,Message msg)
abstract void consumerIntercept(List<Object> msgs,String type)
def setup() {
sharedTopic =baseConf.initTopic()
brokerAddr =baseConf.getBrokerAddr()
msg = new Message(sharedTopic, "TagA", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET))
}
def "test rocketmq produce"() {
setup:
producer = baseConf.getProducer(baseConf.nsAddr, sharedTopic)
when:
runUnderTrace("parent") {
producerIntercept(brokerAddr,msg)
producer.send(msg)
}
then:
assertTraces(1) {
trace(0, 2) {
basicSpan(it, 0, "parent")
span(1) {
name sharedTopic + " send"
kind PRODUCER
attributes {
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "rocketmq"
"${SemanticAttributes.MESSAGING_DESTINATION.key}" sharedTopic
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
"messaging.rocketmq.tags" "TagA"
"messaging.rocketmq.broker_address" brokerAddr
}
}
}
cleanup:
producer.shutdown()
}
}
def "test rocketmq concurrently consume"() {
setup:
producer = baseConf.getProducer(baseConf.nsAddr, sharedTopic)
consumer = baseConf.getConsumer(baseConf.nsAddr, sharedTopic, "*", new RMQNormalListener())
when:
producer.send(msg)
consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime)
consumerIntercept(consumer.getListener().getAllOriginMsg(),"concurrent")
then:
assertTraces(1) {
trace(0, 1) {
span(0) {
name sharedTopic + " process"
kind CONSUMER
attributes {
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "rocketmq"
"${SemanticAttributes.MESSAGING_DESTINATION.key}" sharedTopic
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
"messaging.rocketmq.tags" "TagA"
"messaging.rocketmq.broker_address" brokerAddr
"messaging.rocketmq.consume_concurrently_status" "CONSUME_SUCCESS"
"messaging.rocketmq.queue_id" Long
"messaging.rocketmq.queue_offset" Long
}
}
}
cleanup:
producer.shutdown()
consumer.shutdown()
}
}
def "test rocketmq orderly consume"() {
setup:
producer = baseConf.getProducer(baseConf.nsAddr, sharedTopic)
consumer = baseConf.getConsumer(baseConf.nsAddr, sharedTopic, "*", new RMQOrderListener())
when:
producer.send(msg)
consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime)
consumerIntercept(consumer.getListener().getAllOriginMsg(),"order")
then:
assertTraces(1) {
trace(0, 1) {
span(0) {
name sharedTopic + " process"
kind CONSUMER
attributes {
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "rocketmq"
"${SemanticAttributes.MESSAGING_DESTINATION.key}" sharedTopic
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
"messaging.rocketmq.tags" "TagA"
"messaging.rocketmq.broker_address" brokerAddr
"messaging.rocketmq.consume_orderly_status" "SUCCESS"
"messaging.rocketmq.queue_id" Long
"messaging.rocketmq.queue_offset" Long
}
}
}
cleanup:
producer.shutdown()
consumer.shutdown()
}
}
}

View File

@ -8,14 +8,12 @@ package io.opentelemetery.instrumentation.rocketmq
import base.BaseConf
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer
import org.apache.rocketmq.client.producer.DefaultMQProducer
import org.apache.rocketmq.client.producer.SendCallback
import org.apache.rocketmq.client.producer.SendResult
import org.apache.rocketmq.common.message.Message
import org.apache.rocketmq.remoting.common.RemotingHelper
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer
import org.apache.rocketmq.test.factory.ProducerFactory
import org.apache.rocketmq.test.listener.rmq.concurrent.RMQNormalListener
import org.apache.rocketmq.test.listener.rmq.order.RMQOrderListener
import spock.lang.Shared
@ -29,41 +27,32 @@ import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTra
abstract class AbstractRocketMqClientTest extends InstrumentationSpecification{
@Shared
private RMQNormalConsumer consumer
DefaultMQPushConsumer consumer
@Shared
private RMQNormalProducer producer
DefaultMQProducer producer
@Shared
DefaultMQProducer defaultMQProducer
String sharedTopic
@Shared
private String sharedTopic
@Shared
private String brokerAddr
String brokerAddr
@Shared
Message msg
@Shared
int consumeTime = 1000
@Shared
BaseConf baseConf =new BaseConf()
def setup() {
sharedTopic =baseConf.initTopic()
brokerAddr =baseConf.getBrokerAddr()
sharedTopic = BaseConf.initTopic()
brokerAddr = BaseConf.getBrokerAddr()
msg = new Message(sharedTopic, "TagA", ("Hello RocketMQ").getBytes(RemotingHelper.DEFAULT_CHARSET))
}
def "test rocketmq produce callback1"() {
def "test rocketmq produce callback"() {
setup:
defaultMQProducer = ProducerFactory.getRMQProducer(baseConf.nsAddr)
producer = BaseConf.getRMQProducer(BaseConf.nsAddr)
when:
runUnderTrace("parent") {
defaultMQProducer.send(msg, new SendCallback() {
producer.send(msg, new SendCallback() {
@Override
void onSuccess(SendResult sendResult) {
}
@ -85,23 +74,26 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification{
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "rocketmq"
"${SemanticAttributes.MESSAGING_DESTINATION.key}" sharedTopic
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
"${SemanticAttributes.MESSAGING_MESSAGE_ID.key}" String
"messaging.rocketmq.tags" "TagA"
"messaging.rocketmq.broker_address" brokerAddr
"messaging.rocketmq.callback_result" "SEND_OK"
"messaging.rocketmq.send_result" "SEND_OK"
}
}
}
cleanup:
BaseConf.deleteTempDir()
}
}
def "test rocketmq produce and concurrently consume"() {
setup:
producer = baseConf.getProducer(baseConf.nsAddr, sharedTopic)
consumer = baseConf.getConsumer(baseConf.nsAddr, sharedTopic, "*", new RMQNormalListener())
producer = BaseConf.getRMQProducer(BaseConf.nsAddr)
consumer = BaseConf.getConsumer(BaseConf.nsAddr, sharedTopic, "*", new RMQNormalListener())
when:
runUnderTrace("parent") {
producer.send(msg)
consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime)
}
then:
assertTraces(1) {
@ -114,8 +106,10 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification{
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "rocketmq"
"${SemanticAttributes.MESSAGING_DESTINATION.key}" sharedTopic
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
"${SemanticAttributes.MESSAGING_MESSAGE_ID.key}" String
"messaging.rocketmq.tags" "TagA"
"messaging.rocketmq.broker_address" brokerAddr
"messaging.rocketmq.send_result" "SEND_OK"
}
}
span(2) {
@ -127,9 +121,9 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification{
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
"${SemanticAttributes.MESSAGING_MESSAGE_ID.key}" String
"messaging.rocketmq.tags" "TagA"
"messaging.rocketmq.broker_address" brokerAddr
"messaging.rocketmq.consume_concurrently_status" "CONSUME_SUCCESS"
"messaging.rocketmq.queue_id" Long
"messaging.rocketmq.queue_offset" Long
@ -139,19 +133,18 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification{
cleanup:
producer.shutdown()
consumer.shutdown()
BaseConf.deleteTempDir()
}
}
def "test rocketmq produce and orderly consume"() {
setup:
producer = baseConf.getProducer(baseConf.nsAddr, sharedTopic)
consumer = baseConf.getConsumer(baseConf.nsAddr, sharedTopic, "*", new RMQOrderListener())
producer = BaseConf.getRMQProducer(BaseConf.nsAddr)
consumer = BaseConf.getConsumer(BaseConf.nsAddr, sharedTopic, "*", new RMQOrderListener())
when:
runUnderTrace("parent") {
producer.send(msg)
consumer.getListener().waitForMessageConsume(producer.getAllMsgBody(), consumeTime)
}
then:
assertTraces(1) {
@ -164,8 +157,10 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification{
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "rocketmq"
"${SemanticAttributes.MESSAGING_DESTINATION.key}" sharedTopic
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
"${SemanticAttributes.MESSAGING_MESSAGE_ID.key}" String
"messaging.rocketmq.tags" "TagA"
"messaging.rocketmq.broker_address" brokerAddr
"messaging.rocketmq.send_result" "SEND_OK"
}
}
span(2) {
@ -177,19 +172,18 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification{
"${SemanticAttributes.MESSAGING_DESTINATION_KIND.key}" "topic"
"${SemanticAttributes.MESSAGING_OPERATION.key}" "process"
"${SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES.key}" Long
"${SemanticAttributes.MESSAGING_MESSAGE_ID.key}" String
"messaging.rocketmq.tags" "TagA"
"messaging.rocketmq.broker_address" brokerAddr
"messaging.rocketmq.consume_orderly_status" "SUCCESS"
"messaging.rocketmq.queue_id" Long
"messaging.rocketmq.queue_offset" Long
}
}
}
cleanup:
producer.shutdown()
consumer.shutdown()
BaseConf.deleteTempDir()
}
}
}

View File

@ -7,24 +7,21 @@ package base;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import org.apache.log4j.Logger;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.test.client.rmq.RMQAsyncSendProducer;
import org.apache.rocketmq.test.client.rmq.RMQNormalConsumer;
import org.apache.rocketmq.test.client.rmq.RMQNormalProducer;
import org.apache.rocketmq.test.client.rmq.RMQTransactionalProducer;
import org.apache.rocketmq.test.clientinterface.AbstractMQConsumer;
import org.apache.rocketmq.test.clientinterface.AbstractMQProducer;
import org.apache.rocketmq.test.factory.ConsumerFactory;
import org.apache.rocketmq.test.listener.AbstractListener;
import org.apache.rocketmq.test.util.MQAdmin;
import org.apache.rocketmq.test.util.MQRandomUtils;
import org.apache.rocketmq.test.util.RandomUtil;
public class BaseConf {
final class BaseConf {
public static final String nsAddr;
public static final String broker1Addr;
protected static String broker1Name;
@ -48,12 +45,13 @@ public class BaseConf {
brokerNum = 2;
}
public BaseConf() {}
private BaseConf() {}
public static String initTopic() {
String topic = MQRandomUtils.getRandomTopic();
IntegrationTestBase.initTopic(topic, nsAddr, clusterName);
if(!IntegrationTestBase.initTopic(topic, nsAddr, clusterName)){
log.error("Topic init failed");
}
return topic;
}
@ -71,105 +69,28 @@ public class BaseConf {
return group;
}
public static RMQNormalProducer getProducer(String nsAddr, String topic) {
return getProducer(nsAddr, topic, false);
}
public static RMQNormalProducer getProducer(String nsAddr, String topic, boolean useTls) {
RMQNormalProducer producer = new RMQNormalProducer(nsAddr, topic, useTls);
if (debug) {
producer.setDebug();
}
mqClients.add(producer);
return producer;
}
public static RMQNormalProducer getProducer(
String nsAddr, String topic, String producerGoup, String instanceName) {
RMQNormalProducer producer = new RMQNormalProducer(nsAddr, topic, producerGoup, instanceName);
if (debug) {
producer.setDebug();
}
mqClients.add(producer);
return producer;
}
public static RMQTransactionalProducer getTransactionalProducer(
String nsAddr, String topic, TransactionListener transactionListener) {
RMQTransactionalProducer producer =
new RMQTransactionalProducer(nsAddr, topic, false, transactionListener);
if (debug) {
producer.setDebug();
}
mqClients.add(producer);
return producer;
}
public static RMQAsyncSendProducer getAsyncProducer(String nsAddr, String topic) {
RMQAsyncSendProducer producer = new RMQAsyncSendProducer(nsAddr, topic);
if (debug) {
producer.setDebug();
}
mqClients.add(producer);
return producer;
}
public static RMQNormalConsumer getConsumer(
String nsAddr, String topic, String subExpression, AbstractListener listener) {
return getConsumer(nsAddr, topic, subExpression, listener, false);
}
public static RMQNormalConsumer getConsumer(
String nsAddr,
String topic,
String subExpression,
AbstractListener listener,
boolean useTls) {
public static DefaultMQPushConsumer getConsumer(String nsAddr, String topic, String subExpression,
AbstractListener listener)
throws MQClientException {
String consumerGroup = initConsumerGroup();
return getConsumer(nsAddr, consumerGroup, topic, subExpression, listener, useTls);
}
public static RMQNormalConsumer getConsumer(
String nsAddr,
String consumerGroup,
String topic,
String subExpression,
AbstractListener listener) {
return getConsumer(nsAddr, consumerGroup, topic, subExpression, listener, false);
}
public static RMQNormalConsumer getConsumer(
String nsAddr,
String consumerGroup,
String topic,
String subExpression,
AbstractListener listener,
boolean useTls) {
RMQNormalConsumer consumer =
ConsumerFactory.getRMQNormalConsumer(
nsAddr, consumerGroup, topic, subExpression, listener, useTls);
if (debug) {
consumer.setDebug();
}
mqClients.add(consumer);
log.info(
String.format(
"consumer[%s] start,topic[%s],subExpression[%s]", consumerGroup, topic, subExpression));
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);
consumer.setInstanceName(RandomUtil.getStringByUUID());
consumer.setNamesrvAddr(nsAddr);
consumer.subscribe(topic, subExpression);
consumer.setMessageListener(listener);
consumer.start();
return consumer;
}
public static void shutdown() {
try {
for (Object mqClient : mqClients) {
if (mqClient instanceof AbstractMQProducer) {
((AbstractMQProducer) mqClient).shutdown();
public static DefaultMQProducer getRMQProducer(String ns) throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer(RandomUtil.getStringByUUID());
producer.setInstanceName(UUID.randomUUID().toString());
producer.setNamesrvAddr(ns);
producer.start();
return producer;
}
} else {
((AbstractMQConsumer) mqClient).shutdown();
}
}
} catch (Exception e) {
e.printStackTrace();
}
private static void deleteTempDir() {
IntegrationTestBase.deleteTempDir();
}
}

View File

@ -6,14 +6,16 @@
package base;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.namesrv.NamesrvConfig;
import org.apache.rocketmq.logging.InternalLogger;
import org.apache.rocketmq.logging.InternalLoggerFactory;
@ -29,7 +31,6 @@ public class IntegrationTestBase {
public static final InternalLogger logger =
InternalLoggerFactory.getLogger(IntegrationTestBase.class);
protected static final String SEP = File.separator;
protected static final String BROKER_NAME_PREFIX = "TestBrokerName_";
protected static final AtomicInteger BROKER_INDEX = new AtomicInteger(0);
protected static final List<File> TMPE_FILES = new ArrayList<>();
@ -46,26 +47,37 @@ public class IntegrationTestBase {
protected static final Random random = new Random();
public static String createBaseDir() {
String baseDir = System.getProperty("user.home") + SEP + "unitteststore-" + UUID.randomUUID();
final File file = new File(baseDir);
if (file.exists()) {
logger.info(
String.format(
"[%s] has already existed, please back up and remove it for integration tests",
baseDir));
System.exit(1);
private static String createTempDir() {
String path = null;
try {
File file = Files.createTempDirectory("opentelemetry-rocketmq-client-temp").toFile();
TMPE_FILES.add(file);
path= file.getCanonicalPath();
} catch (IOException e) {
e.printStackTrace();
}
return path;
}
public static void deleteTempDir() {
for(File file:TMPE_FILES){
boolean deleted = file.delete();
if (!deleted) {
file.deleteOnExit();
}
}
TMPE_FILES.add(file);
return baseDir;
}
public static NamesrvController createAndStartNamesrv() {
String baseDir = createBaseDir();
String baseDir = createTempDir();
Path kvConfigPath = Paths.get(baseDir,"namesrv","kvConfig.json");
Path namesrvPath = Paths.get(baseDir,"namesrv","namesrv.properties");
NamesrvConfig namesrvConfig = new NamesrvConfig();
NettyServerConfig nameServerNettyServerConfig = new NettyServerConfig();
namesrvConfig.setKvConfigPath(baseDir + SEP + "namesrv" + SEP + "kvConfig.json");
namesrvConfig.setConfigStorePath(baseDir + SEP + "namesrv" + SEP + "namesrv.properties");
namesrvConfig.setKvConfigPath(kvConfigPath.toString());
namesrvConfig.setConfigStorePath(namesrvPath.toString());
nameServerNettyServerConfig.setListenPort(nextPort());
NamesrvController namesrvController =
@ -76,14 +88,15 @@ public class IntegrationTestBase {
namesrvController.start();
} catch (Exception e) {
logger.info("Name Server start failed", e);
System.exit(1);
}
NAMESRV_CONTROLLERS.add(namesrvController);
return namesrvController;
}
public static BrokerController createAndStartBroker(String nsAddr) {
String baseDir = createBaseDir();
String baseDir = createTempDir();
Path commitLogPath = Paths.get(baseDir,"commitlog");
BrokerConfig brokerConfig = new BrokerConfig();
MessageStoreConfig storeConfig = new MessageStoreConfig();
brokerConfig.setBrokerName(BROKER_NAME_PREFIX + BROKER_INDEX.getAndIncrement());
@ -91,7 +104,7 @@ public class IntegrationTestBase {
brokerConfig.setNamesrvAddr(nsAddr);
brokerConfig.setEnablePropertyFilter(true);
storeConfig.setStorePathRootDir(baseDir);
storeConfig.setStorePathCommitLog(baseDir + SEP + "commitlog");
storeConfig.setStorePathCommitLog(commitLogPath.toString());
storeConfig.setMappedFileSizeCommitLog(COMMIT_LOG_SIZE);
storeConfig.setMaxIndexNum(INDEX_NUM);
storeConfig.setMaxHashSlotNum(INDEX_NUM * 4);
@ -149,10 +162,4 @@ public class IntegrationTestBase {
return initTopic(topic, nsAddr, clusterName, 8);
}
public static void deleteFile(File file) {
if (!file.exists()) {
return;
}
UtilAll.deleteFile(file);
}
}

View File

@ -1,39 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package base.dledger;
import static base.IntegrationTestBase.nextPort;
import base.BaseConf;
import base.IntegrationTestBase;
import org.apache.rocketmq.common.BrokerConfig;
import org.apache.rocketmq.store.config.MessageStoreConfig;
public class DLedgerProduceAndConsumeIT {
public BrokerConfig buildBrokerConfig(String cluster, String brokerName) {
BrokerConfig brokerConfig = new BrokerConfig();
brokerConfig.setBrokerClusterName(cluster);
brokerConfig.setBrokerName(brokerName);
brokerConfig.setBrokerIP1("127.0.0.1");
brokerConfig.setNamesrvAddr(BaseConf.nsAddr);
return brokerConfig;
}
public MessageStoreConfig buildStoreConfig(String brokerName, String peers, String selfId) {
MessageStoreConfig storeConfig = new MessageStoreConfig();
String baseDir = IntegrationTestBase.createBaseDir();
storeConfig.setStorePathRootDir(baseDir);
storeConfig.setStorePathCommitLog(baseDir + "_" + "commitlog");
storeConfig.setHaListenPort(nextPort());
storeConfig.setMappedFileSizeCommitLog(10 * 1024 * 1024);
storeConfig.setEnableDLegerCommitLog(true);
storeConfig.setdLegerGroup(brokerName);
storeConfig.setdLegerSelfId(selfId);
storeConfig.setdLegerPeers(peers);
return storeConfig;
}
}