Replace messaging.kafka.destination.partition with messaging.destination.partition.id (#11086)

This commit is contained in:
Lauri Tulmin 2024-04-11 09:16:50 +03:00 committed by GitHub
parent 79d50d1f11
commit 9efbec6fd5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 79 additions and 123 deletions

View File

@ -52,8 +52,6 @@ class KafkaSpanDecorator extends MessagingSpanDecorator {
return topic != null ? topic : super.getDestination(exchange, endpoint);
}
@SuppressWarnings("deprecation") // TODO
// MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation
@Override
public void pre(
AttributesBuilder attributes,
@ -67,7 +65,7 @@ class KafkaSpanDecorator extends MessagingSpanDecorator {
Integer partition = exchange.getIn().getHeader(PARTITION, Integer.class);
if (partition != null) {
attributes.put(
MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION, partition);
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID, partition.toString());
}
if (CAPTURE_EXPERIMENTAL_SPAN_ATTRIBUTES) {

View File

@ -37,6 +37,7 @@ import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.AbstractStringAssert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.TestInstance;
@ -152,8 +153,6 @@ public abstract class KafkaClientBaseTest {
consumer.seekToBeginning(Collections.emptyList());
}
@SuppressWarnings("deprecation") // TODO
// MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation
protected static List<AttributeAssertion> sendAttributes(
String messageKey, String messageValue, boolean testHeaders) {
List<AttributeAssertion> assertions =
@ -166,8 +165,8 @@ public abstract class KafkaClientBaseTest {
MessagingIncubatingAttributes.MESSAGING_CLIENT_ID,
stringAssert -> stringAssert.startsWith("producer")),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
AbstractLongAssert::isNotNegative),
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
AbstractStringAssert::isNotEmpty),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET,
AbstractLongAssert::isNotNegative)));
@ -214,8 +213,6 @@ public abstract class KafkaClientBaseTest {
return assertions;
}
@SuppressWarnings("deprecation") // TODO
// MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation
protected static List<AttributeAssertion> processAttributes(
String messageKey, String messageValue, boolean testHeaders) {
List<AttributeAssertion> assertions =
@ -228,8 +225,8 @@ public abstract class KafkaClientBaseTest {
MessagingIncubatingAttributes.MESSAGING_CLIENT_ID,
stringAssert -> stringAssert.startsWith("consumer")),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
AbstractLongAssert::isNotNegative),
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
AbstractStringAssert::isNotEmpty),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET,
AbstractLongAssert::isNotNegative),

View File

@ -12,11 +12,10 @@ import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes;
import java.nio.charset.StandardCharsets;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.AbstractStringAssert;
class InterceptorsSuppressReceiveSpansTest extends AbstractInterceptorsTest {
@SuppressWarnings("deprecation") // TODO
// MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation
@Override
void assertTraces() {
testing.waitAndAssertTraces(
@ -50,8 +49,8 @@ class InterceptorsSuppressReceiveSpansTest extends AbstractInterceptorsTest {
MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE,
greeting.getBytes(StandardCharsets.UTF_8).length),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
AbstractLongAssert::isNotNegative),
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
AbstractStringAssert::isNotEmpty),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET,
AbstractLongAssert::isNotNegative),

View File

@ -17,11 +17,10 @@ import io.opentelemetry.semconv.incubating.MessagingIncubatingAttributes;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.atomic.AtomicReference;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.AbstractStringAssert;
class InterceptorsTest extends AbstractInterceptorsTest {
@SuppressWarnings("deprecation") // TODO
// MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation
@Override
void assertTraces() {
AtomicReference<SpanContext> producerSpanContext = new AtomicReference<>();
@ -87,8 +86,8 @@ class InterceptorsTest extends AbstractInterceptorsTest {
MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE,
greeting.getBytes(StandardCharsets.UTF_8).length),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
AbstractLongAssert::isNotNegative),
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
AbstractStringAssert::isNotEmpty),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET,
AbstractLongAssert::isNotNegative),

View File

