Put RocketMQ's consumer span in current context (#3537)

* Put RocketMQ's consumer span in current context

* codenarc

* code review comments
This commit is contained in:
Mateusz Rzeszutek 2021-07-12 04:07:30 +02:00 committed by GitHub
parent 3e8d3e88bf
commit ce047325b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 108 additions and 36 deletions

View File

@ -4,6 +4,10 @@ plugins {
dependencies {
library("org.apache.rocketmq:rocketmq-client:4.8.0")
testImplementation(project(":instrumentation:rocketmq-client-4.8:testing"))
compileOnly("com.google.auto.value:auto-value-annotations")
annotationProcessor("com.google.auto.value:auto-value")
testLibrary("org.apache.rocketmq:rocketmq-test:4.8.0")
testImplementation(project(":instrumentation:rocketmq-client-4.8:testing"))
}

View File

@ -0,0 +1,26 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.rocketmq;
import com.google.auto.value.AutoValue;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
@AutoValue
abstract class ContextAndScope {
static ContextAndScope create(Context context, Scope scope) {
return new AutoValue_ContextAndScope(context, scope);
}
abstract Context getContext();
abstract Scope getScope();
void close() {
getScope().close();
}
}

View File

@ -28,7 +28,12 @@ final class TracingConsumeMessageHookImpl implements ConsumeMessageHook {
return;
}
Context otelContext = tracer.startSpan(Context.current(), context.getMsgList());
context.setMqTraceContext(otelContext);
// it's safe to store the scope in the rocketMq trace context, both before() and after() methods
// are always called from the same thread; see:
// - ConsumeMessageConcurrentlyService$ConsumeRequest#run()
// - ConsumeMessageOrderlyService$ConsumeRequest#run()
context.setMqTraceContext(ContextAndScope.create(otelContext, otelContext.makeCurrent()));
}
@Override
@ -36,9 +41,10 @@ final class TracingConsumeMessageHookImpl implements ConsumeMessageHook {
if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
return;
}
if (context.getMqTraceContext() instanceof Context) {
Context otelContext = (Context) context.getMqTraceContext();
tracer.end(otelContext);
if (context.getMqTraceContext() instanceof ContextAndScope) {
ContextAndScope contextAndScope = (ContextAndScope) context.getMqTraceContext();
contextAndScope.close();
tracer.end(contextAndScope.getContext());
}
}
}

View File

