[kafka] `ProduceAsync` instrumentation (#3165)

This commit is contained in:
Mateusz Łach 2023-11-30 09:23:34 +01:00 committed by GitHub
parent f5af49ee53
commit 5c30867158
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 620 additions and 237 deletions

View File

@ -6,6 +6,7 @@ OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ConsumerCo
OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ConsumerConsumeSyncIntegration
OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ConsumerDisposeIntegration
OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ProducerDeliveryHandlerActionIntegration
OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ProducerProduceAsyncIntegration
OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ProducerProduceSyncIntegration
*REMOVED*OpenTelemetry.AutoInstrumentation.CallTarget.CallTargetState.CallTargetState(System.Diagnostics.Activity? activity, object! state) -> void
*REMOVED*OpenTelemetry.AutoInstrumentation.CallTarget.CallTargetState.CallTargetState(System.Diagnostics.Activity! activity, object! state, System.DateTimeOffset? startTime) -> void

View File

@ -6,6 +6,7 @@ OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ConsumerCo
OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ConsumerConsumeSyncIntegration
OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ConsumerDisposeIntegration
OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ProducerDeliveryHandlerActionIntegration
OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ProducerProduceAsyncIntegration
OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ProducerProduceSyncIntegration
*REMOVED*OpenTelemetry.AutoInstrumentation.CallTarget.CallTargetState.CallTargetState(System.Diagnostics.Activity? activity, object! state) -> void
*REMOVED*OpenTelemetry.AutoInstrumentation.CallTarget.CallTargetState.CallTargetState(System.Diagnostics.Activity! activity, object! state, System.DateTimeOffset? startTime) -> void

View File

@ -19,7 +19,7 @@ internal static partial class InstrumentationDefinitions
private static NativeCallTargetDefinition[] GetDefinitionsArray()
{
var nativeCallTargetDefinitions = new List<NativeCallTargetDefinition>(15);
var nativeCallTargetDefinitions = new List<NativeCallTargetDefinition>(16);
// Traces
var tracerSettings = Instrumentation.TracerSettings.Value;
if (tracerSettings.TracesEnabled)
@ -38,6 +38,7 @@ internal static partial class InstrumentationDefinitions
nativeCallTargetDefinitions.Add(new("Confluent.Kafka", "Confluent.Kafka.Consumer`2", "Consume", new[] {"Confluent.Kafka.ConsumeResult`2[!0,!1]", "System.Int32"}, 1, 4, 0, 2, 65535, 65535, AssemblyFullName, "OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ConsumerConsumeSyncIntegration"));
nativeCallTargetDefinitions.Add(new("Confluent.Kafka", "Confluent.Kafka.Consumer`2", "Dispose", new[] {"System.Void"}, 1, 4, 0, 2, 65535, 65535, AssemblyFullName, "OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ConsumerDisposeIntegration"));
nativeCallTargetDefinitions.Add(new("Confluent.Kafka", "Confluent.Kafka.Producer`2+TypedDeliveryHandlerShim_Action", ".ctor", new[] {"System.Void", "System.String", "!0", "!1", "System.Action`1[Confluent.Kafka.DeliveryReport`2[!0,!1]]"}, 1, 4, 0, 2, 65535, 65535, AssemblyFullName, "OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ProducerDeliveryHandlerActionIntegration"));
nativeCallTargetDefinitions.Add(new("Confluent.Kafka", "Confluent.Kafka.Producer`2", "ProduceAsync", new[] {"System.Threading.Tasks.Task`1", "Confluent.Kafka.TopicPartition", "Confluent.Kafka.Message`2[!0,!1]", "System.Threading.CancellationToken"}, 1, 4, 0, 2, 65535, 65535, AssemblyFullName, "OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ProducerProduceAsyncIntegration"));
nativeCallTargetDefinitions.Add(new("Confluent.Kafka", "Confluent.Kafka.Producer`2", "Produce", new[] {"System.Void", "Confluent.Kafka.TopicPartition", "Confluent.Kafka.Message`2[!0,!1]", "System.Action`1[Confluent.Kafka.DeliveryReport`2[!0,!1]]"}, 1, 4, 0, 2, 65535, 65535, AssemblyFullName, "OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ProducerProduceSyncIntegration"));
}

View File

@ -19,7 +19,7 @@ internal static partial class InstrumentationDefinitions
private static NativeCallTargetDefinition[] GetDefinitionsArray()
{
var nativeCallTargetDefinitions = new List<NativeCallTargetDefinition>(23);
var nativeCallTargetDefinitions = new List<NativeCallTargetDefinition>(24);
// Traces
var tracerSettings = Instrumentation.TracerSettings.Value;
if (tracerSettings.TracesEnabled)
@ -32,6 +32,7 @@ internal static partial class InstrumentationDefinitions
nativeCallTargetDefinitions.Add(new("Confluent.Kafka", "Confluent.Kafka.Consumer`2", "Consume", new[] {"Confluent.Kafka.ConsumeResult`2[!0,!1]", "System.Int32"}, 1, 4, 0, 2, 65535, 65535, AssemblyFullName, "OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ConsumerConsumeSyncIntegration"));
nativeCallTargetDefinitions.Add(new("Confluent.Kafka", "Confluent.Kafka.Consumer`2", "Dispose", new[] {"System.Void"}, 1, 4, 0, 2, 65535, 65535, AssemblyFullName, "OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ConsumerDisposeIntegration"));
nativeCallTargetDefinitions.Add(new("Confluent.Kafka", "Confluent.Kafka.Producer`2+TypedDeliveryHandlerShim_Action", ".ctor", new[] {"System.Void", "System.String", "!0", "!1", "System.Action`1[Confluent.Kafka.DeliveryReport`2[!0,!1]]"}, 1, 4, 0, 2, 65535, 65535, AssemblyFullName, "OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ProducerDeliveryHandlerActionIntegration"));
nativeCallTargetDefinitions.Add(new("Confluent.Kafka", "Confluent.Kafka.Producer`2", "ProduceAsync", new[] {"System.Threading.Tasks.Task`1", "Confluent.Kafka.TopicPartition", "Confluent.Kafka.Message`2[!0,!1]", "System.Threading.CancellationToken"}, 1, 4, 0, 2, 65535, 65535, AssemblyFullName, "OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ProducerProduceAsyncIntegration"));
nativeCallTargetDefinitions.Add(new("Confluent.Kafka", "Confluent.Kafka.Producer`2", "Produce", new[] {"System.Void", "Confluent.Kafka.TopicPartition", "Confluent.Kafka.Message`2[!0,!1]", "System.Action`1[Confluent.Kafka.DeliveryReport`2[!0,!1]]"}, 1, 4, 0, 2, 65535, 65535, AssemblyFullName, "OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ProducerProduceSyncIntegration"));
}

View File

@ -1,4 +1,4 @@
// <copyright file="IResultException.cs" company="OpenTelemetry Authors">
// <copyright file="IConsumeException.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
@ -17,7 +17,7 @@
namespace OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.DuckTypes;
// wraps https://github.com/confluentinc/confluent-kafka-dotnet/blob/07de95ed647af80a0db39ce6a8891a630423b952/src/Confluent.Kafka/ConsumeException.cs
internal interface IResultException
internal interface IConsumeException
{
public IConsumeResult? ConsumerRecord { get; set; }
}

View File

@ -17,11 +17,7 @@
namespace OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.DuckTypes;
// wraps https://github.com/confluentinc/confluent-kafka-dotnet/blob/07de95ed647af80a0db39ce6a8891a630423b952/src/Confluent.Kafka/DeliveryReport.cs
internal interface IDeliveryReport
internal interface IDeliveryReport : IDeliveryResult
{
IError? Error { get; set; }
public Partition Partition { get; set; }
public Offset Offset { get; set; }
}

View File

@ -0,0 +1,25 @@
// <copyright file="IDeliveryResult.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
namespace OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.DuckTypes;
// wraps https://github.com/confluentinc/confluent-kafka-dotnet/blob/07de95ed647af80a0db39ce6a8891a630423b952/src/Confluent.Kafka/DeliveryResult.cs
internal interface IDeliveryResult
{
public Partition Partition { get; set; }
public Offset Offset { get; set; }
}

View File

@ -0,0 +1,23 @@
// <copyright file="IProduceException.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
namespace OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.DuckTypes;
// wraps https://github.com/confluentinc/confluent-kafka-dotnet/blob/07de95ed647af80a0db39ce6a8891a630423b952/src/Confluent.Kafka/ProduceException.cs
internal interface IProduceException
{
public IDeliveryResult DeliveryResult { get; }
}

View File

@ -27,6 +27,7 @@ internal static class IntegrationConstants
public const string ConsumerTypeName = "Confluent.Kafka.Consumer`2";
public const string ConsumerBuilderTypeName = "Confluent.Kafka.ConsumerBuilder`2[!0,!1]";
public const string ProduceSyncMethodName = "Produce";
public const string ProduceAsyncMethodName = "ProduceAsync";
public const string ConsumeSyncMethodName = "Consume";
public const string DisposeMethodName = "Dispose";
public const string CloseMethodName = "Close";

View File

@ -36,6 +36,8 @@ namespace OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations;
type: InstrumentationType.Trace)]
public static class ConsumerConstructorIntegration
{
private const string ConsumerGroupIdConfigKey = "group.id";
internal static CallTargetState OnMethodBegin<TTarget, TConsumerBuilder>(TTarget instance, TConsumerBuilder consumerBuilder)
where TConsumerBuilder : IConsumerBuilder, IDuckType
{
@ -54,7 +56,7 @@ public static class ConsumerConstructorIntegration
{
if (string.Equals(
keyValuePair.Key,
KafkaCommon.ConsumerGroupIdConfigKey,
ConsumerGroupIdConfigKey,
StringComparison.OrdinalIgnoreCase))
{
consumerGroupId = keyValuePair.Value;

View File

@ -14,11 +14,10 @@
// limitations under the License.
// </copyright>
using System.Diagnostics;
using OpenTelemetry.AutoInstrumentation.CallTarget;
using OpenTelemetry.AutoInstrumentation.DuckTyping;
using OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.DuckTypes;
using OpenTelemetry.Context.Propagation;
using OpenTelemetry.AutoInstrumentation.Util;
namespace OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations;
@ -51,9 +50,9 @@ public static class ConsumerConsumeSyncIntegration
internal static CallTargetReturn<TResponse> OnMethodEnd<TTarget, TResponse>(TTarget instance, TResponse response, Exception? exception, in CallTargetState state)
{
IConsumeResult? consumeResult;
if (exception is not null && exception.TryDuckCast<IResultException>(out var resultException))
if (exception is not null && exception.TryDuckCast<IConsumeException>(out var consumeException))
{
consumeResult = resultException.ConsumerRecord;
consumeResult = consumeException.ConsumerRecord;
}
else
{
@ -65,37 +64,10 @@ public static class ConsumerConsumeSyncIntegration
return new CallTargetReturn<TResponse>(response);
}
var propagatedContext = Propagators.DefaultTextMapPropagator.Extract(default, consumeResult, KafkaCommon.MessageHeaderValueGetter);
string? spanName = null;
if (!string.IsNullOrEmpty(consumeResult.Topic))
var activity = KafkaInstrumentation.StartConsumerActivity(consumeResult, (DateTimeOffset)state.StartTime!, instance!);
if (exception is not null)
{
spanName = $"{consumeResult.Topic} {MessagingAttributes.Values.ReceiveOperationName}";
}
spanName ??= MessagingAttributes.Values.ReceiveOperationName;
var activityLinks = propagatedContext.ActivityContext.IsValid()
? new[] { new ActivityLink(propagatedContext.ActivityContext) }
: Array.Empty<ActivityLink>();
var startTime = (DateTimeOffset)state.StartTime!;
var activity = KafkaCommon.Source.StartActivity(name: spanName, kind: ActivityKind.Consumer, links: activityLinks, startTime: startTime);
if (activity is { IsAllDataRequested: true })
{
KafkaCommon.SetCommonAttributes(
activity,
MessagingAttributes.Values.ReceiveOperationName,
consumeResult.Topic,
consumeResult.Partition,
consumeResult.Message?.Key,
instance.DuckCast<INamedClient>());
activity.SetTag(MessagingAttributes.Keys.Kafka.PartitionOffset, consumeResult.Offset.Value);
if (ConsumerCache.TryGet(instance!, out var groupId))
{
activity.SetTag(MessagingAttributes.Keys.Kafka.ConsumerGroupId, groupId);
}
activity.SetException(exception);
}
activity?.Stop();

View File

@ -0,0 +1,36 @@
// <copyright file="MessageHeadersHelper.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
using OpenTelemetry.AutoInstrumentation.DuckTyping;
using OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.DuckTypes;
namespace OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations;
internal static class MessageHeadersHelper<TTypeMarker>
{
// ReSharper disable once StaticMemberInGenericType
private static readonly Type HeadersType;
static MessageHeadersHelper()
{
HeadersType = typeof(TTypeMarker).Assembly.GetType("Confluent.Kafka.Headers")!;
}
public static IHeaders? Create()
{
return Activator.CreateInstance(HeadersType).DuckCast<IHeaders>();
}
}

View File

@ -102,12 +102,7 @@ public static class ProducerDeliveryHandlerActionIntegration
activity.SetException(new Exception(deliveryReport.Error.ToString()));
}
// Set the final partition message was delivered to.
activity.SetTag(MessagingAttributes.Keys.Kafka.Partition, deliveryReport.Partition.Value);
activity.SetTag(
MessagingAttributes.Keys.Kafka.PartitionOffset,
deliveryReport.Offset.Value);
KafkaInstrumentation.SetDeliveryResults(activity, deliveryReport);
}
try

View File

@ -0,0 +1,96 @@
// <copyright file="ProducerProduceAsyncIntegration.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
using OpenTelemetry.AutoInstrumentation.CallTarget;
using OpenTelemetry.AutoInstrumentation.DuckTyping;
using OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.DuckTypes;
using OpenTelemetry.AutoInstrumentation.Util;
// ReSharper disable ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
namespace OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations;
/// <summary>
/// Producer ProduceAsync integration
/// </summary>
[InstrumentMethod(
assemblyName: IntegrationConstants.ConfluentKafkaAssemblyName,
typeName: IntegrationConstants.ProducerTypeName,
methodName: IntegrationConstants.ProduceAsyncMethodName,
returnTypeName: ClrNames.GenericTask,
parameterTypeNames: new[] { IntegrationConstants.TopicPartitionTypeName, IntegrationConstants.MessageTypeName, ClrNames.CancellationToken },
minimumVersion: IntegrationConstants.MinVersion,
maximumVersion: IntegrationConstants.MaxVersion,
integrationName: IntegrationConstants.IntegrationName,
type: InstrumentationType.Trace)]
public static class ProducerProduceAsyncIntegration
{
internal static CallTargetState OnMethodBegin<TTarget, TTopicPartition, TMessage>(
TTarget instance, TTopicPartition topicPartition, TMessage message, CancellationToken cancellationToken)
where TMessage : IKafkaMessage, IDuckType
{
// Duck type created for message is a struct.
if (message.Instance is null || topicPartition is null)
{
// Exit early if provided parameters are invalid.
return CallTargetState.GetDefault();
}
var activity = KafkaInstrumentation.StartProducerActivity(topicPartition.DuckCast<ITopicPartition>(), message, instance.DuckCast<INamedClient>()!);
if (activity is not null)
{
KafkaInstrumentation.InjectContext<TTopicPartition>(message, activity);
return new CallTargetState(activity);
}
return CallTargetState.GetDefault();
}
internal static TReturn OnAsyncMethodEnd<TTarget, TReturn>(
TTarget instance, TReturn returnValue, Exception? exception, in CallTargetState state)
where TReturn : IDeliveryResult
{
var activity = state.Activity;
if (activity is null)
{
return returnValue;
}
IDeliveryResult? deliveryResult;
if (exception is not null && exception.TryDuckCast<IProduceException>(out var produceException))
{
deliveryResult = produceException.DeliveryResult;
}
else
{
deliveryResult = returnValue;
}
if (deliveryResult is not null)
{
KafkaInstrumentation.SetDeliveryResults(activity, deliveryResult);
}
if (exception is not null)
{
activity.SetException(exception);
}
activity.Stop();
return returnValue;
}
}

View File

@ -43,43 +43,18 @@ public static class ProducerProduceSyncIntegration
TTarget instance, TTopicPartition topicPartition, TMessage message, TDeliveryHandler deliveryHandler)
where TMessage : IKafkaMessage, IDuckType
{
// duck type created for message is a struct
if (message.Instance is null || topicPartition is null || !topicPartition.TryDuckCast<ITopicPartition>(out var duckTypedTopicPartition))
// Duck type created for message is a struct.
if (message.Instance is null || topicPartition is null)
{
// invalid parameters, exit early
// Exit early if provided parameters are invalid.
return CallTargetState.GetDefault();
}
string? spanName = null;
if (!string.IsNullOrEmpty(duckTypedTopicPartition.Topic))
{
spanName = $"{duckTypedTopicPartition.Topic} {MessagingAttributes.Values.PublishOperationName}";
}
spanName ??= MessagingAttributes.Values.PublishOperationName;
var activity = KafkaCommon.Source.StartActivity(name: spanName, ActivityKind.Producer);
var activity = KafkaInstrumentation.StartProducerActivity(topicPartition.DuckCast<ITopicPartition>(), message, instance.DuckCast<INamedClient>()!);
if (activity is not null)
{
message.Headers ??= MessageHeadersHelper<TTopicPartition>.Create();
Propagators.DefaultTextMapPropagator.Inject<IKafkaMessage>(
new PropagationContext(activity.Context, Baggage.Current),
message,
KafkaCommon.MessageHeaderValueSetter);
if (activity.IsAllDataRequested)
{
KafkaCommon.SetCommonAttributes(
activity,
MessagingAttributes.Values.PublishOperationName,
duckTypedTopicPartition.Topic,
duckTypedTopicPartition.Partition,
message.Key,
instance.DuckCast<INamedClient>());
activity.SetTag(MessagingAttributes.Keys.Kafka.IsTombstone, message.Value is null);
}
// Store as state information if delivery handler was set
KafkaInstrumentation.InjectContext<TTopicPartition>(message, activity);
// Store delivery handler as state.
return new CallTargetState(activity, deliveryHandler);
}
@ -99,8 +74,8 @@ public static class ProducerProduceSyncIntegration
activity.SetException(exception);
}
// If delivery handler was not set, stop the activity
if (state.State is null)
// If delivery handler was not set, stop the activity.
if (state.State is null || exception is not null)
{
activity.Stop();
}
@ -109,27 +84,11 @@ public static class ProducerProduceSyncIntegration
// If delivery handler was set,
// only set parent as a current activity.
// Activity will be stopped inside updated
// delivery handler
// delivery handler.
var current = Activity.Current;
Activity.Current = current?.Parent;
}
return CallTargetReturn.GetDefault();
}
private static class MessageHeadersHelper<TTypeMarker>
{
// ReSharper disable once StaticMemberInGenericType
private static readonly Type HeadersType;
static MessageHeadersHelper()
{
HeadersType = typeof(TTypeMarker).Assembly.GetType("Confluent.Kafka.Headers")!;
}
public static IHeaders? Create()
{
return Activator.CreateInstance(HeadersType).DuckCast<IHeaders>();
}
}
}