@ -18,6 +18,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.AbstractStringAssert;
class WrapperSuppressReceiveSpansTest extends AbstractWrapperTest {
@ -52,8 +53,6 @@ class WrapperSuppressReceiveSpansTest extends AbstractWrapperTest {
.hasParent(trace.getSpan(0))));
}
@SuppressWarnings("deprecation") // TODO
// MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation
protected static List<AttributeAssertion> sendAttributes(boolean testHeaders) {
List<AttributeAssertion> assertions =
new ArrayList<>(
@ -65,8 +64,8 @@ class WrapperSuppressReceiveSpansTest extends AbstractWrapperTest {
MessagingIncubatingAttributes.MESSAGING_CLIENT_ID,
stringAssert -> stringAssert.startsWith("producer")),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
AbstractLongAssert::isNotNegative),
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
AbstractStringAssert::isNotEmpty),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET,
AbstractLongAssert::isNotNegative)));
@ -79,8 +78,6 @@ class WrapperSuppressReceiveSpansTest extends AbstractWrapperTest {
return assertions;
}
@SuppressWarnings("deprecation") // TODO
// MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation
private static List<AttributeAssertion> processAttributes(String greeting, boolean testHeaders) {
List<AttributeAssertion> assertions =
new ArrayList<>(
@ -92,8 +89,8 @@ class WrapperSuppressReceiveSpansTest extends AbstractWrapperTest {
MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE,
greeting.getBytes(StandardCharsets.UTF_8).length),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
AbstractLongAssert::isNotNegative),
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
AbstractStringAssert::isNotEmpty),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET,
AbstractLongAssert::isNotNegative),

View File

@ -22,6 +22,7 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.AbstractStringAssert;
class WrapperTest extends AbstractWrapperTest {
@ -74,8 +75,6 @@ class WrapperTest extends AbstractWrapperTest {
.hasParent(trace.getSpan(1))));
}
@SuppressWarnings("deprecation") // TODO
// MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation
protected static List<AttributeAssertion> sendAttributes(boolean testHeaders) {
List<AttributeAssertion> assertions =
new ArrayList<>(
@ -87,8 +86,8 @@ class WrapperTest extends AbstractWrapperTest {
MessagingIncubatingAttributes.MESSAGING_CLIENT_ID,
stringAssert -> stringAssert.startsWith("producer")),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
AbstractLongAssert::isNotNegative),
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
AbstractStringAssert::isNotEmpty),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET,
AbstractLongAssert::isNotNegative)));
@ -101,8 +100,6 @@ class WrapperTest extends AbstractWrapperTest {
return assertions;
}
@SuppressWarnings("deprecation") // TODO
// MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation
private static List<AttributeAssertion> processAttributes(String greeting, boolean testHeaders) {
List<AttributeAssertion> assertions =
new ArrayList<>(
@ -114,8 +111,8 @@ class WrapperTest extends AbstractWrapperTest {
MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE,
greeting.getBytes(StandardCharsets.UTF_8).length),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
AbstractLongAssert::isNotNegative),
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
AbstractStringAssert::isNotEmpty),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET,
AbstractLongAssert::isNotNegative),

View File

@ -16,8 +16,6 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
final class KafkaConsumerAttributesExtractor
implements AttributesExtractor<KafkaProcessRequest, Void> {
@SuppressWarnings("deprecation") // TODO
// MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation
@Override
public void onStart(
AttributesBuilder attributes, Context parentContext, KafkaProcessRequest request) {
@ -25,8 +23,8 @@ final class KafkaConsumerAttributesExtractor
ConsumerRecord<?, ?> record = request.getRecord();
attributes.put(
MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
(long) record.partition());
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
String.valueOf(record.partition()));
attributes.put(MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, record.offset());
Object key = record.key();

View File

@ -35,8 +35,6 @@ final class KafkaProducerAttributesExtractor
return !(keyClass.isArray() || keyClass == ByteBuffer.class);
}
@SuppressWarnings("deprecation") // TODO
// MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation
@Override
public void onEnd(
AttributesBuilder attributes,
@ -47,8 +45,8 @@ final class KafkaProducerAttributesExtractor
if (recordMetadata != null) {
attributes.put(
MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
recordMetadata.partition());
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
String.valueOf(recordMetadata.partition()));
attributes.put(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET, recordMetadata.offset());
}

View File

