Separate suppression strategies for CONSUMER receive/process spans (#4076)
* Separate suppression strategies for CONSUMER receive/process spans * Update instrumentation-api/src/main/java/io/opentelemetry/instrumentation/api/instrumenter/SpanKeyExtractor.java Co-authored-by: Trask Stalnaker <trask.stalnaker@gmail.com> * spotless Co-authored-by: Trask Stalnaker <trask.stalnaker@gmail.com>
This commit is contained in:
parent
dee98a0a9d
commit
05d1a4a54f
|
@ -16,13 +16,8 @@ import io.opentelemetry.context.propagation.TextMapGetter;
|
|||
import io.opentelemetry.context.propagation.TextMapSetter;
|
||||
import io.opentelemetry.instrumentation.api.annotations.UnstableApi;
|
||||
import io.opentelemetry.instrumentation.api.config.Config;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.db.DbAttributesExtractor;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpAttributesExtractor;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.rpc.RpcAttributesExtractor;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import org.checkerframework.checker.nullness.qual.Nullable;
|
||||
|
@ -146,25 +141,26 @@ public final class InstrumenterBuilder<REQUEST, RESPONSE> {
|
|||
/**
|
||||
* Enables suppression based on client instrumentation type.
|
||||
*
|
||||
* <p><strong>When enabled, suppresses nested spans depending on their {@link SpanKind} and
|
||||
* type</strong>.
|
||||
* <p><strong>When enabled, suppresses nested spans depending on their {@link SpanKind} and {@link
|
||||
* SpanKey}</strong>.
|
||||
*
|
||||
* <ul>
|
||||
* <li>CLIENT and PRODUCER nested spans are suppressed based on their type (HTTP, RPC, DB,
|
||||
* MESSAGING) i.e. if span with the same type is on the context, new span of this type will
|
||||
* not be started.
|
||||
* <li>CLIENT nested spans are suppressed based on their {@link SpanKey} (HTTP, RPC, DB) i.e. if
|
||||
* span marked with the same {@link SpanKey} is present in the parent context object, new
|
||||
* span of the same {@link SpanKey} will not be started.
|
||||
* </ul>
|
||||
*
|
||||
* <p><strong>When disabled:</strong>
|
||||
*
|
||||
* <ul>
|
||||
* <li>CLIENT and PRODUCER nested spans are always suppressed
|
||||
* <li>CLIENT nested spans are always suppressed
|
||||
* </ul>
|
||||
*
|
||||
* <p><strong>In both cases:</strong>
|
||||
*
|
||||
* <ul>
|
||||
* <li>SERVER and CONSUMER nested spans are always suppressed
|
||||
* <li>SERVER and PRODUCER nested spans are always suppressed
|
||||
* <li>CONSUMER spans are suppressed depending on their operation: receive or process
|
||||
* <li>INTERNAL spans are never suppressed
|
||||
* </ul>
|
||||
*/
|
||||
|
@ -244,31 +240,12 @@ public final class InstrumenterBuilder<REQUEST, RESPONSE> {
|
|||
}
|
||||
|
||||
SpanSuppressionStrategy getSpanSuppressionStrategy() {
|
||||
if (!enableSpanSuppressionByType) {
|
||||
// if not enabled, preserve current behavior, not distinguishing types
|
||||
return SpanSuppressionStrategy.SUPPRESS_ALL_NESTED_OUTGOING_STRATEGY;
|
||||
Set<SpanKey> spanKeys = SpanKeyExtractor.determineSpanKeys(attributesExtractors);
|
||||
if (enableSpanSuppressionByType) {
|
||||
return SpanSuppressionStrategy.from(spanKeys);
|
||||
}
|
||||
|
||||
Set<SpanKey> spanKeys = spanKeysFromAttributeExtractor(this.attributesExtractors);
|
||||
return SpanSuppressionStrategy.from(spanKeys);
|
||||
}
|
||||
|
||||
private static Set<SpanKey> spanKeysFromAttributeExtractor(
|
||||
List<? extends AttributesExtractor<?, ?>> attributesExtractors) {
|
||||
|
||||
Set<SpanKey> spanKeys = new HashSet<>();
|
||||
for (AttributesExtractor<?, ?> attributeExtractor : attributesExtractors) {
|
||||
if (attributeExtractor instanceof HttpAttributesExtractor) {
|
||||
spanKeys.add(SpanKey.HTTP_CLIENT);
|
||||
} else if (attributeExtractor instanceof RpcAttributesExtractor) {
|
||||
spanKeys.add(SpanKey.RPC_CLIENT);
|
||||
} else if (attributeExtractor instanceof DbAttributesExtractor) {
|
||||
spanKeys.add(SpanKey.DB_CLIENT);
|
||||
} else if (attributeExtractor instanceof MessagingAttributesExtractor) {
|
||||
spanKeys.add(SpanKey.MESSAGING_PRODUCER);
|
||||
}
|
||||
}
|
||||
return spanKeys;
|
||||
// if not enabled, preserve current behavior, not distinguishing CLIENT instrumentation types
|
||||
return SpanSuppressionStrategy.suppressNestedClients(spanKeys);
|
||||
}
|
||||
|
||||
private interface InstrumenterConstructor<RQ, RS> {
|
||||
|
|
|
@ -13,44 +13,51 @@ import org.checkerframework.checker.nullness.qual.Nullable;
|
|||
/** Makes span keys for specific instrumentation accessible to enrich and suppress spans. */
|
||||
public final class SpanKey {
|
||||
|
||||
// server span key
|
||||
|
||||
private static final ContextKey<Span> SERVER_KEY =
|
||||
ContextKey.named("opentelemetry-traces-span-key-server");
|
||||
private static final ContextKey<Span> CONSUMER_KEY =
|
||||
ContextKey.named("opentelemetry-traces-span-key-consumer");
|
||||
|
||||
private static final ContextKey<Span> HTTP_KEY =
|
||||
// client span keys
|
||||
|
||||
private static final ContextKey<Span> HTTP_CLIENT_KEY =
|
||||
ContextKey.named("opentelemetry-traces-span-key-http");
|
||||
private static final ContextKey<Span> RPC_KEY =
|
||||
private static final ContextKey<Span> RPC_CLIENT_KEY =
|
||||
ContextKey.named("opentelemetry-traces-span-key-rpc");
|
||||
private static final ContextKey<Span> DB_KEY =
|
||||
private static final ContextKey<Span> DB_CLIENT_KEY =
|
||||
ContextKey.named("opentelemetry-traces-span-key-db");
|
||||
private static final ContextKey<Span> MESSAGING_KEY =
|
||||
ContextKey.named("opentelemetry-traces-span-key-messaging");
|
||||
|
||||
// this is used instead of above, depending on the configuration value for
|
||||
// otel.instrumentation.experimental.outgoing-span-suppression-by-type
|
||||
private static final ContextKey<Span> CLIENT_KEY =
|
||||
ContextKey.named("opentelemetry-traces-span-key-client");
|
||||
|
||||
// producer & consumer (messaging) span keys
|
||||
|
||||
private static final ContextKey<Span> PRODUCER_KEY =
|
||||
ContextKey.named("opentelemetry-traces-span-key-producer");
|
||||
private static final ContextKey<Span> CONSUMER_RECEIVE_KEY =
|
||||
ContextKey.named("opentelemetry-traces-span-key-consumer-receive");
|
||||
private static final ContextKey<Span> CONSUMER_PROCESS_KEY =
|
||||
ContextKey.named("opentelemetry-traces-span-key-consumer-process");
|
||||
|
||||
public static final SpanKey SERVER = new SpanKey(SERVER_KEY);
|
||||
public static final SpanKey CONSUMER = new SpanKey(CONSUMER_KEY);
|
||||
|
||||
static final SpanKey HTTP_CLIENT = new SpanKey(HTTP_KEY);
|
||||
static final SpanKey RPC_CLIENT = new SpanKey(RPC_KEY);
|
||||
static final SpanKey DB_CLIENT = new SpanKey(DB_KEY);
|
||||
static final SpanKey MESSAGING_PRODUCER = new SpanKey(MESSAGING_KEY);
|
||||
static final SpanKey HTTP_CLIENT = new SpanKey(HTTP_CLIENT_KEY);
|
||||
static final SpanKey RPC_CLIENT = new SpanKey(RPC_CLIENT_KEY);
|
||||
static final SpanKey DB_CLIENT = new SpanKey(DB_CLIENT_KEY);
|
||||
|
||||
// this is used instead of above, depending on the configuration value for
|
||||
// otel.instrumentation.experimental.outgoing-span-suppression-by-type
|
||||
public static final SpanKey ALL_CLIENTS = new SpanKey(CLIENT_KEY);
|
||||
public static final SpanKey ALL_PRODUCERS = new SpanKey(PRODUCER_KEY);
|
||||
|
||||
static final SpanKey PRODUCER = new SpanKey(PRODUCER_KEY);
|
||||
static final SpanKey CONSUMER_RECEIVE = new SpanKey(CONSUMER_RECEIVE_KEY);
|
||||
public static final SpanKey CONSUMER_PROCESS = new SpanKey(CONSUMER_PROCESS_KEY);
|
||||
|
||||
private final ContextKey<Span> key;
|
||||
|
||||
SpanKey(ContextKey<Span> key) {
|
||||
private SpanKey(ContextKey<Span> key) {
|
||||
this.key = key;
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,54 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.instrumentation.api.instrumenter;
|
||||
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.db.DbAttributesExtractor;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpAttributesExtractor;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.rpc.RpcAttributesExtractor;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
final class SpanKeyExtractor {
|
||||
|
||||
/**
|
||||
* Automatically determines {@link SpanKey}s that should be applied to the newly constructed
|
||||
* {@link Instrumenter} based on the {@link AttributesExtractor}s configured.
|
||||
*/
|
||||
static Set<SpanKey> determineSpanKeys(
|
||||
List<? extends AttributesExtractor<?, ?>> attributesExtractors) {
|
||||
Set<SpanKey> spanKeys = new HashSet<>();
|
||||
for (AttributesExtractor<?, ?> attributeExtractor : attributesExtractors) {
|
||||
if (attributeExtractor instanceof HttpAttributesExtractor) {
|
||||
spanKeys.add(SpanKey.HTTP_CLIENT);
|
||||
} else if (attributeExtractor instanceof RpcAttributesExtractor) {
|
||||
spanKeys.add(SpanKey.RPC_CLIENT);
|
||||
} else if (attributeExtractor instanceof DbAttributesExtractor) {
|
||||
spanKeys.add(SpanKey.DB_CLIENT);
|
||||
} else if (attributeExtractor instanceof MessagingAttributesExtractor) {
|
||||
spanKeys.add(
|
||||
determineMessagingSpanKey((MessagingAttributesExtractor<?, ?>) attributeExtractor));
|
||||
}
|
||||
}
|
||||
return spanKeys;
|
||||
}
|
||||
|
||||
private static SpanKey determineMessagingSpanKey(
|
||||
MessagingAttributesExtractor<?, ?> messagingAttributesExtractor) {
|
||||
switch (messagingAttributesExtractor.operation()) {
|
||||
case SEND:
|
||||
return SpanKey.PRODUCER;
|
||||
case RECEIVE:
|
||||
return SpanKey.CONSUMER_RECEIVE;
|
||||
case PROCESS:
|
||||
return SpanKey.CONSUMER_PROCESS;
|
||||
}
|
||||
throw new IllegalStateException("Can't possibly happen");
|
||||
}
|
||||
|
||||
private SpanKeyExtractor() {}
|
||||
}
|
|
@ -15,33 +15,41 @@ import java.util.Set;
|
|||
abstract class SpanSuppressionStrategy {
|
||||
private static final SpanSuppressionStrategy SERVER_STRATEGY =
|
||||
new SuppressIfSameSpanKeyStrategy(singleton(SpanKey.SERVER));
|
||||
private static final SpanSuppressionStrategy CONSUMER_STRATEGY =
|
||||
new SuppressIfSameSpanKeyStrategy(singleton(SpanKey.CONSUMER));
|
||||
private static final SpanSuppressionStrategy ALL_CLIENTS_STRATEGY =
|
||||
new SuppressIfSameSpanKeyStrategy(singleton(SpanKey.ALL_CLIENTS));
|
||||
private static final SpanSuppressionStrategy ALL_PRODUCERS_STRATEGY =
|
||||
new SuppressIfSameSpanKeyStrategy(singleton(SpanKey.ALL_PRODUCERS));
|
||||
|
||||
public static final SpanSuppressionStrategy SUPPRESS_ALL_NESTED_OUTGOING_STRATEGY =
|
||||
private static final SpanSuppressionStrategy SUPPRESS_GENERIC_CLIENTS_AND_SERVERS =
|
||||
new CompositeSuppressionStrategy(
|
||||
ALL_CLIENTS_STRATEGY, ALL_PRODUCERS_STRATEGY, SERVER_STRATEGY, CONSUMER_STRATEGY);
|
||||
ALL_CLIENTS_STRATEGY,
|
||||
NoopSuppressionStrategy.INSTANCE,
|
||||
SERVER_STRATEGY,
|
||||
NoopSuppressionStrategy.INSTANCE);
|
||||
|
||||
private static final SpanSuppressionStrategy NO_CLIENT_SUPPRESSION_STRATEGY =
|
||||
private static final SpanSuppressionStrategy SUPPRESS_ONLY_SERVERS =
|
||||
new CompositeSuppressionStrategy(
|
||||
NoopSuppressionStrategy.INSTANCE,
|
||||
NoopSuppressionStrategy.INSTANCE,
|
||||
SERVER_STRATEGY,
|
||||
CONSUMER_STRATEGY);
|
||||
NoopSuppressionStrategy.INSTANCE);
|
||||
|
||||
static SpanSuppressionStrategy from(Set<SpanKey> clientSpanKeys) {
|
||||
if (clientSpanKeys.isEmpty()) {
|
||||
return NO_CLIENT_SUPPRESSION_STRATEGY;
|
||||
static SpanSuppressionStrategy suppressNestedClients(Set<SpanKey> spanKeys) {
|
||||
if (spanKeys.isEmpty()) {
|
||||
return SUPPRESS_GENERIC_CLIENTS_AND_SERVERS;
|
||||
}
|
||||
|
||||
SpanSuppressionStrategy clientOrProducerStrategy =
|
||||
new SuppressIfSameSpanKeyStrategy(clientSpanKeys);
|
||||
SpanSuppressionStrategy spanKeyStrategy = new SuppressIfSameSpanKeyStrategy(spanKeys);
|
||||
return new CompositeSuppressionStrategy(
|
||||
clientOrProducerStrategy, clientOrProducerStrategy, SERVER_STRATEGY, CONSUMER_STRATEGY);
|
||||
ALL_CLIENTS_STRATEGY, spanKeyStrategy, SERVER_STRATEGY, spanKeyStrategy);
|
||||
}
|
||||
|
||||
static SpanSuppressionStrategy from(Set<SpanKey> spanKeys) {
|
||||
if (spanKeys.isEmpty()) {
|
||||
return SUPPRESS_ONLY_SERVERS;
|
||||
}
|
||||
|
||||
SpanSuppressionStrategy spanKeyStrategy = new SuppressIfSameSpanKeyStrategy(spanKeys);
|
||||
return new CompositeSuppressionStrategy(
|
||||
spanKeyStrategy, spanKeyStrategy, SERVER_STRATEGY, spanKeyStrategy);
|
||||
}
|
||||
|
||||
abstract Context storeInContext(Context context, SpanKind spanKind, Span span);
|
||||
|
|
|
@ -1,33 +0,0 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.instrumentation.api.instrumenter;
|
||||
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.api.trace.SpanKind;
|
||||
import io.opentelemetry.context.Context;
|
||||
import java.util.Set;
|
||||
|
||||
final class StoreOnlyStrategy extends SpanSuppressionStrategy {
|
||||
|
||||
private final Set<SpanKey> outgoingSpanKeys;
|
||||
|
||||
StoreOnlyStrategy(Set<SpanKey> outgoingSpanKeys) {
|
||||
this.outgoingSpanKeys = outgoingSpanKeys;
|
||||
}
|
||||
|
||||
@Override
|
||||
Context storeInContext(Context context, SpanKind spanKind, Span span) {
|
||||
for (SpanKey outgoingSpanKey : outgoingSpanKeys) {
|
||||
context = outgoingSpanKey.storeInContext(context, span);
|
||||
}
|
||||
return context;
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean shouldSuppress(Context parentContext, SpanKind spanKind) {
|
||||
return false;
|
||||
}
|
||||
}
|
|
@ -46,7 +46,7 @@ public abstract class MessagingAttributesExtractor<REQUEST, RESPONSE>
|
|||
attributes,
|
||||
SemanticAttributes.MESSAGING_MESSAGE_PAYLOAD_COMPRESSED_SIZE_BYTES,
|
||||
messagePayloadCompressedSize(request));
|
||||
MessageOperation operation = operation(request);
|
||||
MessageOperation operation = operation();
|
||||
if (operation == MessageOperation.RECEIVE || operation == MessageOperation.PROCESS) {
|
||||
set(attributes, SemanticAttributes.MESSAGING_OPERATION, operation.operationName());
|
||||
}
|
||||
|
@ -61,6 +61,8 @@ public abstract class MessagingAttributesExtractor<REQUEST, RESPONSE>
|
|||
set(attributes, SemanticAttributes.MESSAGING_MESSAGE_ID, messageId(request, response));
|
||||
}
|
||||
|
||||
public abstract MessageOperation operation();
|
||||
|
||||
@Nullable
|
||||
protected abstract String system(REQUEST request);
|
||||
|
||||
|
@ -90,9 +92,6 @@ public abstract class MessagingAttributesExtractor<REQUEST, RESPONSE>
|
|||
@Nullable
|
||||
protected abstract Long messagePayloadCompressedSize(REQUEST request);
|
||||
|
||||
@Nullable
|
||||
protected abstract MessageOperation operation(REQUEST request);
|
||||
|
||||
@Nullable
|
||||
protected abstract String messageId(REQUEST request, @Nullable RESPONSE response);
|
||||
}
|
||||
|
|
|
@ -16,7 +16,7 @@ public final class MessagingSpanNameExtractor<REQUEST> implements SpanNameExtrac
|
|||
*
|
||||
* @see MessagingAttributesExtractor#destination(Object) used to extract {@code <destination
|
||||
* name>}.
|
||||
* @see MessagingAttributesExtractor#operation(Object) used to extract {@code <operation name>}.
|
||||
* @see MessagingAttributesExtractor#operation() used to extract {@code <operation name>}.
|
||||
*/
|
||||
public static <REQUEST> SpanNameExtractor<REQUEST> create(
|
||||
MessagingAttributesExtractor<REQUEST, ?> attributesExtractor) {
|
||||
|
@ -39,7 +39,7 @@ public final class MessagingSpanNameExtractor<REQUEST> implements SpanNameExtrac
|
|||
destinationName = "unknown";
|
||||
}
|
||||
|
||||
MessageOperation operation = attributesExtractor.operation(request);
|
||||
return operation == null ? destinationName : destinationName + " " + operation.operationName();
|
||||
MessageOperation operation = attributesExtractor.operation();
|
||||
return destinationName + " " + operation.operationName();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,11 +30,11 @@ public final class ConsumerSpan {
|
|||
*/
|
||||
@Nullable
|
||||
public static Span fromContextOrNull(Context context) {
|
||||
return SpanKey.CONSUMER.fromContextOrNull(context);
|
||||
return SpanKey.CONSUMER_PROCESS.fromContextOrNull(context);
|
||||
}
|
||||
|
||||
public static Context with(Context context, Span consumerSpan) {
|
||||
return SpanKey.CONSUMER.storeInContext(context, consumerSpan);
|
||||
return SpanKey.CONSUMER_PROCESS.storeInContext(context, consumerSpan);
|
||||
}
|
||||
|
||||
private ConsumerSpan() {}
|
||||
|
|
|
@ -23,6 +23,7 @@ import io.opentelemetry.context.Context;
|
|||
import io.opentelemetry.context.propagation.TextMapGetter;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.db.DbAttributesExtractor;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpAttributesExtractor;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.net.NetAttributesExtractor;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.rpc.RpcAttributesExtractor;
|
||||
|
@ -787,7 +788,7 @@ class InstrumenterTest {
|
|||
Instrumenter<Map<String, String>, Map<String, String>> instrumenterOuter =
|
||||
getInstrumenterWithType(true);
|
||||
Instrumenter<Map<String, String>, Map<String, String>> instrumenterInner =
|
||||
getInstrumenterWithType(true, new AttributesExtractor[] {null});
|
||||
getInstrumenterWithType(true, null, null);
|
||||
|
||||
Map<String, String> request = new HashMap<>(REQUEST);
|
||||
|
||||
|
@ -829,18 +830,22 @@ class InstrumenterTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
void instrumentationTypeDetected_messaging() {
|
||||
void instrumentationTypeDetected_producer() {
|
||||
when(mockMessagingAttributes.operation()).thenReturn(MessageOperation.SEND);
|
||||
|
||||
Instrumenter<Map<String, String>, Map<String, String>> instrumenter =
|
||||
getInstrumenterWithType(true, mockMessagingAttributes);
|
||||
|
||||
Map<String, String> request = new HashMap<>(REQUEST);
|
||||
|
||||
Context context = instrumenter.start(Context.root(), request);
|
||||
validateInstrumentationTypeSpanPresent(SpanKey.MESSAGING_PRODUCER, context);
|
||||
validateInstrumentationTypeSpanPresent(SpanKey.PRODUCER, context);
|
||||
}
|
||||
|
||||
@Test
|
||||
void instrumentationTypeDetected_mix() {
|
||||
when(mockMessagingAttributes.operation()).thenReturn(MessageOperation.SEND);
|
||||
|
||||
Instrumenter<Map<String, String>, Map<String, String>> instrumenter =
|
||||
getInstrumenterWithType(
|
||||
true,
|
||||
|
@ -852,7 +857,7 @@ class InstrumenterTest {
|
|||
Map<String, String> request = new HashMap<>(REQUEST);
|
||||
|
||||
Context context = instrumenter.start(Context.root(), request);
|
||||
validateInstrumentationTypeSpanPresent(SpanKey.MESSAGING_PRODUCER, context);
|
||||
validateInstrumentationTypeSpanPresent(SpanKey.PRODUCER, context);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -870,7 +875,7 @@ class InstrumenterTest {
|
|||
assertThat(SpanKey.HTTP_CLIENT.fromContextOrNull(context)).isNull();
|
||||
assertThat(SpanKey.DB_CLIENT.fromContextOrNull(context)).isNull();
|
||||
assertThat(SpanKey.RPC_CLIENT.fromContextOrNull(context)).isNull();
|
||||
assertThat(SpanKey.MESSAGING_PRODUCER.fromContextOrNull(context)).isNull();
|
||||
assertThat(SpanKey.PRODUCER.fromContextOrNull(context)).isNull();
|
||||
}
|
||||
|
||||
private static void validateInstrumentationTypeSpanPresent(SpanKey spanKey, Context context) {
|
||||
|
@ -880,8 +885,10 @@ class InstrumenterTest {
|
|||
assertThat(spanKey.fromContextOrNull(context)).isSameAs(span);
|
||||
}
|
||||
|
||||
@SafeVarargs
|
||||
private static Instrumenter<Map<String, String>, Map<String, String>> getInstrumenterWithType(
|
||||
boolean enableInstrumentation, AttributesExtractor... attributeExtractors) {
|
||||
boolean enableInstrumentation,
|
||||
AttributesExtractor<Map<String, String>, Map<String, String>>... attributeExtractors) {
|
||||
InstrumenterBuilder<Map<String, String>, Map<String, String>> builder =
|
||||
Instrumenter.<Map<String, String>, Map<String, String>>newBuilder(
|
||||
otelTesting.getOpenTelemetry(), "test", unused -> "span")
|
||||
|
|
|
@ -0,0 +1,73 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.instrumentation.api.instrumenter;
|
||||
|
||||
import static java.util.Collections.singleton;
|
||||
import static java.util.Collections.singletonList;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.db.DbAttributesExtractor;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.http.HttpAttributesExtractor;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.rpc.RpcAttributesExtractor;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Stream;
|
||||
import org.junit.jupiter.api.extension.ExtensionContext;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.Arguments;
|
||||
import org.junit.jupiter.params.provider.ArgumentsProvider;
|
||||
import org.junit.jupiter.params.provider.ArgumentsSource;
|
||||
|
||||
class SpanKeyExtractorTest {
|
||||
|
||||
@ParameterizedTest
|
||||
@ArgumentsSource(ClientSpanKeys.class)
|
||||
void shouldDetermineKeysForClientAttributesExtractors(
|
||||
AttributesExtractor<?, ?> attributesExtractor, SpanKey expectedSpanKey) {
|
||||
|
||||
Set<SpanKey> spanKeys = SpanKeyExtractor.determineSpanKeys(singletonList(attributesExtractor));
|
||||
assertEquals(singleton(expectedSpanKey), spanKeys);
|
||||
}
|
||||
|
||||
static final class ClientSpanKeys implements ArgumentsProvider {
|
||||
|
||||
@Override
|
||||
public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
|
||||
return Stream.of(
|
||||
Arguments.of(mock(DbAttributesExtractor.class), SpanKey.DB_CLIENT),
|
||||
Arguments.of(mock(HttpAttributesExtractor.class), SpanKey.HTTP_CLIENT),
|
||||
Arguments.of(mock(RpcAttributesExtractor.class), SpanKey.RPC_CLIENT));
|
||||
}
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@ArgumentsSource(MessagingSpanKeys.class)
|
||||
void shouldDetermineKeysForMessagingAttributesExtractor(
|
||||
MessageOperation operation, SpanKey expectedSpanKey) {
|
||||
|
||||
MessagingAttributesExtractor<?, ?> attributesExtractor =
|
||||
mock(MessagingAttributesExtractor.class);
|
||||
when(attributesExtractor.operation()).thenReturn(operation);
|
||||
|
||||
Set<SpanKey> spanKeys = SpanKeyExtractor.determineSpanKeys(singletonList(attributesExtractor));
|
||||
|
||||
assertEquals(singleton(expectedSpanKey), spanKeys);
|
||||
}
|
||||
|
||||
static final class MessagingSpanKeys implements ArgumentsProvider {
|
||||
|
||||
@Override
|
||||
public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
|
||||
return Stream.of(
|
||||
Arguments.of(MessageOperation.PROCESS, SpanKey.CONSUMER_PROCESS),
|
||||
Arguments.of(MessageOperation.RECEIVE, SpanKey.CONSUMER_RECEIVE),
|
||||
Arguments.of(MessageOperation.SEND, SpanKey.PRODUCER));
|
||||
}
|
||||
}
|
||||
}
|
|
@ -5,13 +5,13 @@
|
|||
|
||||
package io.opentelemetry.instrumentation.api.instrumenter;
|
||||
|
||||
import static java.util.Collections.emptySet;
|
||||
import static java.util.Collections.singleton;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
import io.opentelemetry.api.trace.Span;
|
||||
import io.opentelemetry.api.trace.SpanKind;
|
||||
import io.opentelemetry.context.Context;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
@ -23,193 +23,139 @@ public class SpanSuppressionStrategyTest {
|
|||
private static final Span SPAN = Span.getInvalid();
|
||||
|
||||
@Test
|
||||
public void serverSpan_getSet() {
|
||||
assertThat(SpanKey.SERVER.fromContextOrNull(Context.root())).isNull();
|
||||
|
||||
Context context = SpanKey.SERVER.storeInContext(Context.root(), SPAN);
|
||||
|
||||
SpanSuppressionStrategy strategy =
|
||||
SpanSuppressionStrategy.from(Collections.singleton(SpanKey.SERVER));
|
||||
assertThat(strategy.shouldSuppress(context, SpanKind.SERVER)).isTrue();
|
||||
|
||||
assertThat(SpanKey.SERVER.fromContextOrNull(context)).isSameAs(SPAN);
|
||||
allClientSpanKeys().forEach(spanKey -> assertThat(spanKey.fromContextOrNull(context)).isNull());
|
||||
|
||||
assertThat(SpanKey.CONSUMER.fromContextOrNull(context)).isNull();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void serverSpan_getSetWithStrategy() {
|
||||
SpanSuppressionStrategy strategy =
|
||||
SpanSuppressionStrategy.from(Collections.singleton(SpanKey.SERVER));
|
||||
assertThat(strategy.shouldSuppress(Context.root(), SpanKind.SERVER)).isFalse();
|
||||
assertThat(SpanKey.SERVER.fromContextOrNull(Context.root())).isNull();
|
||||
public void serverSpan() {
|
||||
// SpanKey.SERVER will never be passed to SpanSuppressionStrategy.from(), it cannot be
|
||||
// automatically determined by te builder - thus it does not make any sense to test it (for now)
|
||||
SpanSuppressionStrategy strategy = SpanSuppressionStrategy.from(emptySet());
|
||||
|
||||
Context context = strategy.storeInContext(Context.root(), SpanKind.SERVER, SPAN);
|
||||
|
||||
assertThat(strategy.shouldSuppress(context, SpanKind.SERVER)).isTrue();
|
||||
assertThat(SpanKey.SERVER.fromContextOrNull(context)).isSameAs(SPAN);
|
||||
Stream.of(SpanKind.CLIENT, SpanKind.CONSUMER, SpanKind.PRODUCER)
|
||||
.forEach(spanKind -> assertThat(strategy.shouldSuppress(context, spanKind)).isFalse());
|
||||
|
||||
allClientSpanKeys().forEach(spanKey -> assertThat(spanKey.fromContextOrNull(context)).isNull());
|
||||
|
||||
assertThat(SpanKey.CONSUMER.fromContextOrNull(context)).isNull();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void consumerSpan_getSet() {
|
||||
SpanSuppressionStrategy strategy =
|
||||
SpanSuppressionStrategy.from(Collections.singleton(SpanKey.CONSUMER));
|
||||
|
||||
assertThat(strategy.shouldSuppress(Context.root(), SpanKind.CONSUMER)).isFalse();
|
||||
assertThat(SpanKey.CONSUMER.fromContextOrNull(Context.root())).isNull();
|
||||
|
||||
Context context = SpanKey.CONSUMER.storeInContext(Context.root(), SPAN);
|
||||
|
||||
assertThat(strategy.shouldSuppress(context, SpanKind.CONSUMER)).isTrue();
|
||||
|
||||
assertThat(SpanKey.CONSUMER.fromContextOrNull(context)).isSameAs(SPAN);
|
||||
allClientSpanKeys().forEach(spanKey -> assertThat(spanKey.fromContextOrNull(context)).isNull());
|
||||
|
||||
assertThat(SpanKey.SERVER.fromContextOrNull(context)).isNull();
|
||||
verifySpanKey(SpanKey.SERVER, context);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("allClientSpanKeys")
|
||||
public void clientSpan_differentForAllTypes(SpanKey spanKey) {
|
||||
SpanSuppressionStrategy strategy = SpanSuppressionStrategy.from(Collections.singleton(spanKey));
|
||||
assertThat(strategy.shouldSuppress(Context.root(), SpanKind.CLIENT)).isFalse();
|
||||
@MethodSource("consumerSpanKeys")
|
||||
public void consumerSpan(SpanKey spanKey) {
|
||||
SpanSuppressionStrategy strategy = SpanSuppressionStrategy.from(singleton(spanKey));
|
||||
|
||||
Context context = spanKey.storeInContext(Context.root(), SPAN);
|
||||
verifyNoSuppression(strategy, Context.root());
|
||||
|
||||
Context context = strategy.storeInContext(Context.root(), SpanKind.CONSUMER, SPAN);
|
||||
|
||||
assertThat(strategy.shouldSuppress(context, SpanKind.SERVER)).isFalse();
|
||||
Stream.of(SpanKind.CLIENT, SpanKind.CONSUMER, SpanKind.PRODUCER)
|
||||
.forEach(spanKind -> assertThat(strategy.shouldSuppress(context, spanKind)).isTrue());
|
||||
|
||||
verifySpanKey(spanKey, context);
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("clientSpanKeys")
|
||||
public void clientSpan(SpanKey spanKey) {
|
||||
SpanSuppressionStrategy strategy = SpanSuppressionStrategy.from(singleton(spanKey));
|
||||
|
||||
verifyNoSuppression(strategy, Context.root());
|
||||
|
||||
Context context = strategy.storeInContext(Context.root(), SpanKind.CLIENT, SPAN);
|
||||
|
||||
assertThat(strategy.shouldSuppress(context, SpanKind.SERVER)).isFalse();
|
||||
Stream.of(SpanKind.CLIENT, SpanKind.CONSUMER, SpanKind.PRODUCER)
|
||||
.forEach(spanKind -> assertThat(strategy.shouldSuppress(context, spanKind)).isTrue());
|
||||
|
||||
verifySpanKey(spanKey, context);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void producerSpan() {
|
||||
SpanSuppressionStrategy strategy = SpanSuppressionStrategy.from(singleton(SpanKey.PRODUCER));
|
||||
|
||||
verifyNoSuppression(strategy, Context.root());
|
||||
|
||||
Context context = strategy.storeInContext(Context.root(), SpanKind.PRODUCER, SPAN);
|
||||
|
||||
assertThat(strategy.shouldSuppress(context, SpanKind.SERVER)).isFalse();
|
||||
Stream.of(SpanKind.CLIENT, SpanKind.CONSUMER, SpanKind.PRODUCER)
|
||||
.forEach(spanKind -> assertThat(strategy.shouldSuppress(context, spanKind)).isTrue());
|
||||
|
||||
verifySpanKey(SpanKey.PRODUCER, context);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void multipleClientKeys() {
|
||||
SpanSuppressionStrategy strategy =
|
||||
SpanSuppressionStrategy.from(clientSpanKeys().collect(Collectors.toSet()));
|
||||
|
||||
Context context = strategy.storeInContext(Context.root(), SpanKind.CLIENT, SPAN);
|
||||
|
||||
assertThat(strategy.shouldSuppress(context, SpanKind.CLIENT)).isTrue();
|
||||
assertThat(strategy.shouldSuppress(context, SpanKind.PRODUCER)).isTrue();
|
||||
assertThat(strategy.shouldSuppress(context, SpanKind.SERVER)).isFalse();
|
||||
assertThat(strategy.shouldSuppress(context, SpanKind.CONSUMER)).isFalse();
|
||||
assertThat(strategy.shouldSuppress(context, SpanKind.CONSUMER)).isTrue();
|
||||
|
||||
clientSpanKeys().forEach(key -> assertThat(key.fromContextOrNull(context)).isSameAs(SPAN));
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("nonServerSpanKinds")
|
||||
public void noKeys_nonServerSpanKindsAreNotSuppressed(SpanKind spanKind) {
|
||||
SpanSuppressionStrategy strategy = SpanSuppressionStrategy.from(emptySet());
|
||||
|
||||
Context context = strategy.storeInContext(Context.root(), spanKind, SPAN);
|
||||
|
||||
assertThat(context).isSameAs(Context.root());
|
||||
verifyNoSuppression(strategy, context);
|
||||
|
||||
allSpanKeys().forEach(key -> assertThat(key.fromContextOrNull(context)).isNull());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void nestedClientsDisabled_useAllClientsSpanKey() {
|
||||
SpanSuppressionStrategy strategy =
|
||||
SpanSuppressionStrategy.suppressNestedClients(allSpanKeys().collect(Collectors.toSet()));
|
||||
|
||||
Context context = strategy.storeInContext(Context.root(), SpanKind.CLIENT, SPAN);
|
||||
|
||||
assertThat(strategy.shouldSuppress(context, SpanKind.CLIENT)).isTrue();
|
||||
|
||||
assertThat(SpanKey.ALL_CLIENTS.fromContextOrNull(context)).isSameAs(SPAN);
|
||||
allSpanKeys()
|
||||
.filter(key -> key != SpanKey.ALL_CLIENTS)
|
||||
.forEach(key -> assertThat(key.fromContextOrNull(context)).isNull());
|
||||
}
|
||||
|
||||
@SuppressWarnings("unused")
|
||||
private static Stream<SpanKind> nonServerSpanKinds() {
|
||||
return Stream.of(SpanKind.CONSUMER, SpanKind.CLIENT, SpanKind.PRODUCER);
|
||||
}
|
||||
|
||||
private static void verifyNoSuppression(SpanSuppressionStrategy strategy, Context context) {
|
||||
Stream.of(SpanKind.values())
|
||||
.forEach(spanKind -> assertThat(strategy.shouldSuppress(context, spanKind)).isFalse());
|
||||
}
|
||||
|
||||
private static void verifySpanKey(SpanKey spanKey, Context context) {
|
||||
assertThat(spanKey.fromContextOrNull(context)).isSameAs(SPAN);
|
||||
|
||||
allClientSpanKeys()
|
||||
allSpanKeys()
|
||||
.filter(key -> key != spanKey)
|
||||
.forEach(key -> assertThat(key.fromContextOrNull(context)).isNull());
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("allClientSpanKeys")
|
||||
public void client_sameAsProducer(SpanKey spanKey) {
|
||||
SpanSuppressionStrategy strategy = SpanSuppressionStrategy.from(Collections.singleton(spanKey));
|
||||
|
||||
Context context = spanKey.storeInContext(Context.root(), SPAN);
|
||||
|
||||
assertThat(strategy.shouldSuppress(context, SpanKind.CLIENT)).isTrue();
|
||||
assertThat(strategy.shouldSuppress(context, SpanKind.PRODUCER)).isTrue();
|
||||
|
||||
allClientSpanKeys()
|
||||
.forEach(
|
||||
anotherKey -> {
|
||||
if (spanKey != anotherKey) {
|
||||
SpanSuppressionStrategy anotherStrategy =
|
||||
SpanSuppressionStrategy.from(Collections.singleton(anotherKey));
|
||||
assertThat(anotherStrategy.shouldSuppress(context, SpanKind.CLIENT)).isFalse();
|
||||
assertThat(anotherStrategy.shouldSuppress(context, SpanKind.PRODUCER)).isFalse();
|
||||
}
|
||||
});
|
||||
private static Stream<SpanKey> allSpanKeys() {
|
||||
return Stream.concat(
|
||||
Stream.of(SpanKey.PRODUCER, SpanKey.SERVER),
|
||||
Stream.concat(consumerSpanKeys(), clientSpanKeys()));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void allNestedOutgoing_producerDoesNotSuppressClient() {
|
||||
SpanSuppressionStrategy strategy =
|
||||
SpanSuppressionStrategy.SUPPRESS_ALL_NESTED_OUTGOING_STRATEGY;
|
||||
|
||||
Context contextClient = SpanKey.ALL_CLIENTS.storeInContext(Context.root(), SPAN);
|
||||
Context contextProducer = SpanKey.ALL_PRODUCERS.storeInContext(Context.root(), SPAN);
|
||||
|
||||
assertThat(strategy.shouldSuppress(contextClient, SpanKind.CLIENT)).isTrue();
|
||||
assertThat(strategy.shouldSuppress(contextClient, SpanKind.PRODUCER)).isFalse();
|
||||
|
||||
assertThat(strategy.shouldSuppress(contextProducer, SpanKind.CLIENT)).isFalse();
|
||||
assertThat(strategy.shouldSuppress(contextProducer, SpanKind.PRODUCER)).isTrue();
|
||||
private static Stream<SpanKey> consumerSpanKeys() {
|
||||
return Stream.of(SpanKey.CONSUMER_RECEIVE, SpanKey.CONSUMER_PROCESS);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void multipleKeys() {
|
||||
|
||||
SpanSuppressionStrategy strategy =
|
||||
SpanSuppressionStrategy.from(allClientSpanKeys().collect(Collectors.toSet()));
|
||||
|
||||
Context context = strategy.storeInContext(Context.root(), SpanKind.CLIENT, SPAN);
|
||||
|
||||
assertThat(strategy.shouldSuppress(context, SpanKind.CLIENT)).isTrue();
|
||||
assertThat(strategy.shouldSuppress(context, SpanKind.PRODUCER)).isTrue();
|
||||
assertThat(strategy.shouldSuppress(context, SpanKind.SERVER)).isFalse();
|
||||
assertThat(strategy.shouldSuppress(context, SpanKind.CONSUMER)).isFalse();
|
||||
|
||||
allClientSpanKeys()
|
||||
.forEach(
|
||||
key -> {
|
||||
assertThat(key.fromContextOrNull(context)).isSameAs(SPAN);
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void noKeys_clientIsNeverSuppressed() {
|
||||
|
||||
SpanSuppressionStrategy strategy = SpanSuppressionStrategy.from(new HashSet<>());
|
||||
|
||||
Context context = strategy.storeInContext(Context.root(), SpanKind.CLIENT, SPAN);
|
||||
assertThat(context).isSameAs(Context.root());
|
||||
|
||||
assertThat(strategy.shouldSuppress(context, SpanKind.CLIENT)).isFalse();
|
||||
assertThat(strategy.shouldSuppress(context, SpanKind.PRODUCER)).isFalse();
|
||||
assertThat(strategy.shouldSuppress(context, SpanKind.SERVER)).isFalse();
|
||||
assertThat(strategy.shouldSuppress(context, SpanKind.CONSUMER)).isFalse();
|
||||
|
||||
allClientSpanKeys()
|
||||
.forEach(
|
||||
key -> {
|
||||
assertThat(key.fromContextOrNull(context)).isNull();
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void noKeys_serverIsSuppressed() {
|
||||
|
||||
SpanSuppressionStrategy strategy = SpanSuppressionStrategy.from(new HashSet<>());
|
||||
|
||||
Context context = strategy.storeInContext(Context.root(), SpanKind.SERVER, SPAN);
|
||||
|
||||
assertThat(strategy.shouldSuppress(context, SpanKind.SERVER)).isTrue();
|
||||
assertThat(SpanKey.SERVER.fromContextOrNull(context)).isSameAs(SPAN);
|
||||
|
||||
allClientSpanKeys()
|
||||
.forEach(
|
||||
key -> {
|
||||
assertThat(key.fromContextOrNull(context)).isNull();
|
||||
});
|
||||
}
|
||||
|
||||
@Test
|
||||
public void noKeys_consumerIsSuppressed() {
|
||||
|
||||
SpanSuppressionStrategy strategy = SpanSuppressionStrategy.from(new HashSet<>());
|
||||
|
||||
Context context = strategy.storeInContext(Context.root(), SpanKind.CONSUMER, SPAN);
|
||||
|
||||
assertThat(strategy.shouldSuppress(context, SpanKind.CONSUMER)).isTrue();
|
||||
assertThat(SpanKey.CONSUMER.fromContextOrNull(context)).isSameAs(SPAN);
|
||||
|
||||
allClientSpanKeys()
|
||||
.forEach(
|
||||
key -> {
|
||||
assertThat(key.fromContextOrNull(context)).isNull();
|
||||
});
|
||||
}
|
||||
|
||||
private static Stream<SpanKey> allClientSpanKeys() {
|
||||
private static Stream<SpanKey> clientSpanKeys() {
|
||||
return Stream.of(
|
||||
SpanKey.ALL_CLIENTS,
|
||||
SpanKey.ALL_PRODUCERS,
|
||||
SpanKey.HTTP_CLIENT,
|
||||
SpanKey.DB_CLIENT,
|
||||
SpanKey.RPC_CLIENT,
|
||||
SpanKey.MESSAGING_PRODUCER);
|
||||
SpanKey.ALL_CLIENTS, SpanKey.HTTP_CLIENT, SpanKey.DB_CLIENT, SpanKey.RPC_CLIENT);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,7 +6,6 @@
|
|||
package io.opentelemetry.instrumentation.api.instrumenter.messaging;
|
||||
|
||||
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.assertThat;
|
||||
import static java.util.Collections.singletonMap;
|
||||
import static org.assertj.core.api.Assertions.entry;
|
||||
|
||||
import io.opentelemetry.api.common.AttributeKey;
|
||||
|
@ -26,71 +25,6 @@ import org.junit.jupiter.params.provider.Arguments;
|
|||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
|
||||
class MessagingAttributesExtractorTest {
|
||||
static final MessagingAttributesExtractor<Map<String, String>, String> underTest =
|
||||
new MessagingAttributesExtractor<Map<String, String>, String>() {
|
||||
@Override
|
||||
protected String system(Map<String, String> request) {
|
||||
return request.get("system");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String destinationKind(Map<String, String> request) {
|
||||
return request.get("destinationKind");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String destination(Map<String, String> request) {
|
||||
return request.get("destination");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean temporaryDestination(Map<String, String> request) {
|
||||
return request.containsKey("temporaryDestination");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String protocol(Map<String, String> request) {
|
||||
return request.get("protocol");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String protocolVersion(Map<String, String> request) {
|
||||
return request.get("protocolVersion");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String url(Map<String, String> request) {
|
||||
return request.get("url");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String conversationId(Map<String, String> request) {
|
||||
return request.get("conversationId");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Long messagePayloadSize(Map<String, String> request) {
|
||||
String payloadSize = request.get("payloadSize");
|
||||
return payloadSize == null ? null : Long.valueOf(payloadSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Long messagePayloadCompressedSize(Map<String, String> request) {
|
||||
String payloadSize = request.get("payloadCompressedSize");
|
||||
return payloadSize == null ? null : Long.valueOf(payloadSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected MessageOperation operation(Map<String, String> request) {
|
||||
String operation = request.get("operation");
|
||||
return operation == null ? null : MessageOperation.valueOf(operation);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String messageId(Map<String, String> request, String response) {
|
||||
return response;
|
||||
}
|
||||
};
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("destinations")
|
||||
|
@ -113,7 +47,8 @@ class MessagingAttributesExtractorTest {
|
|||
request.put("conversationId", "42");
|
||||
request.put("payloadSize", "100");
|
||||
request.put("payloadCompressedSize", "10");
|
||||
request.put("operation", operation.name());
|
||||
|
||||
TestMessagingAttributesExtractor underTest = new TestMessagingAttributesExtractor(operation);
|
||||
|
||||
// when
|
||||
AttributesBuilder startAttributes = Attributes.builder();
|
||||
|
@ -151,18 +86,12 @@ class MessagingAttributesExtractorTest {
|
|||
Arguments.of(true, null, MessageOperation.PROCESS, "(temporary)"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldNotSetSendOperation() {
|
||||
// when
|
||||
AttributesBuilder attributes = Attributes.builder();
|
||||
underTest.onStart(attributes, singletonMap("operation", MessageOperation.SEND.name()));
|
||||
|
||||
// then
|
||||
assertThat(attributes.build().isEmpty()).isTrue();
|
||||
}
|
||||
|
||||
@Test
|
||||
void shouldExtractNoAttributesIfNoneAreAvailable() {
|
||||
// given
|
||||
TestMessagingAttributesExtractor underTest =
|
||||
new TestMessagingAttributesExtractor(MessageOperation.SEND);
|
||||
|
||||
// when
|
||||
AttributesBuilder startAttributes = Attributes.builder();
|
||||
underTest.onStart(startAttributes, Collections.emptyMap());
|
||||
|
@ -175,4 +104,76 @@ class MessagingAttributesExtractorTest {
|
|||
|
||||
assertThat(endAttributes.build().isEmpty()).isTrue();
|
||||
}
|
||||
|
||||
static class TestMessagingAttributesExtractor
|
||||
extends MessagingAttributesExtractor<Map<String, String>, String> {
|
||||
|
||||
private final MessageOperation operation;
|
||||
|
||||
TestMessagingAttributesExtractor(MessageOperation operation) {
|
||||
this.operation = operation;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageOperation operation() {
|
||||
return operation;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String system(Map<String, String> request) {
|
||||
return request.get("system");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String destinationKind(Map<String, String> request) {
|
||||
return request.get("destinationKind");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String destination(Map<String, String> request) {
|
||||
return request.get("destination");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean temporaryDestination(Map<String, String> request) {
|
||||
return request.containsKey("temporaryDestination");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String protocol(Map<String, String> request) {
|
||||
return request.get("protocol");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String protocolVersion(Map<String, String> request) {
|
||||
return request.get("protocolVersion");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String url(Map<String, String> request) {
|
||||
return request.get("url");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String conversationId(Map<String, String> request) {
|
||||
return request.get("conversationId");
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Long messagePayloadSize(Map<String, String> request) {
|
||||
String payloadSize = request.get("payloadSize");
|
||||
return payloadSize == null ? null : Long.valueOf(payloadSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected Long messagePayloadCompressedSize(Map<String, String> request) {
|
||||
String payloadSize = request.get("payloadCompressedSize");
|
||||
return payloadSize == null ? null : Long.valueOf(payloadSize);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String messageId(Map<String, String> request, String response) {
|
||||
return response;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -36,7 +36,7 @@ class MessagingSpanNameExtractorTest {
|
|||
} else {
|
||||
given(attributesExtractor.destination(message)).willReturn(destinationName);
|
||||
}
|
||||
given(attributesExtractor.operation(message)).willReturn(operation);
|
||||
given(attributesExtractor.operation()).willReturn(operation);
|
||||
|
||||
SpanNameExtractor<Message> underTest = MessagingSpanNameExtractor.create(attributesExtractor);
|
||||
|
||||
|
@ -51,8 +51,7 @@ class MessagingSpanNameExtractorTest {
|
|||
return Stream.of(
|
||||
Arguments.of(false, "destination", MessageOperation.SEND, "destination send"),
|
||||
Arguments.of(true, null, MessageOperation.PROCESS, "(temporary) process"),
|
||||
Arguments.of(false, null, MessageOperation.RECEIVE, "unknown receive"),
|
||||
Arguments.of(false, "destination", null, "destination"));
|
||||
Arguments.of(false, null, MessageOperation.RECEIVE, "unknown receive"));
|
||||
}
|
||||
|
||||
static class Message {}
|
||||
|
|
|
@ -52,8 +52,7 @@ class MessageWithDestinationTest {
|
|||
given(message.getJMSDestination()).willReturn(destination);
|
||||
|
||||
// when
|
||||
MessageWithDestination result =
|
||||
MessageWithDestination.create(message, MessageOperation.SEND, null, timer);
|
||||
MessageWithDestination result = MessageWithDestination.create(message, null, timer);
|
||||
|
||||
// then
|
||||
assertMessage(
|
||||
|
@ -66,8 +65,7 @@ class MessageWithDestinationTest {
|
|||
given(message.getJMSDestination()).willThrow(JMSException.class);
|
||||
|
||||
// when
|
||||
MessageWithDestination result =
|
||||
MessageWithDestination.create(message, MessageOperation.SEND, destination, timer);
|
||||
MessageWithDestination result = MessageWithDestination.create(message, destination, timer);
|
||||
|
||||
// then
|
||||
assertMessage(
|
||||
|
@ -93,8 +91,7 @@ class MessageWithDestinationTest {
|
|||
}
|
||||
|
||||
// when
|
||||
MessageWithDestination result =
|
||||
MessageWithDestination.create(message, MessageOperation.RECEIVE, null, timer);
|
||||
MessageWithDestination result = MessageWithDestination.create(message, null, timer);
|
||||
|
||||
// then
|
||||
assertMessage(
|
||||
|
@ -120,8 +117,7 @@ class MessageWithDestinationTest {
|
|||
}
|
||||
|
||||
// when
|
||||
MessageWithDestination result =
|
||||
MessageWithDestination.create(message, MessageOperation.RECEIVE, null, timer);
|
||||
MessageWithDestination result = MessageWithDestination.create(message, null, timer);
|
||||
|
||||
// then
|
||||
assertMessage(
|
||||
|
@ -144,7 +140,6 @@ class MessageWithDestinationTest {
|
|||
MessageWithDestination actual) {
|
||||
|
||||
assertSame(message, actual.message());
|
||||
assertSame(expectedMessageOperation, actual.messageOperation());
|
||||
assertEquals(expectedDestinationKind, actual.destinationKind());
|
||||
assertEquals(expectedDestinationName, actual.destinationName());
|
||||
assertEquals(expectedTemporary, actual.isTemporaryDestination());
|
||||
|
|
|
@ -16,6 +16,17 @@ public class JmsMessageAttributesExtractor
|
|||
extends MessagingAttributesExtractor<MessageWithDestination, Void> {
|
||||
private static final Logger logger = LoggerFactory.getLogger(JmsMessageAttributesExtractor.class);
|
||||
|
||||
private final MessageOperation operation;
|
||||
|
||||
public JmsMessageAttributesExtractor(MessageOperation operation) {
|
||||
this.operation = operation;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageOperation operation() {
|
||||
return operation;
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
protected String system(MessageWithDestination messageWithDestination) {
|
||||
|
@ -80,11 +91,6 @@ public class JmsMessageAttributesExtractor
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected MessageOperation operation(MessageWithDestination messageWithDestination) {
|
||||
return messageWithDestination.messageOperation();
|
||||
}
|
||||
|
||||
@Nullable
|
||||
@Override
|
||||
protected String messageId(MessageWithDestination messageWithDestination, Void unused) {
|
||||
|
|
|
@ -13,7 +13,6 @@ import static net.bytebuddy.matcher.ElementMatchers.named;
|
|||
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
|
||||
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
|
||||
import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge;
|
||||
|
@ -63,8 +62,7 @@ public class JmsMessageConsumerInstrumentation implements TypeInstrumentation {
|
|||
}
|
||||
|
||||
Context parentContext = Java8BytecodeBridge.currentContext();
|
||||
MessageWithDestination request =
|
||||
MessageWithDestination.create(message, MessageOperation.RECEIVE, null, timer);
|
||||
MessageWithDestination request = MessageWithDestination.create(message, null, timer);
|
||||
|
||||
if (consumerInstrumenter().shouldStart(parentContext, request)) {
|
||||
Context context = consumerInstrumenter().start(parentContext, request);
|
||||
|
|
|
@ -14,7 +14,6 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
|
|||
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.context.Scope;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
|
||||
import io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge;
|
||||
|
@ -53,7 +52,7 @@ public class JmsMessageListenerInstrumentation implements TypeInstrumentation {
|
|||
@Advice.Local("otelScope") Scope scope) {
|
||||
|
||||
Context parentContext = Java8BytecodeBridge.currentContext();
|
||||
request = MessageWithDestination.create(message, MessageOperation.PROCESS, null);
|
||||
request = MessageWithDestination.create(message, null);
|
||||
|
||||
if (!listenerInstrumenter().shouldStart(parentContext, request)) {
|
||||
return;
|
||||
|
|
|
@ -14,7 +14,6 @@ import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
|
|||
|
||||
import io.opentelemetry.context.Context;
|
||||
import io.opentelemetry.context.Scope;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
|
||||
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
|
||||
import io.opentelemetry.javaagent.instrumentation.api.CallDepth;
|
||||
|
@ -76,7 +75,7 @@ public class JmsMessageProducerInstrumentation implements TypeInstrumentation {
|
|||
}
|
||||
|
||||
Context parentContext = Java8BytecodeBridge.currentContext();
|
||||
request = MessageWithDestination.create(message, MessageOperation.SEND, defaultDestination);
|
||||
request = MessageWithDestination.create(message, defaultDestination);
|
||||
if (!producerInstrumenter().shouldStart(parentContext, request)) {
|
||||
return;
|
||||
}
|
||||
|
@ -120,7 +119,7 @@ public class JmsMessageProducerInstrumentation implements TypeInstrumentation {
|
|||
}
|
||||
|
||||
Context parentContext = Java8BytecodeBridge.currentContext();
|
||||
request = MessageWithDestination.create(message, MessageOperation.SEND, destination);
|
||||
request = MessageWithDestination.create(message, destination);
|
||||
if (!producerInstrumenter().shouldStart(parentContext, request)) {
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -6,43 +6,59 @@
|
|||
package io.opentelemetry.javaagent.instrumentation.jms;
|
||||
|
||||
import io.opentelemetry.api.GlobalOpenTelemetry;
|
||||
import io.opentelemetry.api.OpenTelemetry;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.Instrumenter;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.SpanKindExtractor;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.SpanNameExtractor;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingSpanNameExtractor;
|
||||
|
||||
public final class JmsSingletons {
|
||||
private static final String INSTRUMENTATION_NAME = "io.opentelemetry.jms-1.1";
|
||||
|
||||
private static final Instrumenter<MessageWithDestination, Void> PRODUCER_INSTRUMENTER;
|
||||
private static final Instrumenter<MessageWithDestination, Void> CONSUMER_INSTRUMENTER;
|
||||
private static final Instrumenter<MessageWithDestination, Void> LISTENER_INSTRUMENTER;
|
||||
private static final Instrumenter<MessageWithDestination, Void> PRODUCER_INSTRUMENTER =
|
||||
buildProducerInstrumenter();
|
||||
private static final Instrumenter<MessageWithDestination, Void> CONSUMER_INSTRUMENTER =
|
||||
buildConsumerInstrumenter();
|
||||
private static final Instrumenter<MessageWithDestination, Void> LISTENER_INSTRUMENTER =
|
||||
buildListenerInstrumenter();
|
||||
|
||||
static {
|
||||
JmsMessageAttributesExtractor attributesExtractor = new JmsMessageAttributesExtractor();
|
||||
private static Instrumenter<MessageWithDestination, Void> buildProducerInstrumenter() {
|
||||
JmsMessageAttributesExtractor attributesExtractor =
|
||||
new JmsMessageAttributesExtractor(MessageOperation.SEND);
|
||||
SpanNameExtractor<MessageWithDestination> spanNameExtractor =
|
||||
MessagingSpanNameExtractor.create(attributesExtractor);
|
||||
|
||||
return Instrumenter.<MessageWithDestination, Void>newBuilder(
|
||||
GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, spanNameExtractor)
|
||||
.addAttributesExtractor(attributesExtractor)
|
||||
.newProducerInstrumenter(new MessagePropertySetter());
|
||||
}
|
||||
|
||||
private static Instrumenter<MessageWithDestination, Void> buildConsumerInstrumenter() {
|
||||
JmsMessageAttributesExtractor attributesExtractor =
|
||||
new JmsMessageAttributesExtractor(MessageOperation.RECEIVE);
|
||||
SpanNameExtractor<MessageWithDestination> spanNameExtractor =
|
||||
MessagingSpanNameExtractor.create(attributesExtractor);
|
||||
|
||||
OpenTelemetry otel = GlobalOpenTelemetry.get();
|
||||
PRODUCER_INSTRUMENTER =
|
||||
Instrumenter.<MessageWithDestination, Void>newBuilder(
|
||||
otel, INSTRUMENTATION_NAME, spanNameExtractor)
|
||||
.addAttributesExtractor(attributesExtractor)
|
||||
.newProducerInstrumenter(new MessagePropertySetter());
|
||||
// MessageConsumer does not do context propagation
|
||||
CONSUMER_INSTRUMENTER =
|
||||
Instrumenter.<MessageWithDestination, Void>newBuilder(
|
||||
otel, INSTRUMENTATION_NAME, spanNameExtractor)
|
||||
.addAttributesExtractor(attributesExtractor)
|
||||
.setTimeExtractors(
|
||||
MessageWithDestination::startTime, (request, response, error) -> request.endTime())
|
||||
.newInstrumenter(SpanKindExtractor.alwaysConsumer());
|
||||
LISTENER_INSTRUMENTER =
|
||||
Instrumenter.<MessageWithDestination, Void>newBuilder(
|
||||
otel, INSTRUMENTATION_NAME, spanNameExtractor)
|
||||
.addAttributesExtractor(attributesExtractor)
|
||||
.newConsumerInstrumenter(new MessagePropertyGetter());
|
||||
return Instrumenter.<MessageWithDestination, Void>newBuilder(
|
||||
GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, spanNameExtractor)
|
||||
.addAttributesExtractor(attributesExtractor)
|
||||
.setTimeExtractors(
|
||||
MessageWithDestination::startTime, (request, response, error) -> request.endTime())
|
||||
.newInstrumenter(SpanKindExtractor.alwaysConsumer());
|
||||
}
|
||||
|
||||
private static Instrumenter<MessageWithDestination, Void> buildListenerInstrumenter() {
|
||||
JmsMessageAttributesExtractor attributesExtractor =
|
||||
new JmsMessageAttributesExtractor(MessageOperation.PROCESS);
|
||||
SpanNameExtractor<MessageWithDestination> spanNameExtractor =
|
||||
MessagingSpanNameExtractor.create(attributesExtractor);
|
||||
|
||||
return Instrumenter.<MessageWithDestination, Void>newBuilder(
|
||||
GlobalOpenTelemetry.get(), INSTRUMENTATION_NAME, spanNameExtractor)
|
||||
.addAttributesExtractor(attributesExtractor)
|
||||
.newConsumerInstrumenter(new MessagePropertyGetter());
|
||||
}
|
||||
|
||||
public static Instrumenter<MessageWithDestination, Void> producerInstrumenter() {
|
||||
|
|
|
@ -6,7 +6,6 @@
|
|||
package io.opentelemetry.javaagent.instrumentation.jms;
|
||||
|
||||
import com.google.auto.value.AutoValue;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
|
||||
import java.time.Instant;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.JMSException;
|
||||
|
@ -24,8 +23,6 @@ public abstract class MessageWithDestination {
|
|||
|
||||
public abstract Message message();
|
||||
|
||||
public abstract MessageOperation messageOperation();
|
||||
|
||||
public abstract String destinationName();
|
||||
|
||||
public abstract String destinationKind();
|
||||
|
@ -42,13 +39,12 @@ public abstract class MessageWithDestination {
|
|||
return timer().endTime();
|
||||
}
|
||||
|
||||
public static MessageWithDestination create(
|
||||
Message message, MessageOperation operation, Destination fallbackDestination) {
|
||||
return create(message, operation, fallbackDestination, Timer.start());
|
||||
public static MessageWithDestination create(Message message, Destination fallbackDestination) {
|
||||
return create(message, fallbackDestination, Timer.start());
|
||||
}
|
||||
|
||||
public static MessageWithDestination create(
|
||||
Message message, MessageOperation operation, Destination fallbackDestination, Timer timer) {
|
||||
Message message, Destination fallbackDestination, Timer timer) {
|
||||
Destination jmsDestination = null;
|
||||
try {
|
||||
jmsDestination = message.getJMSDestination();
|
||||
|
@ -60,17 +56,17 @@ public abstract class MessageWithDestination {
|
|||
}
|
||||
|
||||
if (jmsDestination instanceof Queue) {
|
||||
return createMessageWithQueue(message, operation, (Queue) jmsDestination, timer);
|
||||
return createMessageWithQueue(message, (Queue) jmsDestination, timer);
|
||||
}
|
||||
if (jmsDestination instanceof Topic) {
|
||||
return createMessageWithTopic(message, operation, (Topic) jmsDestination, timer);
|
||||
return createMessageWithTopic(message, (Topic) jmsDestination, timer);
|
||||
}
|
||||
return new AutoValue_MessageWithDestination(
|
||||
message, operation, "unknown", "unknown", /* isTemporaryDestination= */ false, timer);
|
||||
message, "unknown", "unknown", /* isTemporaryDestination= */ false, timer);
|
||||
}
|
||||
|
||||
private static MessageWithDestination createMessageWithQueue(
|
||||
Message message, MessageOperation operation, Queue destination, Timer timer) {
|
||||
Message message, Queue destination, Timer timer) {
|
||||
String queueName;
|
||||
try {
|
||||
queueName = destination.getQueueName();
|
||||
|
@ -81,12 +77,11 @@ public abstract class MessageWithDestination {
|
|||
boolean temporary =
|
||||
destination instanceof TemporaryQueue || queueName.startsWith(TIBCO_TMP_PREFIX);
|
||||
|
||||
return new AutoValue_MessageWithDestination(
|
||||
message, operation, queueName, "queue", temporary, timer);
|
||||
return new AutoValue_MessageWithDestination(message, queueName, "queue", temporary, timer);
|
||||
}
|
||||
|
||||
private static MessageWithDestination createMessageWithTopic(
|
||||
Message message, MessageOperation operation, Topic destination, Timer timer) {
|
||||
Message message, Topic destination, Timer timer) {
|
||||
String topicName;
|
||||
try {
|
||||
topicName = destination.getTopicName();
|
||||
|
@ -97,7 +92,6 @@ public abstract class MessageWithDestination {
|
|||
boolean temporary =
|
||||
destination instanceof TemporaryTopic || topicName.startsWith(TIBCO_TMP_PREFIX);
|
||||
|
||||
return new AutoValue_MessageWithDestination(
|
||||
message, operation, topicName, "topic", temporary, timer);
|
||||
return new AutoValue_MessageWithDestination(message, topicName, "topic", temporary, timer);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -13,6 +13,12 @@ import org.checkerframework.checker.nullness.qual.Nullable;
|
|||
|
||||
public final class KafkaProducerAttributesExtractor
|
||||
extends MessagingAttributesExtractor<ProducerRecord<?, ?>, Void> {
|
||||
|
||||
@Override
|
||||
public MessageOperation operation() {
|
||||
return MessageOperation.SEND;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String system(ProducerRecord<?, ?> producerRecord) {
|
||||
return "kafka";
|
||||
|
@ -63,11 +69,6 @@ public final class KafkaProducerAttributesExtractor
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected MessageOperation operation(ProducerRecord<?, ?> producerRecord) {
|
||||
return MessageOperation.SEND;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected @Nullable String messageId(ProducerRecord<?, ?> producerRecord, @Nullable Void unused) {
|
||||
return null;
|
||||
|
|
|
@ -16,6 +16,11 @@ import org.checkerframework.checker.nullness.qual.Nullable;
|
|||
public final class KafkaReceiveAttributesExtractor
|
||||
extends MessagingAttributesExtractor<ReceivedRecords, Void> {
|
||||
|
||||
@Override
|
||||
public MessageOperation operation() {
|
||||
return MessageOperation.RECEIVE;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String system(ReceivedRecords receivedRecords) {
|
||||
return "kafka";
|
||||
|
@ -71,11 +76,6 @@ public final class KafkaReceiveAttributesExtractor
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected MessageOperation operation(ReceivedRecords receivedRecords) {
|
||||
return MessageOperation.RECEIVE;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected @Nullable String messageId(ReceivedRecords receivedRecords, @Nullable Void unused) {
|
||||
return null;
|
||||
|
|
|
@ -20,6 +20,11 @@ public final class KafkaConsumerAttributesExtractor
|
|||
this.messageOperation = messageOperation;
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageOperation operation() {
|
||||
return messageOperation;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String system(ConsumerRecord<?, ?> consumerRecord) {
|
||||
return "kafka";
|
||||
|
@ -70,11 +75,6 @@ public final class KafkaConsumerAttributesExtractor
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected MessageOperation operation(ConsumerRecord<?, ?> consumerRecord) {
|
||||
return messageOperation;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected @Nullable String messageId(ConsumerRecord<?, ?> consumerRecord, @Nullable Void unused) {
|
||||
return null;
|
||||
|
|
|
@ -152,13 +152,13 @@ public class AgentContextStorage implements ContextStorage, AutoCloseable {
|
|||
BaggageBridging::toApplication,
|
||||
BaggageBridging::toAgent),
|
||||
bridgeSpanKey("SERVER_KEY"),
|
||||
bridgeSpanKey("CONSUMER_KEY"),
|
||||
bridgeSpanKey("HTTP_KEY"),
|
||||
bridgeSpanKey("RPC_KEY"),
|
||||
bridgeSpanKey("DB_KEY"),
|
||||
bridgeSpanKey("MESSAGING_KEY"),
|
||||
bridgeSpanKey("HTTP_CLIENT_KEY"),
|
||||
bridgeSpanKey("RPC_CLIENT_KEY"),
|
||||
bridgeSpanKey("DB_CLIENT_KEY"),
|
||||
bridgeSpanKey("CLIENT_KEY"),
|
||||
bridgeSpanKey("PRODUCER_KEY"),
|
||||
bridgeSpanKey("CONSUMER_RECEIVE_KEY"),
|
||||
bridgeSpanKey("CONSUMER_PROCESS_KEY"),
|
||||
};
|
||||
|
||||
private static ContextKeyBridge<Span, io.opentelemetry.api.trace.Span> bridgeSpanKey(
|
||||
|
|
|
@ -188,13 +188,13 @@ class ContextBridgeTest extends AgentInstrumentationSpecification {
|
|||
assert Span.current() != null
|
||||
def spanKeys = [
|
||||
SpanKey.SERVER,
|
||||
SpanKey.CONSUMER,
|
||||
SpanKey.HTTP_CLIENT,
|
||||
SpanKey.RPC_CLIENT,
|
||||
SpanKey.DB_CLIENT,
|
||||
SpanKey.MESSAGING_PRODUCER,
|
||||
SpanKey.ALL_CLIENTS,
|
||||
SpanKey.ALL_PRODUCERS
|
||||
SpanKey.PRODUCER,
|
||||
SpanKey.CONSUMER_RECEIVE,
|
||||
SpanKey.CONSUMER_PROCESS,
|
||||
]
|
||||
spanKeys.each { spanKey ->
|
||||
assert spanKey.fromContextOrNull(Context.current()) != null
|
||||
|
|
|
@ -62,13 +62,13 @@ public class AgentSpanTestingTracer extends BaseTracer {
|
|||
public static SpanKey[] getSpanKeys() {
|
||||
return new SpanKey[] {
|
||||
SpanKey.SERVER,
|
||||
SpanKey.CONSUMER,
|
||||
getSpanKeyByName("HTTP_CLIENT"),
|
||||
getSpanKeyByName("RPC_CLIENT"),
|
||||
getSpanKeyByName("DB_CLIENT"),
|
||||
getSpanKeyByName("MESSAGING_PRODUCER"),
|
||||
SpanKey.ALL_CLIENTS,
|
||||
SpanKey.ALL_PRODUCERS
|
||||
getSpanKeyByName("PRODUCER"),
|
||||
getSpanKeyByName("CONSUMER_RECEIVE"),
|
||||
getSpanKeyByName("CONSUMER_PROCESS")
|
||||
};
|
||||
}
|
||||
|
||||
|
|
|
@ -42,6 +42,7 @@ public final class SpringIntegrationTracingBuilder {
|
|||
Instrumenter.<MessageWithChannel, Void>newBuilder(
|
||||
openTelemetry, INSTRUMENTATION_NAME, new MessageChannelSpanNameExtractor())
|
||||
.addAttributesExtractors(additionalAttributeExtractors)
|
||||
.addAttributesExtractor(new SpringMessagingAttributesExtractor())
|
||||
.newConsumerInstrumenter(MessageHeadersGetter.INSTANCE);
|
||||
return new SpringIntegrationTracing(openTelemetry.getPropagators(), instrumenter);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,76 @@
|
|||
/*
|
||||
* Copyright The OpenTelemetry Authors
|
||||
* SPDX-License-Identifier: Apache-2.0
|
||||
*/
|
||||
|
||||
package io.opentelemetry.instrumentation.spring.integration;
|
||||
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessageOperation;
|
||||
import io.opentelemetry.instrumentation.api.instrumenter.messaging.MessagingAttributesExtractor;
|
||||
import org.checkerframework.checker.nullness.qual.Nullable;
|
||||
|
||||
// this class is needed mostly for correct CONSUMER span suppression
|
||||
final class SpringMessagingAttributesExtractor
|
||||
extends MessagingAttributesExtractor<MessageWithChannel, Void> {
|
||||
|
||||
@Override
|
||||
public MessageOperation operation() {
|
||||
return MessageOperation.PROCESS;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected @Nullable String system(MessageWithChannel messageWithChannel) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected @Nullable String destinationKind(MessageWithChannel messageWithChannel) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected @Nullable String destination(MessageWithChannel messageWithChannel) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected boolean temporaryDestination(MessageWithChannel messageWithChannel) {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected @Nullable String protocol(MessageWithChannel messageWithChannel) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected @Nullable String protocolVersion(MessageWithChannel messageWithChannel) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected @Nullable String url(MessageWithChannel messageWithChannel) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected @Nullable String conversationId(MessageWithChannel messageWithChannel) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected @Nullable Long messagePayloadSize(MessageWithChannel messageWithChannel) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected @Nullable Long messagePayloadCompressedSize(MessageWithChannel messageWithChannel) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected @Nullable String messageId(
|
||||
MessageWithChannel messageWithChannel, @Nullable Void unused) {
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -16,6 +16,12 @@ import org.checkerframework.checker.nullness.qual.Nullable;
|
|||
|
||||
public final class KafkaBatchProcessAttributesExtractor
|
||||
extends MessagingAttributesExtractor<ConsumerRecords<?, ?>, Void> {
|
||||
|
||||
@Override
|
||||
public MessageOperation operation() {
|
||||
return MessageOperation.PROCESS;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String system(ConsumerRecords<?, ?> records) {
|
||||
return "kafka";
|
||||
|
@ -69,11 +75,6 @@ public final class KafkaBatchProcessAttributesExtractor
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected MessageOperation operation(ConsumerRecords<?, ?> records) {
|
||||
return MessageOperation.PROCESS;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected @Nullable String messageId(ConsumerRecords<?, ?> records, @Nullable Void unused) {
|
||||
return null;
|
||||
|
|
|
@ -12,6 +12,12 @@ import org.springframework.amqp.core.Message;
|
|||
|
||||
final class SpringRabbitMessageAttributesExtractor
|
||||
extends MessagingAttributesExtractor<Message, Void> {
|
||||
|
||||
@Override
|
||||
public MessageOperation operation() {
|
||||
return MessageOperation.PROCESS;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected String system(Message message) {
|
||||
return "rabbitmq";
|
||||
|
@ -62,11 +68,6 @@ final class SpringRabbitMessageAttributesExtractor
|
|||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected MessageOperation operation(Message message) {
|
||||
return MessageOperation.PROCESS;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected @Nullable String messageId(Message message, @Nullable Void unused) {
|
||||
return message.getMessageProperties().getMessageId();
|
||||
|
|
Loading…
Reference in New Issue