Convert RocketMQ to Instrumenter (#4457)

* Convert RocketMqProducerTracer to Instrumenter

* Convert RocketMqConsumerTracer to Instrumenter

* Use semantic constants

* Renames

* Add shouldStart checks

* Replace unused parameter with Void
This commit is contained in:
Nikita Salnikov-Tarnovski 2021-10-26 16:48:39 +03:00 committed by GitHub
parent 00643724d1
commit 6bbc10a4aa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 475 additions and 208 deletions

View File

@ -6,13 +6,16 @@
package io.opentelemetry.instrumentation.rocketmq;
import io.opentelemetry.context.propagation.TextMapSetter;
import java.util.Map;
import org.apache.rocketmq.client.hook.SendMessageContext;
enum MapSetter implements TextMapSetter<Map<String, String>> {
enum MapSetter implements TextMapSetter<SendMessageContext> {
INSTANCE;
@Override
public void set(Map<String, String> carrier, String key, String value) {
carrier.put(key, value);
public void set(SendMessageContext carrier, String key, String value) {
if (carrier == null) {
return;
}
carrier.getMessage().getProperties().put(key, value);
}
}

View File

@ -0,0 +1,80 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.rocketmq;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import javax.annotation.Nullable;
import org.apache.rocketmq.common.message.MessageExt;
class RockerMqConsumerAttributeExtractor extends MessagingAttributesExtractor<MessageExt, Void> {
@Override
public MessageOperation operation() {
return MessageOperation.PROCESS;
}
@Override
protected String system(MessageExt request) {
return "rocketmq";
}
@Override
protected String destinationKind(MessageExt request) {
return SemanticAttributes.MessagingDestinationKindValues.TOPIC;
}
@Override
protected String destination(MessageExt request) {
return request.getTopic();
}
@Override
protected boolean temporaryDestination(MessageExt request) {
return false;
}
@Nullable
@Override
protected String protocol(MessageExt request) {
return null;
}
@Nullable
@Override
protected String protocolVersion(MessageExt request) {
return null;
}
@Nullable
@Override
protected String url(MessageExt request) {
return null;
}
@Nullable
@Override
protected String conversationId(MessageExt request) {
return null;
}
@Override
protected Long messagePayloadSize(MessageExt request) {
return (long) request.getBody().length;
}
@Nullable
@Override
protected Long messagePayloadCompressedSize(MessageExt request) {
return null;
}
@Nullable
@Override
protected String messageId(MessageExt request, @Nullable Void unused) {
return request.getMsgId();
}
}

View File

@ -0,0 +1,48 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.rocketmq;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import javax.annotation.Nullable;
import org.apache.rocketmq.common.message.MessageExt;
class RockerMqConsumerExperimentalAttributeExtractor
implements AttributesExtractor<MessageExt, Void> {
private static final AttributeKey<String> MESSAGING_ROCKETMQ_TAGS =
AttributeKey.stringKey("messaging.rocketmq.tags");
private static final AttributeKey<Long> MESSAGING_ROCKETMQ_QUEUE_ID =
AttributeKey.longKey("messaging.rocketmq.queue_id");
private static final AttributeKey<Long> MESSAGING_ROCKETMQ_QUEUE_OFFSET =
AttributeKey.longKey("messaging.rocketmq.queue_offset");
private static final AttributeKey<String> MESSAGING_ROCKETMQ_BROKER_ADDRESS =
AttributeKey.stringKey("messaging.rocketmq.broker_address");
@Override
public void onStart(AttributesBuilder attributes, MessageExt msg) {
set(attributes, MESSAGING_ROCKETMQ_TAGS, msg.getTags());
set(attributes, MESSAGING_ROCKETMQ_QUEUE_ID, (long) msg.getQueueId());
set(attributes, MESSAGING_ROCKETMQ_QUEUE_OFFSET, msg.getQueueOffset());
set(attributes, MESSAGING_ROCKETMQ_BROKER_ADDRESS, getBrokerHost(msg));
}
@Nullable
private static String getBrokerHost(MessageExt msg) {
if (msg.getStoreHost() != null) {
return msg.getStoreHost().toString().replace("/", "");
} else {
return null;
}
}
@Override
public void onEnd(
AttributesBuilder attributes,
MessageExt consumeMessageContext,
@Nullable Void unused,
@Nullable Throwable error) {}
}

View File

@ -0,0 +1,82 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.rocketmq;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import javax.annotation.Nullable;
import org.apache.rocketmq.client.hook.SendMessageContext;
class RockerMqProducerAttributeExtractor
extends MessagingAttributesExtractor<SendMessageContext, Void> {
@Override
public MessageOperation operation() {
return MessageOperation.SEND;
}
@Override
protected String system(SendMessageContext sendMessageContext) {
return "rocketmq";
}
@Override
protected String destinationKind(SendMessageContext sendMessageContext) {
return SemanticAttributes.MessagingDestinationKindValues.TOPIC;
}
@Override
protected String destination(SendMessageContext sendMessageContext) {
return sendMessageContext.getMessage().getTopic();
}
@Override
protected boolean temporaryDestination(SendMessageContext sendMessageContext) {
return false;
}
@Nullable
@Override
protected String protocol(SendMessageContext sendMessageContext) {
return null;
}
@Nullable
@Override
protected String protocolVersion(SendMessageContext sendMessageContext) {
return null;
}
@Nullable
@Override
protected String url(SendMessageContext sendMessageContext) {
return null;
}
@Nullable
@Override
protected String conversationId(SendMessageContext sendMessageContext) {
return null;
}
@Nullable
@Override
protected Long messagePayloadSize(SendMessageContext sendMessageContext) {
return null;
}
@Nullable
@Override
protected Long messagePayloadCompressedSize(SendMessageContext sendMessageContext) {
return null;
}
@Nullable
@Override
protected String messageId(SendMessageContext request, @Nullable Void unused) {
return request.getSendResult().getMsgId();
}
}

View File

@ -0,0 +1,42 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.rocketmq;
import io.opentelemetry.api.common.AttributeKey;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor;
import javax.annotation.Nullable;
import org.apache.rocketmq.client.hook.SendMessageContext;
class RockerMqProducerExperimentalAttributeExtractor
implements AttributesExtractor<SendMessageContext, Void> {
private static final AttributeKey<String> MESSAGING_ROCKETMQ_TAGS =
AttributeKey.stringKey("messaging.rocketmq.tags");
private static final AttributeKey<String> MESSAGING_ROCKETMQ_BROKER_ADDRESS =
AttributeKey.stringKey("messaging.rocketmq.broker_address");
private static final AttributeKey<String> MESSAGING_ROCKETMQ_SEND_RESULT =
AttributeKey.stringKey("messaging.rocketmq.send_result");
@Override
public void onStart(AttributesBuilder attributes, SendMessageContext request) {
set(attributes, MESSAGING_ROCKETMQ_TAGS, request.getMessage().getTags());
set(attributes, MESSAGING_ROCKETMQ_BROKER_ADDRESS, request.getBrokerAddr());
}
@Override
public void onEnd(
AttributesBuilder attributes,
SendMessageContext request,
@Nullable Void unused,
@Nullable Throwable error) {
if (request.getSendResult() != null) {
set(
attributes,
MESSAGING_ROCKETMQ_SEND_RESULT,
request.getSendResult().getSendStatus().name());
}
}
}

View File

@ -0,0 +1,59 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.rocketmq;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import java.util.List;
import org.apache.rocketmq.common.message.MessageExt;
final class RocketMqConsumerInstrumenter {
private final Instrumenter<MessageExt, Void> singleProcessInstrumenter;
private final Instrumenter<MessageExt, Void> batchProcessInstrumenter;
private final Instrumenter<Void, Void> batchReceiveInstrumenter;
RocketMqConsumerInstrumenter(
Instrumenter<MessageExt, Void> singleProcessInstrumenter,
Instrumenter<MessageExt, Void> batchProcessInstrumenter,
Instrumenter<Void, Void> batchReceiveInstrumenter) {
this.singleProcessInstrumenter = singleProcessInstrumenter;
this.batchProcessInstrumenter = batchProcessInstrumenter;
this.batchReceiveInstrumenter = batchReceiveInstrumenter;
}
Context start(Context parentContext, List<MessageExt> msgs) {
if (msgs.size() == 1) {
if (singleProcessInstrumenter.shouldStart(parentContext, msgs.get(0))) {
return singleProcessInstrumenter.start(parentContext, msgs.get(0));
}
} else {
if (batchReceiveInstrumenter.shouldStart(parentContext, null)) {
Context rootContext = batchReceiveInstrumenter.start(parentContext, null);
for (MessageExt message : msgs) {
createChildSpan(rootContext, message);
}
return rootContext;
}
}
return parentContext;
}
private void createChildSpan(Context parentContext, MessageExt msg) {
if (batchProcessInstrumenter.shouldStart(parentContext, msg)) {
Context context = batchProcessInstrumenter.start(parentContext, msg);
batchProcessInstrumenter.end(context, msg, null, null);
}
}
void end(Context context, List<MessageExt> msgs) {
if (msgs.size() == 1) {
singleProcessInstrumenter.end(context, msgs.get(0), null, null);
} else {
batchReceiveInstrumenter.end(context, null, null, null);
}
}
}

View File

@ -1,108 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.rocketmq;
import static io.opentelemetry.api.trace.SpanKind.CONSUMER;
import static io.opentelemetry.instrumentation.rocketmq.TextMapExtractAdapter.GETTER;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.tracer.BaseTracer;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.rocketmq.common.message.MessageExt;
final class RocketMqConsumerTracer extends BaseTracer {
private final boolean captureExperimentalSpanAttributes;
private final boolean propagationEnabled;
RocketMqConsumerTracer(
OpenTelemetry openTelemetry,
boolean captureExperimentalSpanAttributes,
boolean propagationEnabled) {
super(openTelemetry);
this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
this.propagationEnabled = propagationEnabled;
}
@Override
protected String getInstrumentationName() {
return "io.opentelemetry.rocketmq-client-4.8";
}
Context startSpan(Context parentContext, List<MessageExt> msgs) {
if (msgs.size() == 1) {
SpanBuilder spanBuilder = startSpanBuilder(extractParent(msgs.get(0)), msgs.get(0));
return withConsumerSpan(parentContext, spanBuilder.startSpan());
} else {
SpanBuilder spanBuilder =
spanBuilder(parentContext, "multiple_sources receive", CONSUMER)
.setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq")
.setAttribute(SemanticAttributes.MESSAGING_OPERATION, "receive");
Context rootContext = withConsumerSpan(parentContext, spanBuilder.startSpan());
for (MessageExt message : msgs) {
createChildSpan(rootContext, message);
}
return rootContext;
}
}
private void createChildSpan(Context parentContext, MessageExt msg) {
SpanBuilder childSpanBuilder =
startSpanBuilder(parentContext, msg)
.addLink(Span.fromContext(extractParent(msg)).getSpanContext());
end(parentContext.with(childSpanBuilder.startSpan()));
}
private SpanBuilder startSpanBuilder(Context parentContext, MessageExt msg) {
SpanBuilder spanBuilder =
spanBuilder(parentContext, spanNameOnConsume(msg), CONSUMER)
.setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq")
.setAttribute(SemanticAttributes.MESSAGING_DESTINATION, msg.getTopic())
.setAttribute(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic")
.setAttribute(SemanticAttributes.MESSAGING_OPERATION, "process")
.setAttribute(SemanticAttributes.MESSAGING_MESSAGE_ID, msg.getMsgId())
.setAttribute(
SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES,
(long) msg.getBody().length);
onConsume(spanBuilder, msg);
return spanBuilder;
}
private Context extractParent(MessageExt msg) {
if (propagationEnabled) {
return extract(msg.getProperties(), GETTER);
} else {
return Context.current();
}
}
private void onConsume(SpanBuilder spanBuilder, MessageExt msg) {
if (captureExperimentalSpanAttributes) {
spanBuilder.setAttribute("messaging.rocketmq.tags", msg.getTags());
spanBuilder.setAttribute("messaging.rocketmq.queue_id", msg.getQueueId());
spanBuilder.setAttribute("messaging.rocketmq.queue_offset", msg.getQueueOffset());
spanBuilder.setAttribute("messaging.rocketmq.broker_address", getBrokerHost(msg));
}
}
private static String spanNameOnConsume(MessageExt msg) {
return msg.getTopic() + " process";
}
@Nullable
private static String getBrokerHost(MessageExt msg) {
if (msg.getStoreHost() != null) {
return msg.getStoreHost().toString().replace("/", "");
} else {
return null;
}
}
}

View File

@ -0,0 +1,118 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.rocketmq;
import static io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor.constant;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_OPERATION;
import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_SYSTEM;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.common.message.MessageExt;
class RocketMqInstrumenterFactory {
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.rocketmq-client-4.8";
private static final RockerMqProducerAttributeExtractor producerAttributesExtractor =
new RockerMqProducerAttributeExtractor();
private static final RockerMqProducerExperimentalAttributeExtractor
experimentalProducerAttributesExtractor =
new RockerMqProducerExperimentalAttributeExtractor();
public static final RockerMqConsumerAttributeExtractor consumerAttributesExtractor =
new RockerMqConsumerAttributeExtractor();
public static final RockerMqConsumerExperimentalAttributeExtractor
experimentalConsumerAttributesExtractor =
new RockerMqConsumerExperimentalAttributeExtractor();
static Instrumenter<SendMessageContext, Void> createProducerInstrumenter(
OpenTelemetry openTelemetry,
boolean captureExperimentalSpanAttributes,
boolean propagationEnabled) {
InstrumenterBuilder<SendMessageContext, Void> instrumenterBuilder =
Instrumenter.<SendMessageContext, Void>builder(
openTelemetry, INSTRUMENTATION_NAME, RocketMqInstrumenterFactory::spanNameOnProduce)
.addAttributesExtractor(producerAttributesExtractor);
if (captureExperimentalSpanAttributes) {
instrumenterBuilder.addAttributesExtractor(experimentalProducerAttributesExtractor);
}
if (propagationEnabled) {
return instrumenterBuilder.newProducerInstrumenter(MapSetter.INSTANCE);
} else {
return instrumenterBuilder.newInstrumenter(SpanKindExtractor.alwaysProducer());
}
}
static RocketMqConsumerInstrumenter createConsumerInstrumenter(
OpenTelemetry openTelemetry,
boolean captureExperimentalSpanAttributes,
boolean propagationEnabled) {
InstrumenterBuilder<Void, Void> batchReceiveInstrumenterBuilder =
Instrumenter.<Void, Void>builder(
openTelemetry, INSTRUMENTATION_NAME, RocketMqInstrumenterFactory::spanNameOnReceive)
.addAttributesExtractor(constant(MESSAGING_SYSTEM, "rocketmq"))
.addAttributesExtractor(constant(MESSAGING_OPERATION, "receive"));
return new RocketMqConsumerInstrumenter(
createProcessInstrumenter(
openTelemetry, captureExperimentalSpanAttributes, propagationEnabled, false),
createProcessInstrumenter(
openTelemetry, captureExperimentalSpanAttributes, propagationEnabled, true),
batchReceiveInstrumenterBuilder.newInstrumenter(SpanKindExtractor.alwaysConsumer()));
}
private static Instrumenter<MessageExt, Void> createProcessInstrumenter(
OpenTelemetry openTelemetry,
boolean captureExperimentalSpanAttributes,
boolean propagationEnabled,
boolean batch) {
InstrumenterBuilder<MessageExt, Void> builder =
Instrumenter.builder(
openTelemetry, INSTRUMENTATION_NAME, RocketMqInstrumenterFactory::spanNameOnConsume);
builder.addAttributesExtractor(consumerAttributesExtractor);
if (captureExperimentalSpanAttributes) {
builder.addAttributesExtractor(experimentalConsumerAttributesExtractor);
}
if (!propagationEnabled) {
return builder.newInstrumenter(SpanKindExtractor.alwaysConsumer());
}
if (batch) {
SpanLinksExtractor<MessageExt> spanLinksExtractor =
SpanLinksExtractor.fromUpstreamRequest(
openTelemetry.getPropagators(), TextMapExtractAdapter.GETTER);
return builder
.addSpanLinksExtractor(spanLinksExtractor)
.newInstrumenter(SpanKindExtractor.alwaysConsumer());
} else {
return builder.newConsumerInstrumenter(TextMapExtractAdapter.GETTER);
}
}
private static String spanNameOnReceive(Void unused) {
return "multiple_sources receive";
}
private static String spanNameOnProduce(SendMessageContext request) {
return request.getMessage().getTopic() + " send";
}
private static String spanNameOnConsume(MessageExt msg) {
return msg.getTopic() + " process";
}
}

View File

@ -1,60 +0,0 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/
package io.opentelemetry.instrumentation.rocketmq;
import static io.opentelemetry.api.trace.SpanKind.PRODUCER;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanBuilder;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.tracer.BaseTracer;
import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
final class RocketMqProducerTracer extends BaseTracer {
private final boolean captureExperimentalSpanAttributes;
RocketMqProducerTracer(OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes) {
super(openTelemetry);
this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes;
}
@Override
protected String getInstrumentationName() {
return "io.opentelemetry.rocketmq-client-4.8";
}
Context startProducerSpan(Context parentContext, String addr, Message msg) {
SpanBuilder spanBuilder = spanBuilder(parentContext, spanNameOnProduce(msg), PRODUCER);
onProduce(spanBuilder, msg, addr);
return parentContext.with(spanBuilder.startSpan());
}
private void onProduce(SpanBuilder spanBuilder, Message msg, String addr) {
spanBuilder.setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq");
spanBuilder.setAttribute(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic");
spanBuilder.setAttribute(SemanticAttributes.MESSAGING_DESTINATION, msg.getTopic());
if (captureExperimentalSpanAttributes) {
spanBuilder.setAttribute("messaging.rocketmq.tags", msg.getTags());
spanBuilder.setAttribute("messaging.rocketmq.broker_address", addr);
}
}
public void afterProduce(Context context, SendResult sendResult) {
Span span = Span.fromContext(context);
span.setAttribute(SemanticAttributes.MESSAGING_MESSAGE_ID, sendResult.getMsgId());
if (captureExperimentalSpanAttributes) {
span.setAttribute("messaging.rocketmq.send_result", sendResult.getSendStatus().name());
}
}
private static String spanNameOnProduce(Message msg) {
return msg.getTopic() + " send";
}
}

View File

@ -6,7 +6,9 @@
package io.opentelemetry.instrumentation.rocketmq;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import org.apache.rocketmq.client.hook.ConsumeMessageHook;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.hook.SendMessageHook;
/** Entrypoint for tracing RocketMq producers or consumers. */
@ -24,21 +26,19 @@ public final class RocketMqTracing {
return new RocketMqTracingBuilder(openTelemetry);
}
private final boolean propagationEnabled;
private final RocketMqConsumerTracer rocketMqConsumerTracer;
private final RocketMqProducerTracer rocketMqProducerTracer;
private final RocketMqConsumerInstrumenter rocketMqConsumerInstrumenter;
private final Instrumenter<SendMessageContext, Void> rocketMqProducerInstrumenter;
RocketMqTracing(
OpenTelemetry openTelemetry,
boolean captureExperimentalSpanAttributes,
boolean propagationEnabled) {
this.propagationEnabled = propagationEnabled;
rocketMqConsumerTracer =
new RocketMqConsumerTracer(
rocketMqConsumerInstrumenter =
RocketMqInstrumenterFactory.createConsumerInstrumenter(
openTelemetry, captureExperimentalSpanAttributes, propagationEnabled);
rocketMqProducerInstrumenter =
RocketMqInstrumenterFactory.createProducerInstrumenter(
openTelemetry, captureExperimentalSpanAttributes, propagationEnabled);
rocketMqProducerTracer =
new RocketMqProducerTracer(openTelemetry, captureExperimentalSpanAttributes);
}
/**
@ -46,7 +46,7 @@ public final class RocketMqTracing {
* org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl#registerConsumeMessageHook(ConsumeMessageHook)}.
*/
public ConsumeMessageHook newTracingConsumeMessageHook() {
return new TracingConsumeMessageHookImpl(rocketMqConsumerTracer);
return new TracingConsumeMessageHookImpl(rocketMqConsumerInstrumenter);
}
/**
@ -54,6 +54,6 @@ public final class RocketMqTracing {
* org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#registerSendMessageHook(SendMessageHook)}.
*/
public SendMessageHook newTracingSendMessageHook() {
return new TracingSendMessageHookImpl(rocketMqProducerTracer, propagationEnabled);
return new TracingSendMessageHookImpl(rocketMqProducerInstrumenter);
}
}

View File

@ -6,19 +6,21 @@
package io.opentelemetry.instrumentation.rocketmq;
import io.opentelemetry.context.propagation.TextMapGetter;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.rocketmq.common.message.MessageExt;
final class TextMapExtractAdapter implements TextMapGetter<Map<String, String>> {
final class TextMapExtractAdapter implements TextMapGetter<MessageExt> {
public static final TextMapExtractAdapter GETTER = new TextMapExtractAdapter();
@Override
public Iterable<String> keys(Map<String, String> carrier) {
return carrier.keySet();
public Iterable<String> keys(MessageExt carrier) {
return carrier.getProperties().keySet();
}
@Nullable
@Override
public String get(Map<String, String> carrier, String key) {
return carrier.get(key);
public String get(@Nullable MessageExt carrier, String key) {
return carrier == null ? null : carrier.getProperties().get(key);
}
}

View File

@ -11,10 +11,10 @@ import org.apache.rocketmq.client.hook.ConsumeMessageHook;
final class TracingConsumeMessageHookImpl implements ConsumeMessageHook {
private final RocketMqConsumerTracer tracer;
private final RocketMqConsumerInstrumenter instrumenter;
TracingConsumeMessageHookImpl(RocketMqConsumerTracer tracer) {
this.tracer = tracer;
TracingConsumeMessageHookImpl(RocketMqConsumerInstrumenter instrumenter) {
this.instrumenter = instrumenter;
}
@Override
@ -27,13 +27,16 @@ final class TracingConsumeMessageHookImpl implements ConsumeMessageHook {
if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) {
return;
}
Context otelContext = tracer.startSpan(Context.current(), context.getMsgList());
Context parentContext = Context.current();
Context newContext = instrumenter.start(parentContext, context.getMsgList());
// it's safe to store the scope in the rocketMq trace context, both before() and after() methods
// are always called from the same thread; see:
// - ConsumeMessageConcurrentlyService$ConsumeRequest#run()
// - ConsumeMessageOrderlyService$ConsumeRequest#run()
context.setMqTraceContext(ContextAndScope.create(otelContext, otelContext.makeCurrent()));
if (newContext != parentContext) {
context.setMqTraceContext(ContextAndScope.create(newContext, newContext.makeCurrent()));
}
}
@Override
@ -44,7 +47,7 @@ final class TracingConsumeMessageHookImpl implements ConsumeMessageHook {
if (context.getMqTraceContext() instanceof ContextAndScope) {
ContextAndScope contextAndScope = (ContextAndScope) context.getMqTraceContext();
contextAndScope.close();
tracer.end(contextAndScope.getContext());
instrumenter.end(contextAndScope.getContext(), context.getMsgList());
}
}
}

View File

@ -6,17 +6,16 @@
package io.opentelemetry.instrumentation.rocketmq;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
import org.apache.rocketmq.client.hook.SendMessageContext;
import org.apache.rocketmq.client.hook.SendMessageHook;
final class TracingSendMessageHookImpl implements SendMessageHook {
private final RocketMqProducerTracer tracer;
private final boolean propagationEnabled;
private final Instrumenter<SendMessageContext, Void> instrumenter;
TracingSendMessageHookImpl(RocketMqProducerTracer tracer, boolean propagationEnabled) {
this.tracer = tracer;
this.propagationEnabled = propagationEnabled;
TracingSendMessageHookImpl(Instrumenter<SendMessageContext, Void> instrumenter) {
this.instrumenter = instrumenter;
}
@Override
@ -29,23 +28,21 @@ final class TracingSendMessageHookImpl implements SendMessageHook {
if (context == null) {
return;
}
Context otelContext =
tracer.startProducerSpan(Context.current(), context.getBrokerAddr(), context.getMessage());
if (propagationEnabled) {
tracer.inject(otelContext, context.getMessage().getProperties(), MapSetter.INSTANCE);
Context parentContext = Context.current();
if (!instrumenter.shouldStart(parentContext, context)) {
return;
}
context.setMqTraceContext(otelContext);
context.setMqTraceContext(instrumenter.start(parentContext, context));
}
@Override
public void sendMessageAfter(SendMessageContext context) {
if (context == null || context.getMqTraceContext() == null || context.getSendResult() == null) {
if (context == null) {
return;
}
if (context.getMqTraceContext() instanceof Context) {
Context otelContext = (Context) context.getMqTraceContext();
tracer.afterProduce(otelContext, context.getSendResult());
tracer.end(otelContext);
instrumenter.end(otelContext, context, null, null);
}
}
}

View File

@ -21,6 +21,7 @@ import static io.opentelemetry.api.trace.SpanKind.CONSUMER
import static io.opentelemetry.api.trace.SpanKind.INTERNAL
import static io.opentelemetry.api.trace.SpanKind.PRODUCER
//TODO add tests for propagationEnabled flag
@Unroll
abstract class AbstractRocketMqClientTest extends InstrumentationSpecification {