View File

@ -1,72 +0,0 @@
// <copyright file="KafkaCommon.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
using System.Diagnostics;
using System.Text;
using OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.DuckTypes;
namespace OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka;
internal static class KafkaCommon
{
public const string ConsumerGroupIdConfigKey = "group.id";
public static ActivitySource Source { get; } = new("OpenTelemetry.AutoInstrumentation.Kafka");
public static void MessageHeaderValueSetter(IKafkaMessage msg, string key, string val)
{
msg.Headers?.Remove(key);
msg.Headers?.Add(key, Encoding.UTF8.GetBytes(val));
}
public static IEnumerable<string> MessageHeaderValueGetter(IConsumeResult? message, string key)
{
if (message?.Message?.Headers is not null && message.Message.Headers.TryGetLastBytes(key, out var bytes))
{
return new[] { Encoding.UTF8.GetString(bytes) };
}
return Enumerable.Empty<string>();
}
public static void SetCommonAttributes(
Activity activity,
string operationName,
string? topic,
Partition partition,
object? key,
INamedClient? client)
{
activity.SetTag(MessagingAttributes.Keys.MessagingOperation, operationName);
activity.SetTag(MessagingAttributes.Keys.MessagingSystem, MessagingAttributes.Values.KafkaMessagingSystemName);
if (!string.IsNullOrEmpty(topic))
{
activity.SetTag(MessagingAttributes.Keys.DestinationName, topic);
}
if (client is not null)
{
activity.SetTag(MessagingAttributes.Keys.ClientId, client.Name);
}
if (key is not null)
{
activity.SetTag(MessagingAttributes.Keys.Kafka.MessageKey, key);
}
activity.SetTag(MessagingAttributes.Keys.Kafka.Partition, partition.Value);
}
}