@ -24,7 +24,6 @@ import static io.opentelemetry.api.trace.SpanKind.PRODUCER
class KafkaStreamsDefaultTest extends KafkaStreamsBaseTest {
@SuppressWarnings("deprecation") // TODO MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation
def "test kafka produce and consume with streams in-between"() {
setup:
def config = new Properties()
@ -102,7 +101,7 @@ class KafkaStreamsDefaultTest extends KafkaStreamsBaseTest {
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" STREAM_PENDING
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "publish"
"$MessagingIncubatingAttributes.MESSAGING_CLIENT_ID" { it.startsWith("producer") }
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION" { it >= 0 }
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY" "10"
}
@ -139,7 +138,7 @@ class KafkaStreamsDefaultTest extends KafkaStreamsBaseTest {
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "process"
"$MessagingIncubatingAttributes.MESSAGING_CLIENT_ID" { it.endsWith("consumer") }
"$MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE" Long
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION" { it >= 0 }
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY" "10"
"kafka.record.queue_time_ms" { it >= 0 }
@ -159,7 +158,7 @@ class KafkaStreamsDefaultTest extends KafkaStreamsBaseTest {
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" STREAM_PROCESSED
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "publish"
"$MessagingIncubatingAttributes.MESSAGING_CLIENT_ID" { it.endsWith("producer") }
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION" { it >= 0 }
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0
}
}
@ -195,7 +194,7 @@ class KafkaStreamsDefaultTest extends KafkaStreamsBaseTest {
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "process"
"$MessagingIncubatingAttributes.MESSAGING_CLIENT_ID" { it.startsWith("consumer") }
"$MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE" Long
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION" { it >= 0 }
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY" "10"
if (Boolean.getBoolean("testLatestDeps")) {

View File

@ -24,7 +24,6 @@ import static io.opentelemetry.api.trace.SpanKind.PRODUCER
class KafkaStreamsSuppressReceiveSpansTest extends KafkaStreamsBaseTest {
@SuppressWarnings("deprecation") // TODO MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation
def "test kafka produce and consume with streams in-between"() {
setup:
def config = new Properties()
@ -97,7 +96,7 @@ class KafkaStreamsSuppressReceiveSpansTest extends KafkaStreamsBaseTest {
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" STREAM_PENDING
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "publish"
"$MessagingIncubatingAttributes.MESSAGING_CLIENT_ID" "producer-1"
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION" { it >= 0 }
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY" "10"
}
@ -113,7 +112,7 @@ class KafkaStreamsSuppressReceiveSpansTest extends KafkaStreamsBaseTest {
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "process"
"$MessagingIncubatingAttributes.MESSAGING_CLIENT_ID" { it.endsWith("consumer") }
"$MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE" Long
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION" { it >= 0 }
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY" "10"
"kafka.record.queue_time_ms" { it >= 0 }
@ -136,7 +135,7 @@ class KafkaStreamsSuppressReceiveSpansTest extends KafkaStreamsBaseTest {
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_NAME" STREAM_PROCESSED
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "publish"
"$MessagingIncubatingAttributes.MESSAGING_CLIENT_ID" String
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION" { it >= 0 }
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0
}
}
@ -151,7 +150,7 @@ class KafkaStreamsSuppressReceiveSpansTest extends KafkaStreamsBaseTest {
"$MessagingIncubatingAttributes.MESSAGING_OPERATION" "process"
"$MessagingIncubatingAttributes.MESSAGING_CLIENT_ID" { it.startsWith("consumer") }
"$MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE" Long
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION" { it >= 0 }
"$MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID" String
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET" 0
"$MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_KEY" "10"
if (Boolean.getBoolean("testLatestDeps")) {

View File

@ -35,6 +35,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.AbstractStringAssert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.extension.RegisterExtension;
@ -174,8 +175,6 @@ public abstract class AbstractReactorKafkaTest {
span -> span.hasName("consumer").hasParent(trace.getSpan(1))));
}
@SuppressWarnings("deprecation") // TODO
// MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation
protected static List<AttributeAssertion> sendAttributes(ProducerRecord<String, String> record) {
List<AttributeAssertion> assertions =
new ArrayList<>(
@ -187,8 +186,8 @@ public abstract class AbstractReactorKafkaTest {
MessagingIncubatingAttributes.MESSAGING_CLIENT_ID,
stringAssert -> stringAssert.startsWith("producer")),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
AbstractLongAssert::isNotNegative),
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
AbstractStringAssert::isNotEmpty),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET,
AbstractLongAssert::isNotNegative)));
@ -217,8 +216,6 @@ public abstract class AbstractReactorKafkaTest {
return assertions;
}
@SuppressWarnings("deprecation") // TODO
// MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation
protected static List<AttributeAssertion> processAttributes(
ProducerRecord<String, String> record) {
List<AttributeAssertion> assertions =
@ -231,8 +228,8 @@ public abstract class AbstractReactorKafkaTest {
MessagingIncubatingAttributes.MESSAGING_CLIENT_ID,
stringAssert -> stringAssert.startsWith("consumer")),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
AbstractLongAssert::isNotNegative),
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
AbstractStringAssert::isNotEmpty),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET,
AbstractLongAssert::isNotNegative)));

