Always set messaging operation (#9791)

This commit is contained in:
Lauri Tulmin 2023-11-06 12:30:27 +02:00 committed by GitHub
parent 0cbf80b1a4
commit 70e3962a87
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
31 changed files with 78 additions and 18 deletions

View File

@ -89,7 +89,7 @@ public final class MessagingAttributesExtractor<REQUEST, RESPONSE>
attributes,
SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_COMPRESSED_SIZE_BYTES,
getter.getMessagePayloadCompressedSize(request));
if (operation == RECEIVE || operation == PROCESS) {
if (operation != null) {
internalSet(attributes, SemanticAttributes.MESSAGING_OPERATION, operation.operationName());
}
}
@ -120,6 +120,10 @@ public final class MessagingAttributesExtractor<REQUEST, RESPONSE>
*/
@Override
public SpanKey internalGetSpanKey() {
if (operation == null) {
return null;
}
switch (operation) {
case PUBLISH:
return SpanKey.PRODUCER;

View File

@ -92,7 +92,7 @@ class MessagingAttributesExtractorTest {
void shouldExtractNoAttributesIfNoneAreAvailable() {
// given
AttributesExtractor<Map<String, String>, String> underTest =
MessagingAttributesExtractor.create(TestGetter.INSTANCE, MessageOperation.PUBLISH);
MessagingAttributesExtractor.create(TestGetter.INSTANCE, null);
Context context = Context.root();

View File

@ -104,6 +104,7 @@ abstract class AbstractSqsTracingTest extends InstrumentationSpecification {
"net.peer.port" sqsPort
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
"$SemanticAttributes.MESSAGING_OPERATION" "publish"
"$SemanticAttributes.NET_PROTOCOL_NAME" "http"
"$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1"
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long
@ -193,6 +194,7 @@ abstract class AbstractSqsTracingTest extends InstrumentationSpecification {
"net.peer.port" sqsPort
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
"$SemanticAttributes.MESSAGING_OPERATION" "publish"
"$SemanticAttributes.NET_PROTOCOL_NAME" "http"
"$SemanticAttributes.NET_PROTOCOL_VERSION" "1.1"
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" Long

View File

@ -115,6 +115,7 @@ abstract class AbstractAws2ClientTest extends AbstractAws2ClientCoreTest {
} else if (service == "Sqs" && operation == "SendMessage") {
"aws.queue.url" QUEUE_URL
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "somequeue"
"$SemanticAttributes.MESSAGING_OPERATION" "publish"
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
} else if (service == "Kinesis") {
@ -217,6 +218,7 @@ abstract class AbstractAws2ClientTest extends AbstractAws2ClientCoreTest {
} else if (service == "Sqs" && operation == "SendMessage") {
"aws.queue.url" QUEUE_URL
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "somequeue"
"$SemanticAttributes.MESSAGING_OPERATION" "publish"
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
} else if (service == "Kinesis") {

View File

@ -154,6 +154,7 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification {
"net.peer.port" sqsPort
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
"$SemanticAttributes.MESSAGING_OPERATION" "publish"
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
"$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long }
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long }
@ -321,6 +322,7 @@ abstract class AbstractAws2SqsTracingTest extends InstrumentationSpecification {
"net.peer.port" sqsPort
"$SemanticAttributes.MESSAGING_SYSTEM" "AmazonSQS"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testSdkSqs"
"$SemanticAttributes.MESSAGING_OPERATION" "publish"
"$SemanticAttributes.HTTP_REQUEST_CONTENT_LENGTH" { it == null || it instanceof Long }
"$SemanticAttributes.HTTP_RESPONSE_CONTENT_LENGTH" { it == null || it instanceof Long }
}

View File

@ -102,6 +102,8 @@ class AwsSpanAssertions {
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "AmazonSQS")));
if (spanName.endsWith("receive")) {
attributeAssertions.add(equalTo(SemanticAttributes.MESSAGING_OPERATION, "receive"));
} else if (spanName.endsWith("publish")) {
attributeAssertions.add(equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"));
}
}

View File

@ -169,6 +169,7 @@ public class Jms2InstrumentationTest {
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, destinationName),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"),
equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId),
messagingTempDestination(isTemporary)));
@ -231,6 +232,7 @@ public class Jms2InstrumentationTest {
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, destinationName),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"),
equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId),
messagingTempDestination(isTemporary)),
span ->

View File

@ -129,6 +129,7 @@ public class Jms1InstrumentationTest {
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, destinationName),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"),
equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId),
messagingTempDestination(isTemporary)));
@ -191,6 +192,7 @@ public class Jms1InstrumentationTest {
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, destinationName),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"),
equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId),
messagingTempDestination(isTemporary)),
span ->
@ -269,6 +271,7 @@ public class Jms1InstrumentationTest {
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, destinationName),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"),
equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId),
messagingTempDestination(isTemporary),
equalTo(
@ -338,6 +341,7 @@ public class Jms1InstrumentationTest {
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "jms"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, destinationName),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"),
equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId),
messagingTempDestination(isTemporary))),
trace ->