View File

@ -0,0 +1,158 @@
// <copyright file="KafkaInstrumentation.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
using System.Diagnostics;
using System.Text;
using OpenTelemetry.AutoInstrumentation.DuckTyping;
using OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.DuckTypes;
using OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations;
using OpenTelemetry.Context.Propagation;
namespace OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka;
internal static class KafkaInstrumentation
{
private static ActivitySource Source { get; } = new("OpenTelemetry.AutoInstrumentation.Kafka");
public static Activity? StartConsumerActivity(IConsumeResult consumeResult, DateTimeOffset startTime, object consumer)
{
var propagatedContext = Propagators.DefaultTextMapPropagator.Extract(default, consumeResult, MessageHeaderValueGetter);
string? spanName = null;
if (!string.IsNullOrEmpty(consumeResult.Topic))
{
spanName = $"{consumeResult.Topic} {MessagingAttributes.Values.ReceiveOperationName}";
}
spanName ??= MessagingAttributes.Values.ReceiveOperationName;
var activityLinks = propagatedContext.ActivityContext.IsValid()
? new[] { new ActivityLink(propagatedContext.ActivityContext) }
: Array.Empty<ActivityLink>();
var activity = Source.StartActivity(name: spanName, kind: ActivityKind.Consumer, links: activityLinks, startTime: startTime);
if (activity is { IsAllDataRequested: true })
{
SetCommonAttributes(
activity,
MessagingAttributes.Values.ReceiveOperationName,
consumeResult.Topic,
consumeResult.Partition,
consumeResult.Message?.Key,
consumer.DuckCast<INamedClient>());
activity.SetTag(MessagingAttributes.Keys.Kafka.PartitionOffset, consumeResult.Offset.Value);
if (ConsumerCache.TryGet(consumer, out var groupId))
{
activity.SetTag(MessagingAttributes.Keys.Kafka.ConsumerGroupId, groupId);
}
}
return activity;
}
public static Activity? StartProducerActivity(
ITopicPartition partition,
IKafkaMessage message,
INamedClient producer)
{
string? spanName = null;
if (!string.IsNullOrEmpty(partition.Topic))
{
spanName = $"{partition.Topic} {MessagingAttributes.Values.PublishOperationName}";
}
spanName ??= MessagingAttributes.Values.PublishOperationName;
var activity = Source.StartActivity(name: spanName, ActivityKind.Producer);
if (activity is not null && activity.IsAllDataRequested)
{
SetCommonAttributes(
activity,
MessagingAttributes.Values.PublishOperationName,
partition.Topic,
partition.Partition,
message.Key,
producer);
activity.SetTag(MessagingAttributes.Keys.Kafka.IsTombstone, message.Value is null);
}
return activity;
}
public static void InjectContext<TTopicPartition>(IKafkaMessage message, Activity activity)
{
message.Headers ??= MessageHeadersHelper<TTopicPartition>.Create();
Propagators.DefaultTextMapPropagator.Inject(
new PropagationContext(activity.Context, Baggage.Current),
message,
MessageHeaderValueSetter);
}
public static void SetDeliveryResults(Activity activity, IDeliveryResult deliveryResult)
{
// Set the final partition message was delivered to.
activity.SetTag(MessagingAttributes.Keys.Kafka.Partition, deliveryResult.Partition.Value);
activity.SetTag(
MessagingAttributes.Keys.Kafka.PartitionOffset,
deliveryResult.Offset.Value);
}
private static void SetCommonAttributes(
Activity activity,
string operationName,
string? topic,
Partition partition,
object? key,
INamedClient? client)
{
activity.SetTag(MessagingAttributes.Keys.MessagingOperation, operationName);
activity.SetTag(MessagingAttributes.Keys.MessagingSystem, MessagingAttributes.Values.KafkaMessagingSystemName);
if (!string.IsNullOrEmpty(topic))
{
activity.SetTag(MessagingAttributes.Keys.DestinationName, topic);
}
if (client is not null)
{
activity.SetTag(MessagingAttributes.Keys.ClientId, client.Name);
}
if (key is not null)
{
activity.SetTag(MessagingAttributes.Keys.Kafka.MessageKey, key);
}
activity.SetTag(MessagingAttributes.Keys.Kafka.Partition, partition.Value);
}
private static IEnumerable<string> MessageHeaderValueGetter(IConsumeResult? message, string key)
{
if (message?.Message?.Headers is not null && message.Message.Headers.TryGetLastBytes(key, out var bytes))
{
return new[] { Encoding.UTF8.GetString(bytes) };
}
return Enumerable.Empty<string>();
}
private static void MessageHeaderValueSetter(IKafkaMessage msg, string key, string val)
{
msg.Headers?.Remove(key);
msg.Headers?.Add(key, Encoding.UTF8.GetBytes(val));
}
}