View File

@ -16,6 +16,7 @@ import java.time.Duration;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.AbstractStringAssert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
@ -84,7 +85,6 @@ class KafkaIntegrationTest {
// In kafka 2 ops.send is deprecated. We are using it to avoid reflection because kafka 3 also has
// ops.send, although with different return type.
@SuppressWarnings({"unchecked", "deprecation"})
// TODO MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation
private static void runShouldInstrumentProducerAndConsumer(
ConfigurableApplicationContext applicationContext) {
KafkaTemplate<String, String> kafkaTemplate = applicationContext.getBean(KafkaTemplate.class);
@ -117,8 +117,8 @@ class KafkaIntegrationTest {
MessagingIncubatingAttributes.MESSAGING_CLIENT_ID,
stringAssert -> stringAssert.startsWith("producer")),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
AbstractLongAssert::isNotNegative),
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
AbstractStringAssert::isNotEmpty),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET,
AbstractLongAssert::isNotNegative),
@ -138,8 +138,8 @@ class KafkaIntegrationTest {
MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE,
AbstractLongAssert::isNotNegative),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
AbstractLongAssert::isNotNegative),
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
AbstractStringAssert::isNotEmpty),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET,
AbstractLongAssert::isNotNegative),

View File

@ -30,6 +30,7 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.AbstractStringAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
@ -48,8 +49,6 @@ class SpringKafkaTest extends AbstractSpringKafkaTest {
return emptyList();
}
@SuppressWarnings("deprecation") // TODO
// MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation
@Test
void shouldCreateSpansForSingleRecordProcess() {
testing.runWithSpan(
@ -80,8 +79,8 @@ class SpringKafkaTest extends AbstractSpringKafkaTest {
"testSingleTopic"),
equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "publish"),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
AbstractLongAssert::isNotNegative),
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
AbstractStringAssert::isNotEmpty),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET,
AbstractLongAssert::isNotNegative),
@ -127,8 +126,8 @@ class SpringKafkaTest extends AbstractSpringKafkaTest {
MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE,
AbstractLongAssert::isNotNegative),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
AbstractLongAssert::isNotNegative),
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
AbstractStringAssert::isNotEmpty),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET,
AbstractLongAssert::isNotNegative),
@ -146,9 +145,6 @@ class SpringKafkaTest extends AbstractSpringKafkaTest {
span -> span.hasName("consumer").hasParent(trace.getSpan(1))));
}
@SuppressWarnings(
"deprecation") // TODO MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION
// deprecation
@Test
void shouldHandleFailureInKafkaListener() {
testing.runWithSpan(
@ -188,8 +184,8 @@ class SpringKafkaTest extends AbstractSpringKafkaTest {
MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE,
AbstractLongAssert::isNotNegative),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
AbstractLongAssert::isNotNegative),
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
AbstractStringAssert::isNotEmpty),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET,
AbstractLongAssert::isNotNegative),
@ -218,8 +214,8 @@ class SpringKafkaTest extends AbstractSpringKafkaTest {
"testSingleTopic"),
equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "publish"),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
AbstractLongAssert::isNotNegative),
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
AbstractStringAssert::isNotEmpty),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET,
AbstractLongAssert::isNotNegative),
@ -266,8 +262,6 @@ class SpringKafkaTest extends AbstractSpringKafkaTest {
span -> span.hasName("consumer").hasParent(trace.getSpan(1))));
}
@SuppressWarnings("deprecation") // TODO
// MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation
@Test
void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException {
Map<String, String> batchMessages = new HashMap<>();
@ -294,8 +288,8 @@ class SpringKafkaTest extends AbstractSpringKafkaTest {
"testBatchTopic"),
equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "publish"),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
AbstractLongAssert::isNotNegative),
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
AbstractStringAssert::isNotEmpty),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET,
AbstractLongAssert::isNotNegative),
@ -314,8 +308,8 @@ class SpringKafkaTest extends AbstractSpringKafkaTest {
"testBatchTopic"),
equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "publish"),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
AbstractLongAssert::isNotNegative),
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
AbstractStringAssert::isNotEmpty),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET,
AbstractLongAssert::isNotNegative),
@ -371,9 +365,6 @@ class SpringKafkaTest extends AbstractSpringKafkaTest {
span -> span.hasName("consumer").hasParent(trace.getSpan(1))));
}
@SuppressWarnings(
"deprecation") // TODO MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION
// deprecation
@Test
void shouldHandleFailureInKafkaBatchListener() {
testing.runWithSpan(
@ -404,8 +395,8 @@ class SpringKafkaTest extends AbstractSpringKafkaTest {
"testBatchTopic"),
equalTo(MessagingIncubatingAttributes.MESSAGING_OPERATION, "publish"),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
AbstractLongAssert::isNotNegative),
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
AbstractStringAssert::isNotEmpty),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET,
AbstractLongAssert::isNotNegative),