@ -5,6 +5,11 @@
package io.opentelemetry.instrumentation.rocketmq
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
import static io.opentelemetry.api.trace.SpanKind.PRODUCER
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace
import base.BaseConf
import io.opentelemetry.instrumentation.test.InstrumentationSpecification
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes
@ -14,19 +19,12 @@ import org.apache.rocketmq.client.producer.SendCallback
import org.apache.rocketmq.client.producer.SendResult
import org.apache.rocketmq.common.message.Message
import org.apache.rocketmq.remoting.common.RemotingHelper
import org.apache.rocketmq.test.listener.rmq.order.RMQOrderListener
import spock.lang.Shared
import spock.lang.Unroll
import static io.opentelemetry.api.trace.SpanKind.CONSUMER
import static io.opentelemetry.api.trace.SpanKind.PRODUCER
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.basicSpan
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runUnderTrace
@Unroll
abstract class AbstractRocketMqClientTest extends InstrumentationSpecification {
private static final int CONSUME_TIMEOUT = 30_000
@Shared
DefaultMQProducer producer
@ -34,10 +32,7 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification {
DefaultMQPushConsumer consumer
@Shared
RMQOrderListener messageListener
@Shared
def sharedTopic
String sharedTopic
@Shared
Message msg
@ -58,8 +53,7 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification {
msgs.add(msg2)
producer = BaseConf.getProducer(BaseConf.nsAddr)
configureMQProducer(producer)
messageListener = new RMQOrderListener()
consumer = BaseConf.getConsumer(BaseConf.nsAddr, sharedTopic, "*", messageListener)
consumer = BaseConf.getConsumer(BaseConf.nsAddr, sharedTopic, "*", new TracingMessageListener())
configureMQPushConsumer(consumer)
}
@ -69,10 +63,6 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification {
BaseConf.deleteTempDir()
}
def setup() {
messageListener.clearMsg()
}
def "test rocketmq produce callback"() {
when:
producer.send(msg, new SendCallback() {
@ -84,10 +74,10 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification {
void onException(Throwable throwable) {
}
})
messageListener.waitForMessageConsume(1, CONSUME_TIMEOUT)
then:
assertTraces(1) {
trace(0, 2) {
trace(0, 3) {
span(0) {
name sharedTopic + " send"
kind PRODUCER
@ -117,6 +107,11 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification {
"messaging.rocketmq.queue_offset" Long
}
}
span(2) {
name "messageListener"
kind INTERNAL
childOf span(1)
}
}
}
}
@ -126,14 +121,18 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification {
runUnderTrace("parent") {
producer.send(msg)
}
messageListener.waitForMessageConsume(1, CONSUME_TIMEOUT)
then:
assertTraces(1) {
trace(0, 3) {
basicSpan(it, 0, "parent")
trace(0, 4) {
span(0) {
name "parent"
kind INTERNAL
}
span(1) {
name sharedTopic + " send"
kind PRODUCER
childOf span(0)
attributes {
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "rocketmq"
"${SemanticAttributes.MESSAGING_DESTINATION.key}" sharedTopic
@ -147,6 +146,7 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification {
span(2) {
name sharedTopic + " process"
kind CONSUMER
childOf span(1)
attributes {
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "rocketmq"
"${SemanticAttributes.MESSAGING_DESTINATION.key}" sharedTopic
@ -160,6 +160,11 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification {
"messaging.rocketmq.queue_offset" Long
}
}
span(3) {
name "messageListener"
kind INTERNAL
childOf span(2)
}
}
}
}
@ -167,22 +172,27 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification {
def "test rocketmq produce and batch consume"() {
setup:
consumer.setConsumeMessageBatchMaxSize(2)
when:
runUnderTrace("parent") {
producer.send(msgs)
}
messageListener.waitForMessageConsume(msgs.size(), CONSUME_TIMEOUT)
then:
assertTraces(2) {
def itemStepSpan = null
def producerSpan = null
trace(0, 2) {
itemStepSpan = span(1)
producerSpan = span(1)
basicSpan(it, 0, "parent")
span(0) {
name "parent"
kind INTERNAL
}
span(1) {
name sharedTopic + " send"
kind PRODUCER
childOf span(0)
attributes {
"${SemanticAttributes.MESSAGING_SYSTEM.key}" "rocketmq"
"${SemanticAttributes.MESSAGING_DESTINATION.key}" sharedTopic
@ -194,7 +204,7 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification {
}
}
trace(1, 3) {
trace(1, 4) {
span(0) {
name "multiple_sources receive"
kind CONSUMER
@ -219,7 +229,7 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification {
"messaging.rocketmq.queue_offset" Long
}
childOf span(0)
hasLink itemStepSpan
hasLink producerSpan
}
span(2) {
name sharedTopic + " process"
@ -237,7 +247,12 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification {
"messaging.rocketmq.queue_offset" Long
}
childOf span(0)
hasLink itemStepSpan
hasLink producerSpan
}
span(3) {
name "messageListener"
kind INTERNAL
childOf span(0)
}
}
}

View File

@ -0,0 +1,21 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.rocketmq
import static io.opentelemetry.instrumentation.test.utils.TraceUtils.runInternalSpan
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly
import org.apache.rocketmq.common.message.MessageExt
class TracingMessageListener implements MessageListenerOrderly {
@Override
ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
runInternalSpan("messageListener")
return ConsumeOrderlyStatus.SUCCESS
}
}

View File

@ -8,12 +8,12 @@ package base;
import java.util.UUID;
import org.apache.rocketmq.broker.BrokerController;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.MQVersion;
import org.apache.rocketmq.namesrv.NamesrvController;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.test.listener.AbstractListener;
import org.apache.rocketmq.test.util.MQRandomUtils;
import org.apache.rocketmq.test.util.RandomUtil;
@ -45,7 +45,7 @@ public final class BaseConf {
}
public static DefaultMQPushConsumer getConsumer(
String nsAddr, String topic, String subExpression, AbstractListener listener)
String nsAddr, String topic, String subExpression, MessageListener listener)
throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
consumer.setInstanceName(RandomUtil.getStringByUUID());