View File

@ -96,6 +96,7 @@ public class KafkaFixture : IAsyncLifetime
.WithName(KafkaContainerName)
.WithPortBinding(KafkaPort)
.WithEnvironment("KAFKA_BROKER_ID", "1")
.WithEnvironment("KAFKA_AUTO_CREATE_TOPICS_ENABLE", "false")
.WithEnvironment("KAFKA_ZOOKEEPER_CONNECT", $"{zookeeperContainerName}:{ZookeeperClientPort}")
.WithEnvironment("KAFKA_ADVERTISED_LISTENERS", $"PLAINTEXT://{KafkaContainerName}:29092,PLAINTEXT_HOST://localhost:{KafkaPort}")
.WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT")

View File

@ -14,7 +14,6 @@
// limitations under the License.
// </copyright>
using Google.Protobuf.Collections;
using IntegrationTests.Helpers;
using OpenTelemetry.Proto.Common.V1;
using OpenTelemetry.Proto.Trace.V1;
@ -25,6 +24,10 @@ namespace IntegrationTests;
[Collection(KafkaCollection.Name)]
public class KafkaTests : TestHelper
{
private const string KafkaInstrumentationScopeName = "OpenTelemetry.AutoInstrumentation.Kafka";
// https://github.com/confluentinc/confluent-kafka-dotnet/blob/07de95ed647af80a0db39ce6a8891a630423b952/src/Confluent.Kafka/Offset.cs#L36C44-L36C44
private const int InvalidOffset = -1001;
public KafkaTests(ITestOutputHelper testOutputHelper)
: base("Kafka", testOutputHelper)
{
@ -41,12 +44,32 @@ public class KafkaTests : TestHelper
using var collector = new MockSpansCollector(Output);
SetExporter(collector);
collector.Expect("OpenTelemetry.AutoInstrumentation.Kafka", span => span.Kind == Span.Types.SpanKind.Producer && ValidateProducerSpan(span, topicName, -1), "Produced without delivery handler.");
collector.Expect("OpenTelemetry.AutoInstrumentation.Kafka", span => span.Kind == Span.Types.SpanKind.Producer && ValidateProducerSpan(span, topicName, 0), "Produced with delivery handler.");
collector.Expect("OpenTelemetry.AutoInstrumentation.Kafka", span => span.Kind == Span.Types.SpanKind.Producer && ValidateProducerSpan(span, topicName, -1, true), "Produced a tombstone.");
// Failed produce attempts made before topic is created.
collector.Expect(KafkaInstrumentationScopeName, span => span.Kind == Span.Types.SpanKind.Producer && ValidateResultProcessingProduceExceptionSpan(span, topicName), "Failed Produce attempt with delivery handler set.");
collector.Expect(KafkaInstrumentationScopeName, span => span.Kind == Span.Types.SpanKind.Producer && ValidateProduceExceptionSpan(span, topicName), "Failed Produce attempt without delivery handler set.");
collector.Expect(KafkaInstrumentationScopeName, span => span.Kind == Span.Types.SpanKind.Producer && ValidateResultProcessingProduceExceptionSpan(span, topicName), "Failed ProduceAsync attempt.");
collector.Expect("OpenTelemetry.AutoInstrumentation.Kafka", span => span.Kind == Span.Types.SpanKind.Consumer && ValidateConsumerSpan(span, topicName, 0), "Consumed a first message.");
collector.Expect("OpenTelemetry.AutoInstrumentation.Kafka", span => span.Kind == Span.Types.SpanKind.Consumer && ValidateConsumerSpan(span, topicName, 1), "Consumed a second message.");
// For 1.4.0 null is returned when attempting to read from non-existent topic,
// and no exception is thrown,
// instrumentation creates no span in such case.
if (packageVersion == string.Empty || Version.Parse(packageVersion) >= new Version(2, 3, 0))
{
// Failed consume attempt.
collector.Expect(KafkaInstrumentationScopeName, span => span.Kind == Span.Types.SpanKind.Consumer && ValidateConsumeExceptionSpan(span, topicName), "Failed Consume attempt.");
}
// Successful produce attempts after topic was created with admin client.
collector.Expect(KafkaInstrumentationScopeName, span => span.Kind == Span.Types.SpanKind.Producer && ValidateProducerSpan(span, topicName, 0), "Successful ProduceAsync attempt.");
collector.Expect(KafkaInstrumentationScopeName, span => span.Kind == Span.Types.SpanKind.Producer && ValidateProducerSpan(span, topicName, -1), "Successful Produce attempt without delivery handler set.");
collector.Expect(KafkaInstrumentationScopeName, span => span.Kind == Span.Types.SpanKind.Producer && ValidateProducerSpan(span, topicName, 0), "Successful Produce attempt with delivery handler set.");
// Successful produce attempt after topic was created for tombstones.
collector.Expect(KafkaInstrumentationScopeName, span => span.Kind == Span.Types.SpanKind.Producer && ValidateProducerSpan(span, topicName, -1, true), "Successful sync Publish attempt with a tombstone.");
// Successful consume attempts.
collector.Expect(KafkaInstrumentationScopeName, span => span.Kind == Span.Types.SpanKind.Consumer && ValidateConsumerSpan(span, topicName, 0), "First successful Consume attempt.");
collector.Expect(KafkaInstrumentationScopeName, span => span.Kind == Span.Types.SpanKind.Consumer && ValidateConsumerSpan(span, topicName, 1), "Second successful Consume attempt.");
collector.Expect(KafkaInstrumentationScopeName, span => span.Kind == Span.Types.SpanKind.Consumer && ValidateConsumerSpan(span, topicName, 2), "Third successful Consume attempt.");
collector.ExpectCollected(collection => ValidatePropagation(collection, topicName));
@ -61,37 +84,64 @@ public class KafkaTests : TestHelper
collector.AssertExpectations();
}
private static bool ValidateConsumerSpan(Span span, string topicName, int messageOffset)
private static bool ValidateConsumeExceptionSpan(Span span, string topicName)
{
return ValidateConsumerSpan(span, topicName, InvalidOffset, null) &&
span.Status.Code == Status.Types.StatusCode.Error;
}
private static bool ValidateConsumerSpan(Span span, string topicName, int messageOffset, string? expectedMessageKey = "testkey")
{
var kafkaMessageOffset = span.Attributes.Single(kv => kv.Key == "messaging.kafka.message.offset").Value.IntValue;
var consumerGroupId = span.Attributes.Single(kv => kv.Key == "messaging.kafka.consumer.group").Value.StringValue;
return ValidateCommonTags(span.Attributes, topicName, "rdkafka#consumer-2", "receive", 0) &&
return ValidateCommonTags(span.Attributes, topicName, "rdkafka#consumer-2", "receive", 0, expectedMessageKey) &&
kafkaMessageOffset == messageOffset &&
consumerGroupId == $"test-consumer-group-{topicName}";
}
private static bool ValidateBasicProduceExceptionSpanExpectations(Span span, string topicName)
{
return ValidateCommonTags(span.Attributes, topicName, "rdkafka#producer-1", "publish", -1, "testkey") &&
span.Status.Code == Status.Types.StatusCode.Error;
}
private static bool ValidateProduceExceptionSpan(Span span, string topicName)
{
return ValidateBasicProduceExceptionSpanExpectations(span, topicName) &&
span.Attributes.Count(kv => kv.Key == "messaging.kafka.message.offset") == 0;
}
private static bool ValidateResultProcessingProduceExceptionSpan(Span span, string topicName)
{
// DeliveryResult processing results in offset being set.
var offset = span.Attributes.SingleOrDefault(kv => kv.Key == "messaging.kafka.message.offset")?.Value.IntValue;
return ValidateBasicProduceExceptionSpanExpectations(span, topicName) &&
offset == InvalidOffset;
}
private static bool ValidateProducerSpan(Span span, string topicName, int partition, bool tombstoneExpected = false)
{
var isTombstone = span.Attributes.Single(kv => kv.Key == "messaging.kafka.message.tombstone").Value.BoolValue;
return ValidateCommonTags(span.Attributes, topicName, "rdkafka#producer-1", "publish", partition) &&
isTombstone == tombstoneExpected;
return ValidateCommonTags(span.Attributes, topicName, "rdkafka#producer-1", "publish", partition, "testkey") &&
isTombstone == tombstoneExpected &&
span.Status is null;
}
private static bool ValidateCommonTags(IReadOnlyCollection<KeyValue> attributes, string topicName, string clientName, string operationName, int partition)
private static bool ValidateCommonTags(IReadOnlyCollection<KeyValue> attributes, string topicName, string clientName, string operationName, int partition, string? expectedMessageKey)
{
var messagingSystem = attributes.Single(kv => kv.Key == "messaging.system").Value.StringValue;
var messagingDestinationName = attributes.Single(kv => kv.Key == "messaging.destination.name").Value.StringValue;
var messagingOperation = attributes.Single(kv => kv.Key == "messaging.operation").Value.StringValue;
var messagingClientId = attributes.Single(kv => kv.Key == "messaging.client_id").Value.StringValue;
var kafkaMessageKey = attributes.Single(kv => kv.Key == "messaging.kafka.message.key").Value.StringValue;
var kafkaMessageKey = attributes.SingleOrDefault(kv => kv.Key == "messaging.kafka.message.key")?.Value.StringValue;
var kafkaPartition = attributes.Single(kv => kv.Key == "messaging.kafka.destination.partition").Value.IntValue;
return messagingSystem == "kafka" &&
messagingDestinationName == topicName &&
messagingOperation == operationName &&
messagingClientId == clientName &&
kafkaMessageKey == "testkey" &&
kafkaMessageKey == expectedMessageKey &&
kafkaPartition == partition;
}
@ -102,14 +152,14 @@ public class KafkaTests : TestHelper
var producerSpans = collectedSpans
.Where(span =>
span.Span.Name == expectedPublishOperationName &&
!span.Span.Attributes.Single(attr => attr.Key == "messaging.kafka.message.tombstone").Value.BoolValue)
.ToList();
var firstProducerSpan = producerSpans[0].Span;
var firstConsumerSpan = GetMatchingConsumerSpan(collectedSpans, firstProducerSpan, expectedReceiveOperationName);
var secondProducerSpan = producerSpans[1].Span;
var secondConsumerSpan = GetMatchingConsumerSpan(collectedSpans, secondProducerSpan, expectedReceiveOperationName);
!span.Span.Attributes.Single(attr => attr.Key == "messaging.kafka.message.tombstone").Value.BoolValue &&
span.Span.Status is null);
return firstConsumerSpan is not null && secondConsumerSpan is not null;
return producerSpans
.Select(producerSpan =>
GetMatchingConsumerSpan(collectedSpans, producerSpan.Span, expectedReceiveOperationName))
.All(matchingConsumerSpan =>
matchingConsumerSpan is not null);
}
private static MockSpansCollector.Collected? GetMatchingConsumerSpan(ICollection<MockSpansCollector.Collected> collectedSpans, Span producerSpan, string expectedReceiveOperationName)