View File

@ -21,14 +21,13 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.AbstractStringAssert;
import org.junit.jupiter.api.Test;
public abstract class AbstractSpringKafkaNoReceiveTelemetryTest extends AbstractSpringKafkaTest {
protected abstract boolean isLibraryInstrumentationTest();
@SuppressWarnings("deprecation") // TODO
// MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation
@Test
void shouldCreateSpansForSingleRecordProcess() {
testing()
@ -63,8 +62,8 @@ public abstract class AbstractSpringKafkaNoReceiveTelemetryTest extends Abstract
stringAssert -> stringAssert.startsWith("producer")),
satisfies(
MessagingIncubatingAttributes
.MESSAGING_KAFKA_DESTINATION_PARTITION,
AbstractLongAssert::isNotNegative),
.MESSAGING_DESTINATION_PARTITION_ID,
AbstractStringAssert::isNotEmpty),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET,
AbstractLongAssert::isNotNegative),
@ -87,8 +86,8 @@ public abstract class AbstractSpringKafkaNoReceiveTelemetryTest extends Abstract
AbstractLongAssert::isNotNegative),
satisfies(
MessagingIncubatingAttributes
.MESSAGING_KAFKA_DESTINATION_PARTITION,
AbstractLongAssert::isNotNegative),
.MESSAGING_DESTINATION_PARTITION_ID,
AbstractStringAssert::isNotEmpty),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET,
AbstractLongAssert::isNotNegative),
@ -104,8 +103,6 @@ public abstract class AbstractSpringKafkaNoReceiveTelemetryTest extends Abstract
span -> span.hasName("consumer").hasParent(trace.getSpan(2))));
}
@SuppressWarnings("deprecation") // TODO
// MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation
@Test
void shouldHandleFailureInKafkaListener() {
testing()
@ -128,8 +125,8 @@ public abstract class AbstractSpringKafkaNoReceiveTelemetryTest extends Abstract
MessagingIncubatingAttributes.MESSAGING_MESSAGE_BODY_SIZE,
AbstractLongAssert::isNotNegative),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
AbstractLongAssert::isNotNegative),
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
AbstractStringAssert::isNotEmpty),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET,
AbstractLongAssert::isNotNegative),
@ -161,8 +158,8 @@ public abstract class AbstractSpringKafkaNoReceiveTelemetryTest extends Abstract
stringAssert -> stringAssert.startsWith("producer")),
satisfies(
MessagingIncubatingAttributes
.MESSAGING_KAFKA_DESTINATION_PARTITION,
AbstractLongAssert::isNotNegative),
.MESSAGING_DESTINATION_PARTITION_ID,
AbstractStringAssert::isNotEmpty),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET,
AbstractLongAssert::isNotNegative),
@ -194,8 +191,6 @@ public abstract class AbstractSpringKafkaNoReceiveTelemetryTest extends Abstract
span -> span.hasName("consumer").hasParent(trace.getSpan(6))));
}
@SuppressWarnings("deprecation") // TODO
// MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation
@Test
void shouldCreateSpansForBatchReceiveAndProcess() throws InterruptedException {
Map<String, String> batchMessages = new HashMap<>();
@ -226,9 +221,8 @@ public abstract class AbstractSpringKafkaNoReceiveTelemetryTest extends Abstract
MessagingIncubatingAttributes.MESSAGING_CLIENT_ID,
stringAssert -> stringAssert.startsWith("producer")),
satisfies(
MessagingIncubatingAttributes
.MESSAGING_KAFKA_DESTINATION_PARTITION,
AbstractLongAssert::isNotNegative),
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
AbstractStringAssert::isNotEmpty),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET,
AbstractLongAssert::isNotNegative),
@ -248,9 +242,8 @@ public abstract class AbstractSpringKafkaNoReceiveTelemetryTest extends Abstract
MessagingIncubatingAttributes.MESSAGING_CLIENT_ID,
stringAssert -> stringAssert.startsWith("producer")),
satisfies(
MessagingIncubatingAttributes
.MESSAGING_KAFKA_DESTINATION_PARTITION,
AbstractLongAssert::isNotNegative),
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
AbstractStringAssert::isNotEmpty),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET,
AbstractLongAssert::isNotNegative),
@ -290,8 +283,6 @@ public abstract class AbstractSpringKafkaNoReceiveTelemetryTest extends Abstract
span -> span.hasName("consumer").hasParent(trace.getSpan(0))));
}
@SuppressWarnings("deprecation") // TODO
// MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation
@Test
void shouldHandleFailureInKafkaBatchListener() {
testing()
@ -339,9 +330,8 @@ public abstract class AbstractSpringKafkaNoReceiveTelemetryTest extends Abstract
MessagingIncubatingAttributes.MESSAGING_CLIENT_ID,
stringAssert -> stringAssert.startsWith("producer")),
satisfies(
MessagingIncubatingAttributes
.MESSAGING_KAFKA_DESTINATION_PARTITION,
AbstractLongAssert::isNotNegative),
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
AbstractStringAssert::isNotEmpty),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET,
AbstractLongAssert::isNotNegative),

