diff --git a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/MapSetter.java b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/MapSetter.java index e254a9bca1..5b481d4591 100644 --- a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/MapSetter.java +++ b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/MapSetter.java @@ -6,13 +6,16 @@ package io.opentelemetry.instrumentation.rocketmq; import io.opentelemetry.context.propagation.TextMapSetter; -import java.util.Map; +import org.apache.rocketmq.client.hook.SendMessageContext; -enum MapSetter implements TextMapSetter> { +enum MapSetter implements TextMapSetter { INSTANCE; @Override - public void set(Map carrier, String key, String value) { - carrier.put(key, value); + public void set(SendMessageContext carrier, String key, String value) { + if (carrier == null) { + return; + } + carrier.getMessage().getProperties().put(key, value); } } diff --git a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RockerMqConsumerAttributeExtractor.java b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RockerMqConsumerAttributeExtractor.java new file mode 100644 index 0000000000..717ce86cee --- /dev/null +++ b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RockerMqConsumerAttributeExtractor.java @@ -0,0 +1,80 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rocketmq; + +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import javax.annotation.Nullable; +import org.apache.rocketmq.common.message.MessageExt; + +class RockerMqConsumerAttributeExtractor extends MessagingAttributesExtractor { + @Override + public MessageOperation operation() { + return MessageOperation.PROCESS; + } + + @Override + protected String system(MessageExt request) { + return "rocketmq"; + } + + @Override + protected String destinationKind(MessageExt request) { + return SemanticAttributes.MessagingDestinationKindValues.TOPIC; + } + + @Override + protected String destination(MessageExt request) { + return request.getTopic(); + } + + @Override + protected boolean temporaryDestination(MessageExt request) { + return false; + } + + @Nullable + @Override + protected String protocol(MessageExt request) { + return null; + } + + @Nullable + @Override + protected String protocolVersion(MessageExt request) { + return null; + } + + @Nullable + @Override + protected String url(MessageExt request) { + return null; + } + + @Nullable + @Override + protected String conversationId(MessageExt request) { + return null; + } + + @Override + protected Long messagePayloadSize(MessageExt request) { + return (long) request.getBody().length; + } + + @Nullable + @Override + protected Long messagePayloadCompressedSize(MessageExt request) { + return null; + } + + @Nullable + @Override + protected String messageId(MessageExt request, @Nullable Void unused) { + return request.getMsgId(); + } +} diff --git a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RockerMqConsumerExperimentalAttributeExtractor.java b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RockerMqConsumerExperimentalAttributeExtractor.java new file mode 100644 index 0000000000..0c2110004d --- /dev/null +++ b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RockerMqConsumerExperimentalAttributeExtractor.java @@ -0,0 +1,48 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rocketmq; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; +import javax.annotation.Nullable; +import org.apache.rocketmq.common.message.MessageExt; + +class RockerMqConsumerExperimentalAttributeExtractor + implements AttributesExtractor { + private static final AttributeKey MESSAGING_ROCKETMQ_TAGS = + AttributeKey.stringKey("messaging.rocketmq.tags"); + private static final AttributeKey MESSAGING_ROCKETMQ_QUEUE_ID = + AttributeKey.longKey("messaging.rocketmq.queue_id"); + private static final AttributeKey MESSAGING_ROCKETMQ_QUEUE_OFFSET = + AttributeKey.longKey("messaging.rocketmq.queue_offset"); + private static final AttributeKey MESSAGING_ROCKETMQ_BROKER_ADDRESS = + AttributeKey.stringKey("messaging.rocketmq.broker_address"); + + @Override + public void onStart(AttributesBuilder attributes, MessageExt msg) { + set(attributes, MESSAGING_ROCKETMQ_TAGS, msg.getTags()); + set(attributes, MESSAGING_ROCKETMQ_QUEUE_ID, (long) msg.getQueueId()); + set(attributes, MESSAGING_ROCKETMQ_QUEUE_OFFSET, msg.getQueueOffset()); + set(attributes, MESSAGING_ROCKETMQ_BROKER_ADDRESS, getBrokerHost(msg)); + } + + @Nullable + private static String getBrokerHost(MessageExt msg) { + if (msg.getStoreHost() != null) { + return msg.getStoreHost().toString().replace("/", ""); + } else { + return null; + } + } + + @Override + public void onEnd( + AttributesBuilder attributes, + MessageExt consumeMessageContext, + @Nullable Void unused, + @Nullable Throwable error) {} +} diff --git a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RockerMqProducerAttributeExtractor.java b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RockerMqProducerAttributeExtractor.java new file mode 100644 index 0000000000..06ddc4fd4e --- /dev/null +++ b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RockerMqProducerAttributeExtractor.java @@ -0,0 +1,82 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rocketmq; + +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation; +import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor; +import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; +import javax.annotation.Nullable; +import org.apache.rocketmq.client.hook.SendMessageContext; + +class RockerMqProducerAttributeExtractor + extends MessagingAttributesExtractor { + @Override + public MessageOperation operation() { + return MessageOperation.SEND; + } + + @Override + protected String system(SendMessageContext sendMessageContext) { + return "rocketmq"; + } + + @Override + protected String destinationKind(SendMessageContext sendMessageContext) { + return SemanticAttributes.MessagingDestinationKindValues.TOPIC; + } + + @Override + protected String destination(SendMessageContext sendMessageContext) { + return sendMessageContext.getMessage().getTopic(); + } + + @Override + protected boolean temporaryDestination(SendMessageContext sendMessageContext) { + return false; + } + + @Nullable + @Override + protected String protocol(SendMessageContext sendMessageContext) { + return null; + } + + @Nullable + @Override + protected String protocolVersion(SendMessageContext sendMessageContext) { + return null; + } + + @Nullable + @Override + protected String url(SendMessageContext sendMessageContext) { + return null; + } + + @Nullable + @Override + protected String conversationId(SendMessageContext sendMessageContext) { + return null; + } + + @Nullable + @Override + protected Long messagePayloadSize(SendMessageContext sendMessageContext) { + return null; + } + + @Nullable + @Override + protected Long messagePayloadCompressedSize(SendMessageContext sendMessageContext) { + return null; + } + + @Nullable + @Override + protected String messageId(SendMessageContext request, @Nullable Void unused) { + return request.getSendResult().getMsgId(); + } +} diff --git a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RockerMqProducerExperimentalAttributeExtractor.java b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RockerMqProducerExperimentalAttributeExtractor.java new file mode 100644 index 0000000000..50c9c7e2cd --- /dev/null +++ b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RockerMqProducerExperimentalAttributeExtractor.java @@ -0,0 +1,42 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rocketmq; + +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.api.common.AttributesBuilder; +import io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor; +import javax.annotation.Nullable; +import org.apache.rocketmq.client.hook.SendMessageContext; + +class RockerMqProducerExperimentalAttributeExtractor + implements AttributesExtractor { + private static final AttributeKey MESSAGING_ROCKETMQ_TAGS = + AttributeKey.stringKey("messaging.rocketmq.tags"); + private static final AttributeKey MESSAGING_ROCKETMQ_BROKER_ADDRESS = + AttributeKey.stringKey("messaging.rocketmq.broker_address"); + private static final AttributeKey MESSAGING_ROCKETMQ_SEND_RESULT = + AttributeKey.stringKey("messaging.rocketmq.send_result"); + + @Override + public void onStart(AttributesBuilder attributes, SendMessageContext request) { + set(attributes, MESSAGING_ROCKETMQ_TAGS, request.getMessage().getTags()); + set(attributes, MESSAGING_ROCKETMQ_BROKER_ADDRESS, request.getBrokerAddr()); + } + + @Override + public void onEnd( + AttributesBuilder attributes, + SendMessageContext request, + @Nullable Void unused, + @Nullable Throwable error) { + if (request.getSendResult() != null) { + set( + attributes, + MESSAGING_ROCKETMQ_SEND_RESULT, + request.getSendResult().getSendStatus().name()); + } + } +} diff --git a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqConsumerInstrumenter.java b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqConsumerInstrumenter.java new file mode 100644 index 0000000000..6d5bb13d1f --- /dev/null +++ b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqConsumerInstrumenter.java @@ -0,0 +1,59 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rocketmq; + +import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import java.util.List; +import org.apache.rocketmq.common.message.MessageExt; + +final class RocketMqConsumerInstrumenter { + + private final Instrumenter singleProcessInstrumenter; + private final Instrumenter batchProcessInstrumenter; + private final Instrumenter batchReceiveInstrumenter; + + RocketMqConsumerInstrumenter( + Instrumenter singleProcessInstrumenter, + Instrumenter batchProcessInstrumenter, + Instrumenter batchReceiveInstrumenter) { + this.singleProcessInstrumenter = singleProcessInstrumenter; + this.batchProcessInstrumenter = batchProcessInstrumenter; + this.batchReceiveInstrumenter = batchReceiveInstrumenter; + } + + Context start(Context parentContext, List msgs) { + if (msgs.size() == 1) { + if (singleProcessInstrumenter.shouldStart(parentContext, msgs.get(0))) { + return singleProcessInstrumenter.start(parentContext, msgs.get(0)); + } + } else { + if (batchReceiveInstrumenter.shouldStart(parentContext, null)) { + Context rootContext = batchReceiveInstrumenter.start(parentContext, null); + for (MessageExt message : msgs) { + createChildSpan(rootContext, message); + } + return rootContext; + } + } + return parentContext; + } + + private void createChildSpan(Context parentContext, MessageExt msg) { + if (batchProcessInstrumenter.shouldStart(parentContext, msg)) { + Context context = batchProcessInstrumenter.start(parentContext, msg); + batchProcessInstrumenter.end(context, msg, null, null); + } + } + + void end(Context context, List msgs) { + if (msgs.size() == 1) { + singleProcessInstrumenter.end(context, msgs.get(0), null, null); + } else { + batchReceiveInstrumenter.end(context, null, null, null); + } + } +} diff --git a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqConsumerTracer.java b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqConsumerTracer.java deleted file mode 100644 index 803c2a0e26..0000000000 --- a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqConsumerTracer.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.instrumentation.rocketmq; - -import static io.opentelemetry.api.trace.SpanKind.CONSUMER; -import static io.opentelemetry.instrumentation.rocketmq.TextMapExtractAdapter.GETTER; - -import io.opentelemetry.api.OpenTelemetry; -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.api.trace.SpanBuilder; -import io.opentelemetry.context.Context; -import io.opentelemetry.instrumentation.api.tracer.BaseTracer; -import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; -import java.util.List; -import javax.annotation.Nullable; -import org.apache.rocketmq.common.message.MessageExt; - -final class RocketMqConsumerTracer extends BaseTracer { - - private final boolean captureExperimentalSpanAttributes; - private final boolean propagationEnabled; - - RocketMqConsumerTracer( - OpenTelemetry openTelemetry, - boolean captureExperimentalSpanAttributes, - boolean propagationEnabled) { - super(openTelemetry); - this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; - this.propagationEnabled = propagationEnabled; - } - - @Override - protected String getInstrumentationName() { - return "io.opentelemetry.rocketmq-client-4.8"; - } - - Context startSpan(Context parentContext, List msgs) { - if (msgs.size() == 1) { - SpanBuilder spanBuilder = startSpanBuilder(extractParent(msgs.get(0)), msgs.get(0)); - return withConsumerSpan(parentContext, spanBuilder.startSpan()); - } else { - SpanBuilder spanBuilder = - spanBuilder(parentContext, "multiple_sources receive", CONSUMER) - .setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq") - .setAttribute(SemanticAttributes.MESSAGING_OPERATION, "receive"); - Context rootContext = withConsumerSpan(parentContext, spanBuilder.startSpan()); - for (MessageExt message : msgs) { - createChildSpan(rootContext, message); - } - return rootContext; - } - } - - private void createChildSpan(Context parentContext, MessageExt msg) { - SpanBuilder childSpanBuilder = - startSpanBuilder(parentContext, msg) - .addLink(Span.fromContext(extractParent(msg)).getSpanContext()); - end(parentContext.with(childSpanBuilder.startSpan())); - } - - private SpanBuilder startSpanBuilder(Context parentContext, MessageExt msg) { - SpanBuilder spanBuilder = - spanBuilder(parentContext, spanNameOnConsume(msg), CONSUMER) - .setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq") - .setAttribute(SemanticAttributes.MESSAGING_DESTINATION, msg.getTopic()) - .setAttribute(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic") - .setAttribute(SemanticAttributes.MESSAGING_OPERATION, "process") - .setAttribute(SemanticAttributes.MESSAGING_MESSAGE_ID, msg.getMsgId()) - .setAttribute( - SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, - (long) msg.getBody().length); - onConsume(spanBuilder, msg); - return spanBuilder; - } - - private Context extractParent(MessageExt msg) { - if (propagationEnabled) { - return extract(msg.getProperties(), GETTER); - } else { - return Context.current(); - } - } - - private void onConsume(SpanBuilder spanBuilder, MessageExt msg) { - if (captureExperimentalSpanAttributes) { - spanBuilder.setAttribute("messaging.rocketmq.tags", msg.getTags()); - spanBuilder.setAttribute("messaging.rocketmq.queue_id", msg.getQueueId()); - spanBuilder.setAttribute("messaging.rocketmq.queue_offset", msg.getQueueOffset()); - spanBuilder.setAttribute("messaging.rocketmq.broker_address", getBrokerHost(msg)); - } - } - - private static String spanNameOnConsume(MessageExt msg) { - return msg.getTopic() + " process"; - } - - @Nullable - private static String getBrokerHost(MessageExt msg) { - if (msg.getStoreHost() != null) { - return msg.getStoreHost().toString().replace("/", ""); - } else { - return null; - } - } -} diff --git a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqInstrumenterFactory.java b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqInstrumenterFactory.java new file mode 100644 index 0000000000..3960b30763 --- /dev/null +++ b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqInstrumenterFactory.java @@ -0,0 +1,118 @@ +/* + * Copyright The OpenTelemetry Authors + * SPDX-License-Identifier: Apache-2.0 + */ + +package io.opentelemetry.instrumentation.rocketmq; + +import static io.opentelemetry.instrumentation.api.instrumenter.AttributesExtractor.constant; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_OPERATION; +import static io.opentelemetry.semconv.trace.attributes.SemanticAttributes.MESSAGING_SYSTEM; + +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; +import io.opentelemetry.instrumentation.api.instrumenter.InstrumenterBuilder; +import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor; +import io.opentelemetry.instrumentation.api.instrumenter.SpanLinksExtractor; +import org.apache.rocketmq.client.hook.SendMessageContext; +import org.apache.rocketmq.common.message.MessageExt; + +class RocketMqInstrumenterFactory { + + private static final String INSTRUMENTATION_NAME = "io.opentelemetry.rocketmq-client-4.8"; + + private static final RockerMqProducerAttributeExtractor producerAttributesExtractor = + new RockerMqProducerAttributeExtractor(); + private static final RockerMqProducerExperimentalAttributeExtractor + experimentalProducerAttributesExtractor = + new RockerMqProducerExperimentalAttributeExtractor(); + + public static final RockerMqConsumerAttributeExtractor consumerAttributesExtractor = + new RockerMqConsumerAttributeExtractor(); + public static final RockerMqConsumerExperimentalAttributeExtractor + experimentalConsumerAttributesExtractor = + new RockerMqConsumerExperimentalAttributeExtractor(); + + static Instrumenter createProducerInstrumenter( + OpenTelemetry openTelemetry, + boolean captureExperimentalSpanAttributes, + boolean propagationEnabled) { + + InstrumenterBuilder instrumenterBuilder = + Instrumenter.builder( + openTelemetry, INSTRUMENTATION_NAME, RocketMqInstrumenterFactory::spanNameOnProduce) + .addAttributesExtractor(producerAttributesExtractor); + if (captureExperimentalSpanAttributes) { + instrumenterBuilder.addAttributesExtractor(experimentalProducerAttributesExtractor); + } + + if (propagationEnabled) { + return instrumenterBuilder.newProducerInstrumenter(MapSetter.INSTANCE); + } else { + return instrumenterBuilder.newInstrumenter(SpanKindExtractor.alwaysProducer()); + } + } + + static RocketMqConsumerInstrumenter createConsumerInstrumenter( + OpenTelemetry openTelemetry, + boolean captureExperimentalSpanAttributes, + boolean propagationEnabled) { + + InstrumenterBuilder batchReceiveInstrumenterBuilder = + Instrumenter.builder( + openTelemetry, INSTRUMENTATION_NAME, RocketMqInstrumenterFactory::spanNameOnReceive) + .addAttributesExtractor(constant(MESSAGING_SYSTEM, "rocketmq")) + .addAttributesExtractor(constant(MESSAGING_OPERATION, "receive")); + + return new RocketMqConsumerInstrumenter( + createProcessInstrumenter( + openTelemetry, captureExperimentalSpanAttributes, propagationEnabled, false), + createProcessInstrumenter( + openTelemetry, captureExperimentalSpanAttributes, propagationEnabled, true), + batchReceiveInstrumenterBuilder.newInstrumenter(SpanKindExtractor.alwaysConsumer())); + } + + private static Instrumenter createProcessInstrumenter( + OpenTelemetry openTelemetry, + boolean captureExperimentalSpanAttributes, + boolean propagationEnabled, + boolean batch) { + + InstrumenterBuilder builder = + Instrumenter.builder( + openTelemetry, INSTRUMENTATION_NAME, RocketMqInstrumenterFactory::spanNameOnConsume); + + builder.addAttributesExtractor(consumerAttributesExtractor); + if (captureExperimentalSpanAttributes) { + builder.addAttributesExtractor(experimentalConsumerAttributesExtractor); + } + + if (!propagationEnabled) { + return builder.newInstrumenter(SpanKindExtractor.alwaysConsumer()); + } + + if (batch) { + SpanLinksExtractor spanLinksExtractor = + SpanLinksExtractor.fromUpstreamRequest( + openTelemetry.getPropagators(), TextMapExtractAdapter.GETTER); + + return builder + .addSpanLinksExtractor(spanLinksExtractor) + .newInstrumenter(SpanKindExtractor.alwaysConsumer()); + } else { + return builder.newConsumerInstrumenter(TextMapExtractAdapter.GETTER); + } + } + + private static String spanNameOnReceive(Void unused) { + return "multiple_sources receive"; + } + + private static String spanNameOnProduce(SendMessageContext request) { + return request.getMessage().getTopic() + " send"; + } + + private static String spanNameOnConsume(MessageExt msg) { + return msg.getTopic() + " process"; + } +} diff --git a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqProducerTracer.java b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqProducerTracer.java deleted file mode 100644 index 11a4825152..0000000000 --- a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqProducerTracer.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright The OpenTelemetry Authors - * SPDX-License-Identifier: Apache-2.0 - */ - -package io.opentelemetry.instrumentation.rocketmq; - -import static io.opentelemetry.api.trace.SpanKind.PRODUCER; - -import io.opentelemetry.api.OpenTelemetry; -import io.opentelemetry.api.trace.Span; -import io.opentelemetry.api.trace.SpanBuilder; -import io.opentelemetry.context.Context; -import io.opentelemetry.instrumentation.api.tracer.BaseTracer; -import io.opentelemetry.semconv.trace.attributes.SemanticAttributes; -import org.apache.rocketmq.client.producer.SendResult; -import org.apache.rocketmq.common.message.Message; - -final class RocketMqProducerTracer extends BaseTracer { - - private final boolean captureExperimentalSpanAttributes; - - RocketMqProducerTracer(OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes) { - super(openTelemetry); - this.captureExperimentalSpanAttributes = captureExperimentalSpanAttributes; - } - - @Override - protected String getInstrumentationName() { - return "io.opentelemetry.rocketmq-client-4.8"; - } - - Context startProducerSpan(Context parentContext, String addr, Message msg) { - SpanBuilder spanBuilder = spanBuilder(parentContext, spanNameOnProduce(msg), PRODUCER); - onProduce(spanBuilder, msg, addr); - return parentContext.with(spanBuilder.startSpan()); - } - - private void onProduce(SpanBuilder spanBuilder, Message msg, String addr) { - spanBuilder.setAttribute(SemanticAttributes.MESSAGING_SYSTEM, "rocketmq"); - spanBuilder.setAttribute(SemanticAttributes.MESSAGING_DESTINATION_KIND, "topic"); - spanBuilder.setAttribute(SemanticAttributes.MESSAGING_DESTINATION, msg.getTopic()); - if (captureExperimentalSpanAttributes) { - spanBuilder.setAttribute("messaging.rocketmq.tags", msg.getTags()); - spanBuilder.setAttribute("messaging.rocketmq.broker_address", addr); - } - } - - public void afterProduce(Context context, SendResult sendResult) { - Span span = Span.fromContext(context); - span.setAttribute(SemanticAttributes.MESSAGING_MESSAGE_ID, sendResult.getMsgId()); - if (captureExperimentalSpanAttributes) { - span.setAttribute("messaging.rocketmq.send_result", sendResult.getSendStatus().name()); - } - } - - private static String spanNameOnProduce(Message msg) { - return msg.getTopic() + " send"; - } -} diff --git a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqTracing.java b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqTracing.java index cb495d0e3a..9d7737e45b 100644 --- a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqTracing.java +++ b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/RocketMqTracing.java @@ -6,7 +6,9 @@ package io.opentelemetry.instrumentation.rocketmq; import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import org.apache.rocketmq.client.hook.ConsumeMessageHook; +import org.apache.rocketmq.client.hook.SendMessageContext; import org.apache.rocketmq.client.hook.SendMessageHook; /** Entrypoint for tracing RocketMq producers or consumers. */ @@ -24,21 +26,19 @@ public final class RocketMqTracing { return new RocketMqTracingBuilder(openTelemetry); } - private final boolean propagationEnabled; - - private final RocketMqConsumerTracer rocketMqConsumerTracer; - private final RocketMqProducerTracer rocketMqProducerTracer; + private final RocketMqConsumerInstrumenter rocketMqConsumerInstrumenter; + private final Instrumenter rocketMqProducerInstrumenter; RocketMqTracing( OpenTelemetry openTelemetry, boolean captureExperimentalSpanAttributes, boolean propagationEnabled) { - this.propagationEnabled = propagationEnabled; - rocketMqConsumerTracer = - new RocketMqConsumerTracer( + rocketMqConsumerInstrumenter = + RocketMqInstrumenterFactory.createConsumerInstrumenter( + openTelemetry, captureExperimentalSpanAttributes, propagationEnabled); + rocketMqProducerInstrumenter = + RocketMqInstrumenterFactory.createProducerInstrumenter( openTelemetry, captureExperimentalSpanAttributes, propagationEnabled); - rocketMqProducerTracer = - new RocketMqProducerTracer(openTelemetry, captureExperimentalSpanAttributes); } /** @@ -46,7 +46,7 @@ public final class RocketMqTracing { * org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl#registerConsumeMessageHook(ConsumeMessageHook)}. */ public ConsumeMessageHook newTracingConsumeMessageHook() { - return new TracingConsumeMessageHookImpl(rocketMqConsumerTracer); + return new TracingConsumeMessageHookImpl(rocketMqConsumerInstrumenter); } /** @@ -54,6 +54,6 @@ public final class RocketMqTracing { * org.apache.rocketmq.client.impl.producer.DefaultMQProducerImpl#registerSendMessageHook(SendMessageHook)}. */ public SendMessageHook newTracingSendMessageHook() { - return new TracingSendMessageHookImpl(rocketMqProducerTracer, propagationEnabled); + return new TracingSendMessageHookImpl(rocketMqProducerInstrumenter); } } diff --git a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TextMapExtractAdapter.java b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TextMapExtractAdapter.java index c7fd9ed0a6..097ec53227 100644 --- a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TextMapExtractAdapter.java +++ b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TextMapExtractAdapter.java @@ -6,19 +6,21 @@ package io.opentelemetry.instrumentation.rocketmq; import io.opentelemetry.context.propagation.TextMapGetter; -import java.util.Map; +import javax.annotation.Nullable; +import org.apache.rocketmq.common.message.MessageExt; -final class TextMapExtractAdapter implements TextMapGetter> { +final class TextMapExtractAdapter implements TextMapGetter { public static final TextMapExtractAdapter GETTER = new TextMapExtractAdapter(); @Override - public Iterable keys(Map carrier) { - return carrier.keySet(); + public Iterable keys(MessageExt carrier) { + return carrier.getProperties().keySet(); } + @Nullable @Override - public String get(Map carrier, String key) { - return carrier.get(key); + public String get(@Nullable MessageExt carrier, String key) { + return carrier == null ? null : carrier.getProperties().get(key); } } diff --git a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TracingConsumeMessageHookImpl.java b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TracingConsumeMessageHookImpl.java index fd3d7d54b7..dc05a22980 100644 --- a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TracingConsumeMessageHookImpl.java +++ b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TracingConsumeMessageHookImpl.java @@ -11,10 +11,10 @@ import org.apache.rocketmq.client.hook.ConsumeMessageHook; final class TracingConsumeMessageHookImpl implements ConsumeMessageHook { - private final RocketMqConsumerTracer tracer; + private final RocketMqConsumerInstrumenter instrumenter; - TracingConsumeMessageHookImpl(RocketMqConsumerTracer tracer) { - this.tracer = tracer; + TracingConsumeMessageHookImpl(RocketMqConsumerInstrumenter instrumenter) { + this.instrumenter = instrumenter; } @Override @@ -27,13 +27,16 @@ final class TracingConsumeMessageHookImpl implements ConsumeMessageHook { if (context == null || context.getMsgList() == null || context.getMsgList().isEmpty()) { return; } - Context otelContext = tracer.startSpan(Context.current(), context.getMsgList()); + Context parentContext = Context.current(); + Context newContext = instrumenter.start(parentContext, context.getMsgList()); // it's safe to store the scope in the rocketMq trace context, both before() and after() methods // are always called from the same thread; see: // - ConsumeMessageConcurrentlyService$ConsumeRequest#run() // - ConsumeMessageOrderlyService$ConsumeRequest#run() - context.setMqTraceContext(ContextAndScope.create(otelContext, otelContext.makeCurrent())); + if (newContext != parentContext) { + context.setMqTraceContext(ContextAndScope.create(newContext, newContext.makeCurrent())); + } } @Override @@ -44,7 +47,7 @@ final class TracingConsumeMessageHookImpl implements ConsumeMessageHook { if (context.getMqTraceContext() instanceof ContextAndScope) { ContextAndScope contextAndScope = (ContextAndScope) context.getMqTraceContext(); contextAndScope.close(); - tracer.end(contextAndScope.getContext()); + instrumenter.end(contextAndScope.getContext(), context.getMsgList()); } } } diff --git a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TracingSendMessageHookImpl.java b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TracingSendMessageHookImpl.java index 4c326a6a24..ad34c45bf0 100644 --- a/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TracingSendMessageHookImpl.java +++ b/instrumentation/rocketmq-client-4.8/library/src/main/java/io/opentelemetry/instrumentation/rocketmq/TracingSendMessageHookImpl.java @@ -6,17 +6,16 @@ package io.opentelemetry.instrumentation.rocketmq; import io.opentelemetry.context.Context; +import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter; import org.apache.rocketmq.client.hook.SendMessageContext; import org.apache.rocketmq.client.hook.SendMessageHook; final class TracingSendMessageHookImpl implements SendMessageHook { - private final RocketMqProducerTracer tracer; - private final boolean propagationEnabled; + private final Instrumenter instrumenter; - TracingSendMessageHookImpl(RocketMqProducerTracer tracer, boolean propagationEnabled) { - this.tracer = tracer; - this.propagationEnabled = propagationEnabled; + TracingSendMessageHookImpl(Instrumenter instrumenter) { + this.instrumenter = instrumenter; } @Override @@ -29,23 +28,21 @@ final class TracingSendMessageHookImpl implements SendMessageHook { if (context == null) { return; } - Context otelContext = - tracer.startProducerSpan(Context.current(), context.getBrokerAddr(), context.getMessage()); - if (propagationEnabled) { - tracer.inject(otelContext, context.getMessage().getProperties(), MapSetter.INSTANCE); + Context parentContext = Context.current(); + if (!instrumenter.shouldStart(parentContext, context)) { + return; } - context.setMqTraceContext(otelContext); + context.setMqTraceContext(instrumenter.start(parentContext, context)); } @Override public void sendMessageAfter(SendMessageContext context) { - if (context == null || context.getMqTraceContext() == null || context.getSendResult() == null) { + if (context == null) { return; } if (context.getMqTraceContext() instanceof Context) { Context otelContext = (Context) context.getMqTraceContext(); - tracer.afterProduce(otelContext, context.getSendResult()); - tracer.end(otelContext); + instrumenter.end(otelContext, context, null, null); } } } diff --git a/instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetry/instrumentation/rocketmq/AbstractRocketMqClientTest.groovy b/instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetry/instrumentation/rocketmq/AbstractRocketMqClientTest.groovy index 6b417684ea..dccb10e343 100644 --- a/instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetry/instrumentation/rocketmq/AbstractRocketMqClientTest.groovy +++ b/instrumentation/rocketmq-client-4.8/testing/src/main/groovy/io/opentelemetry/instrumentation/rocketmq/AbstractRocketMqClientTest.groovy @@ -21,6 +21,7 @@ import static io.opentelemetry.api.trace.SpanKind.CONSUMER import static io.opentelemetry.api.trace.SpanKind.INTERNAL import static io.opentelemetry.api.trace.SpanKind.PRODUCER +//TODO add tests for propagationEnabled flag @Unroll abstract class AbstractRocketMqClientTest extends InstrumentationSpecification {