View File

@ -15,58 +15,195 @@
// </copyright>
using Confluent.Kafka;
using Confluent.Kafka.Admin;
var config = new ProducerConfig { BootstrapServers = "localhost:9092" };
var topicName = args[0];
using var p = new ProducerBuilder<string, string>(config).Build();
try
namespace TestApplication.Kafka;
internal static class Program
{
// produce a message without a delivery handler set
p.Produce(topicName, new Message<string, string> { Key = "testkey", Value = "test" });
// produce a message and set a delivery handler
p.Produce(topicName, new Message<string, string> { Key = "testkey", Value = "test" }, report =>
private const string MessageKey = "testkey";
private const string BootstrapServers = "localhost:9092";
public static async Task<int> Main(string[] args)
{
Console.WriteLine($"Finished sending msg, offset: {report.Offset.Value}.");
});
p.Flush();
Console.WriteLine("Delivered messages.");
}
catch (ProduceException<Null, string> e)
{
Console.WriteLine($"Delivery failed: {e.Error.Reason}.");
}
var topicName = args[0];
var conf = new ConsumerConfig
{
GroupId = $"test-consumer-group-{topicName}",
BootstrapServers = "localhost:9092",
// https://github.com/confluentinc/confluent-kafka-dotnet/tree/07de95ed647af80a0db39ce6a8891a630423b952#basic-consumer-example
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = true
};
using var waitEvent = new ManualResetEventSlim();
using (var consumer = new ConsumerBuilder<string, string>(conf).Build())
{
consumer.Subscribe(topicName);
using var producer = BuildProducer(BootstrapServers);
try
{
ConsumeMessage(consumer);
ConsumeMessage(consumer);
// Attempts are made to produce messages to non-existent topic.
// Intention is to verify exception handling logic
// in ProduceProducerSyncIntegration, ProduceProducerAsyncIntegration
// and ProducerDeliveryHandlerActionIntegration.
// Note: order of calls seems to be important to test
// exception handling in all of the mentioned classes.
TryProduceSyncWithDeliveryHandler(producer, topicName, report =>
{
Console.WriteLine(
$"Delivery report received, message offset: {report.Offset.Value}, error: {report.Error.IsError}.");
waitEvent.Set();
});
await TryProduceAsync(producer, topicName);
TryProduceSync(producer, topicName);
using var consumer = BuildConsumer(topicName, BootstrapServers);
consumer.Subscribe(topicName);
TryConsumeMessage(consumer);
Console.WriteLine("Waiting on delivery handler.");
if (!waitEvent.Wait(TimeSpan.FromSeconds(10)))
{
Console.WriteLine("Timed-out waiting for delivery handler to complete.");
return 1;
}
Console.WriteLine("Delivery handler completed.");
await CreateTopic(BootstrapServers, topicName);
Console.WriteLine("Topic creation completed.");
using var cts = new CancellationTokenSource(TimeSpan.FromMinutes(1));
// Required as first few Produce attempts may still fail
// with "unknown topic" error
// after topic creation completed.
await WaitForSuccessfulProduceAsync(producer, topicName, cts.Token);
producer.Produce(topicName, CreateTestMessage());
producer.Produce(
topicName,
CreateTestMessage(),
report =>
{
Console.WriteLine(
$"Delivery report received, message offset: {report.Offset.Value}, error: {report.Error.IsError}.");
});
producer.Flush(cts.Token);
// Consume all the produced messages.
consumer.Consume(cts.Token);
consumer.Consume(cts.Token);
consumer.Consume(cts.Token);
// Produce a tombstone.
producer.Produce(topicName, new Message<string, string> { Key = MessageKey, Value = null! });
return 0;
}
catch (ConsumeException e)
private static void TryProduceSync(IProducer<string, string> producer, string topicName)
{
Console.WriteLine($"Error occured: {e.Error.Reason}.");
try
{
producer.Produce(topicName, CreateTestMessage());
}
catch (ProduceException<string, string> e)
{
Console.WriteLine($"Produce sync without delivery handler exception: {e.Error.Reason}");
}
}
private static void TryConsumeMessage(IConsumer<string, string> consumer)
{
try
{
var cr = consumer.Consume(TimeSpan.FromSeconds(5));
Console.WriteLine($"Consumption succeeded, message received: {cr?.Message}");
}
catch (ConsumeException ex)
{
Console.WriteLine($"Consumption exception: {ex.Error.Reason}");
}
}
private static async Task WaitForSuccessfulProduceAsync(IProducer<string, string> producer, string topicName, CancellationToken token)
{
while (true)
{
token.ThrowIfCancellationRequested();
try
{
await producer.ProduceAsync(topicName, CreateTestMessage(), token);
return;
}
catch (ProduceException<string, string> ex)
{
Console.WriteLine($"ProduceAsync exception: {ex.Error.Reason}");
}
await Task.Delay(TimeSpan.FromSeconds(1), token);
}
}
private static IProducer<string, string> BuildProducer(string bootstrapServers)
{
var config = new ProducerConfig
{
BootstrapServers = bootstrapServers
};
return new ProducerBuilder<string, string>(config).Build();
}
private static IConsumer<string, string> BuildConsumer(string topicName, string bootstrapServers)
{
var conf = new ConsumerConfig
{
GroupId = $"test-consumer-group-{topicName}",
BootstrapServers = bootstrapServers,
// https://github.com/confluentinc/confluent-kafka-dotnet/tree/07de95ed647af80a0db39ce6a8891a630423b952#basic-consumer-example
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = true
};
return new ConsumerBuilder<string, string>(conf).Build();
}
private static async Task TryProduceAsync(
IProducer<string, string> producer,
string topicName)
{
try
{
await producer.ProduceAsync(topicName, CreateTestMessage());
}
catch (ProduceException<string, string> ex)
{
Console.WriteLine($"ProduceAsync exception: {ex.Error.Reason}");
}
}
private static Message<string, string> CreateTestMessage()
{
return new Message<string, string> { Key = MessageKey, Value = "test" };
}
private static void TryProduceSyncWithDeliveryHandler(IProducer<string, string> producer, string topic, Action<DeliveryReport<string, string>> deliveryHandler)
{
try
{
producer.Produce(
topic,
CreateTestMessage(),
deliveryHandler);
}
catch (ProduceException<string, string> e)
{
Console.WriteLine($"Produce sync with delivery handler exception: {e.Error.Reason}");
}
}
private static async Task CreateTopic(string bootstrapServers, string topic)
{
var adminClientConfig = new AdminClientConfig
{
BootstrapServers = bootstrapServers
};
using var adminClient = new AdminClientBuilder(adminClientConfig).Build();
await adminClient.CreateTopicsAsync(new[]
{
new TopicSpecification { Name = topic, ReplicationFactor = 1, NumPartitions = 1 }
});
}
}
// produce a tombstone
p.Produce(topicName, new Message<string, string> { Key = "testkey", Value = null! });
return;
void ConsumeMessage(IConsumer<string, string> consumer)
{
var cr = consumer.Consume(TimeSpan.FromSeconds(10));
Console.WriteLine($"Consumed message '{cr?.Message?.Value}' at: '{cr?.TopicPartitionOffset}'.");
}