// Copyright The OpenTelemetry Authors // SPDX-License-Identifier: Apache-2.0 using IntegrationTests.Helpers; using OpenTelemetry.Proto.Common.V1; using OpenTelemetry.Proto.Trace.V1; using Xunit.Abstractions; namespace IntegrationTests; [Collection(KafkaCollection.Name)] public class KafkaTests : TestHelper { private const string MessagingPublishOperationAttributeValue = "publish"; private const string MessagingReceiveOperationAttributeValue = "receive"; private const string MessagingSystemAttributeName = "messaging.system"; private const string MessagingOperationAttributeName = "messaging.operation"; private const string MessagingDestinationAttributeName = "messaging.destination.name"; private const string MessagingClientIdAttributeName = "messaging.client_id"; private const string KafkaMessageSystemAttributeValue = "kafka"; private const string KafkaConsumerGroupAttributeName = "messaging.kafka.consumer.group"; private const string KafkaMessageKeyAttributeName = "messaging.kafka.message.key"; private const string KafkaMessageKeyAttributeValue = "testkey"; private const string KafkaDestinationPartitionAttributeName = "messaging.kafka.destination.partition"; private const string KafkaMessageOffsetAttributeName = "messaging.kafka.message.offset"; private const string KafkaMessageTombstoneAttributeName = "messaging.kafka.message.tombstone"; private const string KafkaInstrumentationScopeName = "OpenTelemetry.AutoInstrumentation.Kafka"; private const string KafkaProducerClientIdAttributeValue = "rdkafka#producer-1"; private const string KafkaConsumerClientIdAttributeValue = "rdkafka#consumer-2"; // https://github.com/confluentinc/confluent-kafka-dotnet/blob/07de95ed647af80a0db39ce6a8891a630423b952/src/Confluent.Kafka/Offset.cs#L36C44-L36C44 private const int InvalidOffset = -1001; private readonly KafkaFixture _kafka; public KafkaTests(ITestOutputHelper testOutputHelper, KafkaFixture kafka) : base("Kafka", testOutputHelper) { _kafka = kafka; } [Theory] [Trait("Category", "EndToEnd")] [Trait("Containers", "Linux")] [MemberData(nameof(LibraryVersion.Kafka), MemberType = typeof(LibraryVersion))] public void SubmitsTraces(string packageVersion) { var topicName = $"test-topic-{packageVersion}"; using var collector = new MockSpansCollector(Output); SetExporter(collector); // Failed produce attempts made before topic is created. collector.Expect(KafkaInstrumentationScopeName, span => span.Kind == Span.Types.SpanKind.Producer && ValidateResultProcessingProduceExceptionSpan(span, topicName), "Failed Produce attempt with delivery handler set."); collector.Expect(KafkaInstrumentationScopeName, span => span.Kind == Span.Types.SpanKind.Producer && ValidateProduceExceptionSpan(span, topicName), "Failed Produce attempt without delivery handler set."); collector.Expect(KafkaInstrumentationScopeName, span => span.Kind == Span.Types.SpanKind.Producer && ValidateResultProcessingProduceExceptionSpan(span, topicName), "Failed ProduceAsync attempt."); if (packageVersion == string.Empty || Version.Parse(packageVersion) != new Version(1, 4, 0)) { // Failed consume attempt. collector.Expect( KafkaInstrumentationScopeName, span => span.Kind == Span.Types.SpanKind.Consumer && ValidateConsumeExceptionSpan(span, topicName), "Failed Consume attempt."); } // Successful produce attempts after topic was created with admin client. collector.Expect(KafkaInstrumentationScopeName, span => span.Kind == Span.Types.SpanKind.Producer && ValidateProducerSpan(span, topicName, 0), "Successful ProduceAsync attempt."); collector.Expect(KafkaInstrumentationScopeName, span => span.Kind == Span.Types.SpanKind.Producer && ValidateProducerSpan(span, topicName, -1), "Successful Produce attempt without delivery handler set."); collector.Expect(KafkaInstrumentationScopeName, span => span.Kind == Span.Types.SpanKind.Producer && ValidateProducerSpan(span, topicName, 0), "Successful Produce attempt with delivery handler set."); // Successful produce attempt after topic was created for tombstones. collector.Expect(KafkaInstrumentationScopeName, span => span.Kind == Span.Types.SpanKind.Producer && ValidateProducerSpan(span, topicName, -1, true), "Successful sync Publish attempt with a tombstone."); // Successful consume attempts. collector.Expect(KafkaInstrumentationScopeName, span => span.Kind == Span.Types.SpanKind.Consumer && ValidateConsumerSpan(span, topicName, 0), "First successful Consume attempt."); collector.Expect(KafkaInstrumentationScopeName, span => span.Kind == Span.Types.SpanKind.Consumer && ValidateConsumerSpan(span, topicName, 1), "Second successful Consume attempt."); collector.Expect(KafkaInstrumentationScopeName, span => span.Kind == Span.Types.SpanKind.Consumer && ValidateConsumerSpan(span, topicName, 2), "Third successful Consume attempt."); collector.ExpectCollected(collection => ValidatePropagation(collection, topicName)); EnableBytecodeInstrumentation(); RunTestApplication(new TestSettings { PackageVersion = packageVersion, Arguments = $"--kafka {_kafka.Port} --topic-name {topicName}" }); collector.AssertExpectations(); } [Theory] [Trait("Category", "EndToEnd")] [Trait("Containers", "Linux")] [MemberData(nameof(LibraryVersion.GetPlatformVersions), nameof(LibraryVersion.Kafka), MemberType = typeof(LibraryVersion))] // ReSharper disable once InconsistentNaming public void NoSpansForPartitionEOF(string packageVersion) { var topicName = $"test-topic2-{packageVersion}"; using var collector = new MockSpansCollector(Output); SetExporter(collector); EnableBytecodeInstrumentation(); RunTestApplication(new TestSettings { PackageVersion = packageVersion, Arguments = $"--kafka {_kafka.Port} --topic-name {topicName} --consume-only" }); collector.AssertEmpty(TimeSpan.FromSeconds(10)); } private static string GetConsumerGroupIdAttributeValue(string topicName) { return $"test-consumer-group-{topicName}"; } private static bool ValidateConsumeExceptionSpan(Span span, string topicName) { return ValidateConsumerSpan(span, topicName, InvalidOffset, null) && span.Status.Code == Status.Types.StatusCode.Error; } private static bool ValidateConsumerSpan(Span span, string topicName, int messageOffset, string? expectedMessageKey = KafkaMessageKeyAttributeValue) { var kafkaMessageOffset = span.Attributes.SingleOrDefault(kv => kv.Key == KafkaMessageOffsetAttributeName)?.Value.IntValue; var consumerGroupId = span.Attributes.Single(kv => kv.Key == KafkaConsumerGroupAttributeName).Value.StringValue; return ValidateCommonAttributes(span.Attributes, topicName, KafkaConsumerClientIdAttributeValue, MessagingReceiveOperationAttributeValue, 0, expectedMessageKey) && kafkaMessageOffset == messageOffset && consumerGroupId == GetConsumerGroupIdAttributeValue(topicName); } private static bool ValidateBasicProduceExceptionSpan(Span span, string topicName) { return ValidateCommonAttributes(span.Attributes, topicName, KafkaProducerClientIdAttributeValue, MessagingPublishOperationAttributeValue, -1, KafkaMessageKeyAttributeValue) && span.Status.Code == Status.Types.StatusCode.Error; } private static bool ValidateProduceExceptionSpan(Span span, string topicName) { return ValidateBasicProduceExceptionSpan(span, topicName) && span.Attributes.Count(kv => kv.Key == KafkaMessageOffsetAttributeName) == 0; } private static bool ValidateResultProcessingProduceExceptionSpan(Span span, string topicName) { // DeliveryResult processing results in offset being set. var offset = span.Attributes.SingleOrDefault(kv => kv.Key == KafkaMessageOffsetAttributeName)?.Value.IntValue; return ValidateBasicProduceExceptionSpan(span, topicName) && offset == InvalidOffset; } private static bool ValidateProducerSpan(Span span, string topicName, int partition, bool tombstoneExpected = false) { var isTombstone = span.Attributes.Single(kv => kv.Key == KafkaMessageTombstoneAttributeName).Value.BoolValue; return ValidateCommonAttributes(span.Attributes, topicName, KafkaProducerClientIdAttributeValue, MessagingPublishOperationAttributeValue, partition, KafkaMessageKeyAttributeValue) && isTombstone == tombstoneExpected && span.Status is null; } private static bool ValidateCommonAttributes(IReadOnlyCollection attributes, string topicName, string clientId, string operationName, int partition, string? expectedMessageKey) { var messagingDestinationName = attributes.SingleOrDefault(kv => kv.Key == MessagingDestinationAttributeName)?.Value.StringValue; var kafkaMessageKey = attributes.SingleOrDefault(kv => kv.Key == KafkaMessageKeyAttributeName)?.Value.StringValue; var kafkaPartition = attributes.SingleOrDefault(kv => kv.Key == KafkaDestinationPartitionAttributeName)?.Value.IntValue; return ValidateBasicSpanAttributes(attributes, clientId, operationName) && messagingDestinationName == topicName && kafkaMessageKey == expectedMessageKey && kafkaPartition == partition; } private static bool ValidateBasicSpanAttributes(IReadOnlyCollection attributes, string clientId, string operationName) { var messagingSystem = attributes.Single(kv => kv.Key == MessagingSystemAttributeName).Value.StringValue; var messagingOperation = attributes.Single(kv => kv.Key == MessagingOperationAttributeName).Value.StringValue; var messagingClientId = attributes.Single(kv => kv.Key == MessagingClientIdAttributeName).Value.StringValue; return messagingSystem == KafkaMessageSystemAttributeValue && messagingOperation == operationName && messagingClientId == clientId; } private static bool ValidatePropagation(ICollection collectedSpans, string topicName) { var expectedReceiveOperationName = $"{topicName} {MessagingReceiveOperationAttributeValue}"; var expectedPublishOperationName = $"{topicName} {MessagingPublishOperationAttributeValue}"; var producerSpans = collectedSpans .Where(span => span.Span.Name == expectedPublishOperationName && !span.Span.Attributes.Single(attr => attr.Key == KafkaMessageTombstoneAttributeName).Value.BoolValue && span.Span.Status is null); return producerSpans.Select( producerSpan => GetMatchingConsumerSpan(collectedSpans, producerSpan.Span, expectedReceiveOperationName)) .All(consumerSpan => consumerSpan is not null); } private static MockSpansCollector.Collected? GetMatchingConsumerSpan(ICollection collectedSpans, Span producerSpan, string expectedReceiveOperationName) { return collectedSpans .SingleOrDefault(span => { var parentLinksCount = span.Span.Links.Count( link => link.TraceId == producerSpan.TraceId && link.SpanId == producerSpan.SpanId); return span.Span.Name == expectedReceiveOperationName && parentLinksCount == 1; }); } }