View File

@ -147,6 +147,7 @@ class Jms3InstrumentationTest {
equalTo(
SemanticAttributes.MESSAGING_DESTINATION_NAME,
producerDestinationName),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"),
equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId),
messagingTempDestination(isTemporary)));
@ -214,6 +215,7 @@ class Jms3InstrumentationTest {
equalTo(
SemanticAttributes.MESSAGING_DESTINATION_NAME,
producerDestinationName),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"),
equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId),
messagingTempDestination(isTemporary)),
span ->
@ -297,6 +299,7 @@ class Jms3InstrumentationTest {
equalTo(
SemanticAttributes.MESSAGING_DESTINATION_NAME,
producerDestinationName),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"),
equalTo(SemanticAttributes.MESSAGING_MESSAGE_ID, messageId),
messagingTempDestination(isTemporary),
equalTo(

View File

@ -159,6 +159,7 @@ public abstract class KafkaClientBaseTest {
Arrays.asList(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"),
satisfies(
SemanticAttributes.MESSAGING_CLIENT_ID,
stringAssert -> stringAssert.startsWith("producer")),

View File

@ -28,6 +28,7 @@ class InterceptorsSuppressReceiveSpansTest extends AbstractInterceptorsTest {
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"),
satisfies(
SemanticAttributes.MESSAGING_CLIENT_ID,
stringAssert -> stringAssert.startsWith("producer"))),

View File

@ -35,6 +35,7 @@ class InterceptorsTest extends AbstractInterceptorsTest {
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"),
satisfies(
SemanticAttributes.MESSAGING_CLIENT_ID,
stringAssert -> stringAssert.startsWith("producer"))));

View File

@ -58,6 +58,7 @@ class WrapperSuppressReceiveSpansTest extends AbstractWrapperTest {
Arrays.asList(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"),
satisfies(
SemanticAttributes.MESSAGING_CLIENT_ID,
stringAssert -> stringAssert.startsWith("producer")),

View File

@ -80,6 +80,7 @@ class WrapperTest extends AbstractWrapperTest {
Arrays.asList(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, SHARED_TOPIC),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"),
satisfies(
SemanticAttributes.MESSAGING_CLIENT_ID,
stringAssert -> stringAssert.startsWith("producer")),

View File

@ -99,6 +99,7 @@ class KafkaStreamsDefaultTest extends KafkaStreamsBaseTest {
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" STREAM_PENDING
"$SemanticAttributes.MESSAGING_OPERATION" "publish"
"$SemanticAttributes.MESSAGING_CLIENT_ID" { it.startsWith("producer") }
"$SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION" { it >= 0 }
"$SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0
@ -154,6 +155,7 @@ class KafkaStreamsDefaultTest extends KafkaStreamsBaseTest {
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" STREAM_PROCESSED
"$SemanticAttributes.MESSAGING_OPERATION" "publish"
"$SemanticAttributes.MESSAGING_CLIENT_ID" { it.endsWith("producer") }
"$SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION" { it >= 0 }
"$SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0

View File

@ -94,6 +94,7 @@ class KafkaStreamsSuppressReceiveSpansTest extends KafkaStreamsBaseTest {
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" STREAM_PENDING
"$SemanticAttributes.MESSAGING_OPERATION" "publish"
"$SemanticAttributes.MESSAGING_CLIENT_ID" "producer-1"
"$SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION" { it >= 0 }
"$SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0
@ -132,6 +133,7 @@ class KafkaStreamsSuppressReceiveSpansTest extends KafkaStreamsBaseTest {
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "kafka"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" STREAM_PROCESSED
"$SemanticAttributes.MESSAGING_OPERATION" "publish"
"$SemanticAttributes.MESSAGING_CLIENT_ID" String
"$SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION" { it >= 0 }
"$SemanticAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0

View File

@ -587,6 +587,7 @@ class PulsarClientTest extends AgentInstrumentationSpecification {
"$SemanticAttributes.NET_PEER_NAME" brokerHost
"$SemanticAttributes.NET_PEER_PORT" brokerPort
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" destination
"$SemanticAttributes.MESSAGING_OPERATION" "publish"
if (msgId == String) {
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
} else if (msgId != null) {

View File

@ -108,11 +108,11 @@ public class RabbitChannelInstrumentation implements TypeInstrumentation {
Context parentContext = Java8BytecodeBridge.currentContext();
request = ChannelAndMethod.create(channel, method);
if (!channelInstrumenter().shouldStart(parentContext, request)) {
if (!channelInstrumenter(request).shouldStart(parentContext, request)) {
return;
}
context = channelInstrumenter().start(parentContext, request);
context = channelInstrumenter(request).start(parentContext, request);
CURRENT_RABBIT_CONTEXT.set(context);
helper().setChannelAndMethod(context, request);
scope = context.makeCurrent();
@ -132,7 +132,7 @@ public class RabbitChannelInstrumentation implements TypeInstrumentation {
scope.close();
CURRENT_RABBIT_CONTEXT.remove();
channelInstrumenter().end(context, request, null, throwable);
channelInstrumenter(request).end(context, request, null, throwable);
}
}

View File

@ -30,7 +30,9 @@ public final class RabbitSingletons {
.getBoolean("otel.instrumentation.rabbitmq.experimental-span-attributes", false);
private static final String instrumentationName = "io.opentelemetry.rabbitmq-2.7";
private static final Instrumenter<ChannelAndMethod, Void> channelInstrumenter =
createChannelInstrumenter();
createChannelInstrumenter(false);
private static final Instrumenter<ChannelAndMethod, Void> channelPublishInstrumenter =
createChannelInstrumenter(true);
private static final Instrumenter<ReceiveRequest, GetResponse> receiveInstrumenter =
createReceiveInstrumenter();
private static final Instrumenter<DeliveryRequest, Void> deliverInstrumenter =
@ -38,8 +40,11 @@ public final class RabbitSingletons {
static final ContextKey<RabbitChannelAndMethodHolder> CHANNEL_AND_METHOD_CONTEXT_KEY =
ContextKey.named("opentelemetry-rabbitmq-channel-and-method-context-key");
public static Instrumenter<ChannelAndMethod, Void> channelInstrumenter() {
return channelInstrumenter;
public static Instrumenter<ChannelAndMethod, Void> channelInstrumenter(
ChannelAndMethod channelAndMethod) {
return channelAndMethod.getMethod().equals("Channel.basicPublish")
? channelPublishInstrumenter
: channelInstrumenter;
}
public static Instrumenter<ReceiveRequest, GetResponse> receiveInstrumenter() {
@ -51,21 +56,19 @@ public final class RabbitSingletons {
}
@SuppressWarnings("deprecation") // have to use the deprecated Net*AttributesExtractor for now
private static Instrumenter<ChannelAndMethod, Void> createChannelInstrumenter() {
private static Instrumenter<ChannelAndMethod, Void> createChannelInstrumenter(boolean publish) {
return Instrumenter.<ChannelAndMethod, Void>builder(
GlobalOpenTelemetry.get(), instrumentationName, ChannelAndMethod::getMethod)
.addAttributesExtractor(
buildMessagingAttributesExtractor(
RabbitChannelAttributesGetter.INSTANCE, MessageOperation.PUBLISH))
RabbitChannelAttributesGetter.INSTANCE, publish ? MessageOperation.PUBLISH : null))
.addAttributesExtractor(
io.opentelemetry.instrumentation.api.instrumenter.net.NetClientAttributesExtractor
.create(new RabbitChannelNetAttributesGetter()))
.addContextCustomizer(
(context, request, startAttributes) ->
context.with(CHANNEL_AND_METHOD_CONTEXT_KEY, new RabbitChannelAndMethodHolder()))
.buildInstrumenter(
channelAndMethod ->
channelAndMethod.getMethod().equals("Channel.basicPublish") ? PRODUCER : CLIENT);
.buildInstrumenter(channelAndMethod -> publish ? PRODUCER : CLIENT);
}
@SuppressWarnings("deprecation") // have to use the deprecated Net*AttributesExtractor for now

View File

@ -180,6 +180,7 @@ public abstract class AbstractReactorKafkaTest {
asList(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, record.topic()),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"),
satisfies(
SemanticAttributes.MESSAGING_CLIENT_ID,
stringAssert -> stringAssert.startsWith("producer")),

View File

@ -106,6 +106,7 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification {
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "rocketmq"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" sharedTopic
"$SemanticAttributes.MESSAGING_OPERATION" "publish"
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
"$SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG" "TagA"
"messaging.rocketmq.broker_address" String
@ -160,6 +161,7 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification {
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "rocketmq"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" sharedTopic
"$SemanticAttributes.MESSAGING_OPERATION" "publish"
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
"$SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG" "TagA"
"messaging.rocketmq.broker_address" String
@ -236,6 +238,7 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification {
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "rocketmq"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" sharedTopic
"$SemanticAttributes.MESSAGING_OPERATION" "publish"
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
"messaging.rocketmq.broker_address" String
"messaging.rocketmq.send_result" "SEND_OK"
@ -320,6 +323,7 @@ abstract class AbstractRocketMqClientTest extends InstrumentationSpecification {
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "rocketmq"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" sharedTopic
"$SemanticAttributes.MESSAGING_OPERATION" "publish"
"$SemanticAttributes.MESSAGING_MESSAGE_ID" String
"$SemanticAttributes.MESSAGING_ROCKETMQ_MESSAGE_TAG" "TagA"
"messaging.rocketmq.broker_address" String

View File

@ -121,7 +121,8 @@ public abstract class AbstractRocketMqClientSuppressReceiveSpanTest {
equalTo(
MESSAGING_MESSAGE_ID,
sendReceipt.getMessageId().toString()),
equalTo(MESSAGING_DESTINATION_NAME, topic)),
equalTo(MESSAGING_DESTINATION_NAME, topic),
equalTo(MESSAGING_OPERATION, "publish")),
span ->
span.hasKind(SpanKind.CONSUMER)
.hasName(topic + " process")

View File

@ -418,7 +418,8 @@ public abstract class AbstractRocketMqClientTest {
equalTo(MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length),
equalTo(MESSAGING_SYSTEM, "rocketmq"),
equalTo(MESSAGING_MESSAGE_ID, sendReceipt.getMessageId().toString()),
equalTo(MESSAGING_DESTINATION_NAME, topic)));
equalTo(MESSAGING_DESTINATION_NAME, topic),
equalTo(MESSAGING_OPERATION, "publish")));
attributeAssertions.addAll(Arrays.asList(extraAttributes));
return span.hasKind(SpanKind.PRODUCER)
@ -446,7 +447,8 @@ public abstract class AbstractRocketMqClientTest {
equalTo(MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length),
equalTo(MESSAGING_SYSTEM, "rocketmq"),
equalTo(MESSAGING_MESSAGE_ID, sendReceipt.getMessageId().toString()),
equalTo(MESSAGING_DESTINATION_NAME, topic)));
equalTo(MESSAGING_DESTINATION_NAME, topic),
equalTo(MESSAGING_OPERATION, "publish")));
attributeAssertions.addAll(Arrays.asList(extraAttributes));
return span.hasKind(SpanKind.PRODUCER)
@ -474,7 +476,8 @@ public abstract class AbstractRocketMqClientTest {
equalTo(MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES, (long) body.length),
equalTo(MESSAGING_SYSTEM, "rocketmq"),
equalTo(MESSAGING_MESSAGE_ID, sendReceipt.getMessageId().toString()),
equalTo(MESSAGING_DESTINATION_NAME, topic)));
equalTo(MESSAGING_DESTINATION_NAME, topic),
equalTo(MESSAGING_OPERATION, "publish")));
attributeAssertions.addAll(Arrays.asList(extraAttributes));
return span.hasKind(SpanKind.PRODUCER)

View File

@ -109,6 +109,7 @@ class KafkaIntegrationTest {
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, "testTopic"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"),
satisfies(
SemanticAttributes.MESSAGING_CLIENT_ID,
stringAssert -> stringAssert.startsWith("producer")),

View File

@ -60,6 +60,7 @@ class SpringIntegrationAndRabbitTest extends AgentInstrumentationSpecification i
"$SemanticAttributes.NET_SOCK_FAMILY" { it == SemanticAttributes.NetSockFamilyValues.INET6 || it == null }
"$SemanticAttributes.MESSAGING_SYSTEM" "rabbitmq"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" "testTopic"
"$SemanticAttributes.MESSAGING_OPERATION" "publish"
"$SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_SIZE_BYTES" Long
"$SemanticAttributes.MESSAGING_RABBITMQ_DESTINATION_ROUTING_KEY" String
}

View File

@ -54,6 +54,7 @@ class SpringListenerTest extends AgentInstrumentationSpecification {
attributes {
"$SemanticAttributes.MESSAGING_SYSTEM" "jms"
"$SemanticAttributes.MESSAGING_DESTINATION_NAME" destinationName
"$SemanticAttributes.MESSAGING_OPERATION" "publish"
if (destinationName == "(temporary)") {
"$SemanticAttributes.MESSAGING_DESTINATION_TEMPORARY" true
}

View File

@ -115,6 +115,7 @@ class SpringJmsListenerTest {
equalTo(
SemanticAttributes.MESSAGING_DESTINATION_NAME,
"spring-jms-listener"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"),
satisfies(
SemanticAttributes.MESSAGING_MESSAGE_ID,
AbstractStringAssert::isNotBlank)),
@ -195,6 +196,7 @@ class SpringJmsListenerTest {
equalTo(
SemanticAttributes.MESSAGING_DESTINATION_NAME,
"spring-jms-listener"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"),
satisfies(
SemanticAttributes.MESSAGING_MESSAGE_ID,
AbstractStringAssert::isNotBlank),

View File

@ -68,6 +68,7 @@ class SpringKafkaTest extends AbstractSpringKafkaTest {
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, "testSingleTopic"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"),
satisfies(
SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
AbstractLongAssert::isNotNegative),
@ -156,6 +157,7 @@ class SpringKafkaTest extends AbstractSpringKafkaTest {
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, "testSingleTopic"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"),
satisfies(
SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
AbstractLongAssert::isNotNegative),
@ -242,6 +244,7 @@ class SpringKafkaTest extends AbstractSpringKafkaTest {
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, "testBatchTopic"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"),
satisfies(
SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
AbstractLongAssert::isNotNegative),
@ -259,6 +262,7 @@ class SpringKafkaTest extends AbstractSpringKafkaTest {
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, "testBatchTopic"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"),
satisfies(
SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
AbstractLongAssert::isNotNegative),
@ -337,6 +341,7 @@ class SpringKafkaTest extends AbstractSpringKafkaTest {
.hasAttributesSatisfyingExactly(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, "testBatchTopic"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"),
satisfies(
SemanticAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
AbstractLongAssert::isNotNegative),

View File

@ -48,6 +48,7 @@ public abstract class AbstractSpringKafkaNoReceiveTelemetryTest extends Abstract
equalTo(
SemanticAttributes.MESSAGING_DESTINATION_NAME,
"testSingleTopic"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"),
satisfies(
SemanticAttributes.MESSAGING_CLIENT_ID,
stringAssert -> stringAssert.startsWith("producer")),
@ -114,6 +115,7 @@ public abstract class AbstractSpringKafkaNoReceiveTelemetryTest extends Abstract
equalTo(
SemanticAttributes.MESSAGING_DESTINATION_NAME,
"testSingleTopic"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"),
satisfies(
SemanticAttributes.MESSAGING_CLIENT_ID,
stringAssert -> stringAssert.startsWith("producer")),
@ -179,6 +181,7 @@ public abstract class AbstractSpringKafkaNoReceiveTelemetryTest extends Abstract
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(
SemanticAttributes.MESSAGING_DESTINATION_NAME, "testBatchTopic"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"),
satisfies(
SemanticAttributes.MESSAGING_CLIENT_ID,
stringAssert -> stringAssert.startsWith("producer")),
@ -197,6 +200,7 @@ public abstract class AbstractSpringKafkaNoReceiveTelemetryTest extends Abstract
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(
SemanticAttributes.MESSAGING_DESTINATION_NAME, "testBatchTopic"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"),
satisfies(
SemanticAttributes.MESSAGING_CLIENT_ID,
stringAssert -> stringAssert.startsWith("producer")),
@ -265,6 +269,7 @@ public abstract class AbstractSpringKafkaNoReceiveTelemetryTest extends Abstract
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(
SemanticAttributes.MESSAGING_DESTINATION_NAME, "testBatchTopic"),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"),
satisfies(
SemanticAttributes.MESSAGING_CLIENT_ID,
stringAssert -> stringAssert.startsWith("producer")),

View File

@ -159,7 +159,8 @@ public class ContextPropagationTest {
.hasKind(SpanKind.PRODUCER)
.hasParent(trace.getSpan(0))
.hasAttributesSatisfyingExactly(
getAssertions("<default>", null, "127.0.0.1", true, testHeaders)),
getAssertions(
"<default>", "publish", "127.0.0.1", true, testHeaders)),
// spring-cloud-stream-binder-rabbit listener puts all messages into a
// BlockingQueue immediately after receiving
// that's why the rabbitmq CONSUMER span will never have any child span (and

View File

@ -201,6 +201,7 @@ public abstract class AbstractVertxKafkaTest {
Arrays.asList(
equalTo(SemanticAttributes.MESSAGING_SYSTEM, "kafka"),
equalTo(SemanticAttributes.MESSAGING_DESTINATION_NAME, record.topic()),
equalTo(SemanticAttributes.MESSAGING_OPERATION, "publish"),
satisfies(
SemanticAttributes.MESSAGING_CLIENT_ID,
stringAssert -> stringAssert.startsWith("producer")),