View File

@ -35,6 +35,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.assertj.core.api.AbstractLongAssert;
import org.assertj.core.api.AbstractStringAssert;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.extension.RegisterExtension;
@ -194,8 +195,6 @@ public abstract class AbstractVertxKafkaTest {
}
}
@SuppressWarnings("deprecation") // TODO
// MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation
protected static List<AttributeAssertion> sendAttributes(
KafkaProducerRecord<String, String> record) {
List<AttributeAssertion> assertions =
@ -208,8 +207,8 @@ public abstract class AbstractVertxKafkaTest {
MessagingIncubatingAttributes.MESSAGING_CLIENT_ID,
stringAssert -> stringAssert.startsWith("producer")),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
AbstractLongAssert::isNotNegative),
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
AbstractStringAssert::isNotEmpty),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET,
AbstractLongAssert::isNotNegative)));
@ -249,8 +248,6 @@ public abstract class AbstractVertxKafkaTest {
return assertions;
}
@SuppressWarnings("deprecation") // TODO
// MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION deprecation
protected static List<AttributeAssertion> processAttributes(
KafkaProducerRecord<String, String> record) {
List<AttributeAssertion> assertions =
@ -263,8 +260,8 @@ public abstract class AbstractVertxKafkaTest {
MessagingIncubatingAttributes.MESSAGING_CLIENT_ID,
stringAssert -> stringAssert.startsWith("consumer")),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_DESTINATION_PARTITION,
AbstractLongAssert::isNotNegative),
MessagingIncubatingAttributes.MESSAGING_DESTINATION_PARTITION_ID,
AbstractStringAssert::isNotEmpty),
satisfies(
MessagingIncubatingAttributes.MESSAGING_KAFKA_MESSAGE_OFFSET,
AbstractLongAssert::isNotNegative)));