[kafka] bytecode instrumentation - sync (#3055)

Mateusz Łach 2023-11-22 11:40:00 +01:00 committed by GitHub
parent 58d949cd37
commit a894ff70e1
45 changed files with 1536 additions and 19 deletions

View File

@ -31,6 +31,7 @@ This component adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.h
- Added support for [System.Data.SqlClient](https://www.nuget.org/packages/System.Data.SqlClient/)
(NuGet package) traces instrumentation from `4.8.5`.
- Ability to update installation via PS module (`OpenTelemetry.DotNet.Auto.psm1`).
- Added support for `KAFKA` traces instrumentation.
### Changed

View File

@ -201,7 +201,9 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TestApplication.Wcf.Client.
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TestApplication.MinimalApi", "test\test-applications\integrations\TestApplication.MinimalApi\TestApplication.MinimalApi.csproj", "{803A3DD1-016E-4713-8066-A1C81A6ADBA3}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TestApplication.Worker", "test\test-applications\integrations\TestApplication.Worker\TestApplication.Worker.csproj", "{E04065C2-0512-41C6-A428-AC85342B3D03}"
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TestApplication.Kafka", "test\test-applications\integrations\TestApplication.Kafka\TestApplication.Kafka.csproj", "{A7551BDB-AD27-4E80-8B13-67C42F83554C}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "TestApplication.Worker", "test\test-applications\integrations\TestApplication.Worker\TestApplication.Worker.csproj", "{AE2AAA2D-A214-4381-ACF5-D57980097873}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TestApplication.SqlClient.System.NetFramework", "test\test-applications\integrations\TestApplication.SqlClient.System.NetFramework\TestApplication.SqlClient.System.NetFramework.csproj", "{D6720242-70E4-4C62-95BE-AA11944AE0DE}"
EndProject
@ -957,18 +959,30 @@ Global
{803A3DD1-016E-4713-8066-A1C81A6ADBA3}.Release|x64.Build.0 = Release|x64
{803A3DD1-016E-4713-8066-A1C81A6ADBA3}.Release|x86.ActiveCfg = Release|x86
{803A3DD1-016E-4713-8066-A1C81A6ADBA3}.Release|x86.Build.0 = Release|x86
{E04065C2-0512-41C6-A428-AC85342B3D03}.Debug|Any CPU.ActiveCfg = Debug|x64
{E04065C2-0512-41C6-A428-AC85342B3D03}.Debug|Any CPU.Build.0 = Debug|x64
{E04065C2-0512-41C6-A428-AC85342B3D03}.Debug|x64.ActiveCfg = Debug|x64
{E04065C2-0512-41C6-A428-AC85342B3D03}.Debug|x64.Build.0 = Debug|x64
{E04065C2-0512-41C6-A428-AC85342B3D03}.Debug|x86.ActiveCfg = Debug|x86
{E04065C2-0512-41C6-A428-AC85342B3D03}.Debug|x86.Build.0 = Debug|x86
{E04065C2-0512-41C6-A428-AC85342B3D03}.Release|Any CPU.ActiveCfg = Release|x64
{E04065C2-0512-41C6-A428-AC85342B3D03}.Release|Any CPU.Build.0 = Release|x64
{E04065C2-0512-41C6-A428-AC85342B3D03}.Release|x64.ActiveCfg = Release|x64
{E04065C2-0512-41C6-A428-AC85342B3D03}.Release|x64.Build.0 = Release|x64
{E04065C2-0512-41C6-A428-AC85342B3D03}.Release|x86.ActiveCfg = Release|x86
{E04065C2-0512-41C6-A428-AC85342B3D03}.Release|x86.Build.0 = Release|x86
{A7551BDB-AD27-4E80-8B13-67C42F83554C}.Debug|Any CPU.ActiveCfg = Debug|x64
{A7551BDB-AD27-4E80-8B13-67C42F83554C}.Debug|Any CPU.Build.0 = Debug|x64
{A7551BDB-AD27-4E80-8B13-67C42F83554C}.Debug|x64.ActiveCfg = Debug|x64
{A7551BDB-AD27-4E80-8B13-67C42F83554C}.Debug|x64.Build.0 = Debug|x64
{A7551BDB-AD27-4E80-8B13-67C42F83554C}.Debug|x86.ActiveCfg = Debug|x86
{A7551BDB-AD27-4E80-8B13-67C42F83554C}.Debug|x86.Build.0 = Debug|x86
{A7551BDB-AD27-4E80-8B13-67C42F83554C}.Release|Any CPU.ActiveCfg = Release|x64
{A7551BDB-AD27-4E80-8B13-67C42F83554C}.Release|Any CPU.Build.0 = Release|x64
{A7551BDB-AD27-4E80-8B13-67C42F83554C}.Release|x64.ActiveCfg = Release|x64
{A7551BDB-AD27-4E80-8B13-67C42F83554C}.Release|x64.Build.0 = Release|x64
{A7551BDB-AD27-4E80-8B13-67C42F83554C}.Release|x86.ActiveCfg = Release|x86
{A7551BDB-AD27-4E80-8B13-67C42F83554C}.Release|x86.Build.0 = Release|x86
{AE2AAA2D-A214-4381-ACF5-D57980097873}.Debug|Any CPU.ActiveCfg = Debug|x64
{AE2AAA2D-A214-4381-ACF5-D57980097873}.Debug|Any CPU.Build.0 = Debug|x64
{AE2AAA2D-A214-4381-ACF5-D57980097873}.Debug|x64.ActiveCfg = Debug|x64
{AE2AAA2D-A214-4381-ACF5-D57980097873}.Debug|x64.Build.0 = Debug|x64
{AE2AAA2D-A214-4381-ACF5-D57980097873}.Debug|x86.ActiveCfg = Debug|x86
{AE2AAA2D-A214-4381-ACF5-D57980097873}.Debug|x86.Build.0 = Debug|x86
{AE2AAA2D-A214-4381-ACF5-D57980097873}.Release|Any CPU.ActiveCfg = Release|x64
{AE2AAA2D-A214-4381-ACF5-D57980097873}.Release|Any CPU.Build.0 = Release|x64
{AE2AAA2D-A214-4381-ACF5-D57980097873}.Release|x64.ActiveCfg = Release|x64
{AE2AAA2D-A214-4381-ACF5-D57980097873}.Release|x64.Build.0 = Release|x64
{AE2AAA2D-A214-4381-ACF5-D57980097873}.Release|x86.ActiveCfg = Release|x86
{AE2AAA2D-A214-4381-ACF5-D57980097873}.Release|x86.Build.0 = Release|x86
{D6720242-70E4-4C62-95BE-AA11944AE0DE}.Debug|Any CPU.ActiveCfg = Debug|x64
{D6720242-70E4-4C62-95BE-AA11944AE0DE}.Debug|Any CPU.Build.0 = Debug|x64
{D6720242-70E4-4C62-95BE-AA11944AE0DE}.Debug|x64.ActiveCfg = Debug|x64
@ -1077,7 +1091,8 @@ Global
{3A125210-A784-4982-ACDB-C3442E414E44} = {E409ADD3-9574-465C-AB09-4324D205CC7C}
{EDE168E0-DBCD-4DE3-B55A-4B633ED6565E} = {E409ADD3-9574-465C-AB09-4324D205CC7C}
{803A3DD1-016E-4713-8066-A1C81A6ADBA3} = {E409ADD3-9574-465C-AB09-4324D205CC7C}
{E04065C2-0512-41C6-A428-AC85342B3D03} = {E409ADD3-9574-465C-AB09-4324D205CC7C}
{A7551BDB-AD27-4E80-8B13-67C42F83554C} = {E409ADD3-9574-465C-AB09-4324D205CC7C}
{AE2AAA2D-A214-4381-ACF5-D57980097873} = {E409ADD3-9574-465C-AB09-4324D205CC7C}
{D6720242-70E4-4C62-95BE-AA11944AE0DE} = {E409ADD3-9574-465C-AB09-4324D205CC7C}
{3E53C72E-7711-4BD2-942C-4CEA895D2F98} = {E409ADD3-9574-465C-AB09-4324D205CC7C}
{C8DD1858-9C26-419F-9DE2-8E06F988EC66} = {E409ADD3-9574-465C-AB09-4324D205CC7C}

View File

@ -157,5 +157,13 @@ public static class LibraryVersion
new("6.2.0"),
}
},
{
"TestApplication.Kafka",
new List<PackageBuildInfo>
{
new("1.4.0"),
new("2.3.0"),
}
},
};
}

View File

@ -135,13 +135,14 @@ due to lack of stable semantic convention.
| `GRAPHQL` | [GraphQL](https://www.nuget.org/packages/GraphQL) **Not supported on .NET Framework** | ≥7.5.0 | source | [Experimental](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/versioning-and-stability.md) |
| `GRPCNETCLIENT` | [Grpc.Net.Client](https://www.nuget.org/packages/Grpc.Net.Client) | ≥2.52.0 & < 3.0.0 | source | [Experimental](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/versioning-and-stability.md) |
| `HTTPCLIENT` | [System.Net.Http.HttpClient](https://docs.microsoft.com/dotnet/api/system.net.http.httpclient) and [System.Net.HttpWebRequest](https://docs.microsoft.com/dotnet/api/system.net.httpwebrequest) | * | source | [Experimental](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/versioning-and-stability.md) |
| `QUARTZ` | [Quartz](https://www.nuget.org/packages/Quartz) **Not supported on .NET Framework 4.7.1 and older** | ≥3.4.0 | source | [Experimental](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/versioning-and-stability.md) |
| `KAFKA` | [Confluent.Kafka](https://www.nuget.org/packages/Confluent.Kafka) | ≥1.4.0 < 3.0.0 | bytecode | [Experimental](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/versioning-and-stability.md) |
| `MASSTRANSIT` | [MassTransit](https://www.nuget.org/packages/MassTransit) **Not supported on .NET Framework** | ≥8.0.0 | source | [Experimental](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/versioning-and-stability.md) |
| `MONGODB` | [MongoDB.Driver.Core](https://www.nuget.org/packages/MongoDB.Driver.Core) | ≥2.13.3 & < 3.0.0 | source & bytecode | [Experimental](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/versioning-and-stability.md) |
| `MYSQLCONNECTOR` | [MySqlConnector](https://www.nuget.org/packages/MySqlConnector) | ≥2.0.0 | source | [Experimental](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/versioning-and-stability.md) |
| `MYSQLDATA` | [MySql.Data](https://www.nuget.org/packages/MySql.Data) **Not supported on .NET Framework** | ≥8.1.0 | source | [Experimental](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/versioning-and-stability.md) |
| `NPGSQL` | [Npgsql](https://www.nuget.org/packages/Npgsql) | ≥6.0.0 | source | [Experimental](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/versioning-and-stability.md) |
| `NSERVICEBUS` | [NServiceBus](https://www.nuget.org/packages/NServiceBus) | ≥8.0.0 | source & bytecode | [Experimental](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/versioning-and-stability.md) |
| `QUARTZ` | [Quartz](https://www.nuget.org/packages/Quartz) **Not supported on .NET Framework 4.7.1 and older** | ≥3.4.0 | source | [Experimental](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/versioning-and-stability.md) |
| `SQLCLIENT` | [Microsoft.Data.SqlClient](https://www.nuget.org/packages/Microsoft.Data.SqlClient), [System.Data.SqlClient](https://www.nuget.org/packages/System.Data.SqlClient) and `System.Data` (shipped with .NET Framework) | * \[4\] | source | [Experimental](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/versioning-and-stability.md) |
| `STACKEXCHANGEREDIS` | [StackExchange.Redis](https://www.nuget.org/packages/StackExchange.Redis) **Not supported on .NET Framework** | ≥2.0.405 < 3.0.0 | source & bytecode | [Experimental](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/versioning-and-stability.md) |
| `WCFCLIENT` | WCF | * | source & bytecode | [Experimental](https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/versioning-and-stability.md) |
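
For reference, a minimal sketch of the synchronous Confluent.Kafka calls the new `KAFKA` bytecode hooks target — `Produce` with an optional delivery handler, `Consume(int)`, and `Close`/`Dispose` — assuming a placeholder broker at `localhost:9092`, topic `example-topic`, and group `example-group`; the integration presumably follows the existing `OTEL_DOTNET_AUTO_TRACES_<ID>_INSTRUMENTATION_ENABLED` toggle pattern (here `OTEL_DOTNET_AUTO_TRACES_KAFKA_INSTRUMENTATION_ENABLED`):

```csharp
using System;
using Confluent.Kafka;

var producerConfig = new ProducerConfig { BootstrapServers = "localhost:9092" };
using var producer = new ProducerBuilder<Null, string>(producerConfig).Build();

// Producer`2.Produce (sync overload) is instrumented; the optional delivery handler
// is wrapped so the producer activity ends when the delivery report arrives.
producer.Produce(
    new TopicPartition("example-topic", Partition.Any),
    new Message<Null, string> { Value = "hello" },
    report => Console.WriteLine($"Delivered at offset {report.Offset}"));
producer.Flush(TimeSpan.FromSeconds(10));

var consumerConfig = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "example-group", // surfaced as messaging.kafka.consumer.group
    AutoOffsetReset = AutoOffsetReset.Earliest,
};
using var consumer = new ConsumerBuilder<Null, string>(consumerConfig).Build();
consumer.Subscribe("example-topic");

// Consumer`2.Consume(int) is instrumented; Close/Dispose remove the cached group id.
var result = consumer.Consume(1000);
consumer.Close();
```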

View File

@ -0,0 +1,11 @@
OpenTelemetry.AutoInstrumentation.CallTarget.CallTargetState.CallTargetState(System.Diagnostics.Activity? activity, object? state) -> void
OpenTelemetry.AutoInstrumentation.CallTarget.CallTargetState.CallTargetState(System.Diagnostics.Activity? activity, object? state, System.DateTimeOffset? startTime) -> void
OpenTelemetry.AutoInstrumentation.CallTarget.CallTargetState.StartTime.get -> System.DateTimeOffset?
OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ConsumerCloseIntegration
OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ConsumerConstructorIntegration
OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ConsumerConsumeSyncIntegration
OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ConsumerDisposeIntegration
OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ProducerDeliveryHandlerActionIntegration
OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ProducerProduceSyncIntegration
*REMOVED*OpenTelemetry.AutoInstrumentation.CallTarget.CallTargetState.CallTargetState(System.Diagnostics.Activity? activity, object! state) -> void
*REMOVED*OpenTelemetry.AutoInstrumentation.CallTarget.CallTargetState.CallTargetState(System.Diagnostics.Activity! activity, object! state, System.DateTimeOffset? startTime) -> void

View File

@ -0,0 +1,11 @@
OpenTelemetry.AutoInstrumentation.CallTarget.CallTargetState.CallTargetState(System.Diagnostics.Activity? activity, object? state) -> void
OpenTelemetry.AutoInstrumentation.CallTarget.CallTargetState.CallTargetState(System.Diagnostics.Activity? activity, object? state, System.DateTimeOffset? startTime) -> void
OpenTelemetry.AutoInstrumentation.CallTarget.CallTargetState.StartTime.get -> System.DateTimeOffset?
OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ConsumerCloseIntegration
OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ConsumerConstructorIntegration
OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ConsumerConsumeSyncIntegration
OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ConsumerDisposeIntegration
OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ProducerDeliveryHandlerActionIntegration
OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ProducerProduceSyncIntegration
*REMOVED*OpenTelemetry.AutoInstrumentation.CallTarget.CallTargetState.CallTargetState(System.Diagnostics.Activity? activity, object! state) -> void
*REMOVED*OpenTelemetry.AutoInstrumentation.CallTarget.CallTargetState.CallTargetState(System.Diagnostics.Activity! activity, object! state, System.DateTimeOffset? startTime) -> void

View File

@ -27,6 +27,7 @@ public readonly struct CallTargetState
private readonly Activity? _previousActivity;
private readonly Activity? _activity;
private readonly object? _state;
private readonly DateTimeOffset? _startTime;
/// <summary>
/// Initializes a new instance of the <see cref="CallTargetState"/> struct.
@ -37,6 +38,7 @@ public readonly struct CallTargetState
_previousActivity = null;
_activity = activity;
_state = null;
_startTime = null;
}
/// <summary>
@ -44,11 +46,12 @@ public readonly struct CallTargetState
/// </summary>
/// <param name="activity">Activity instance</param>
/// <param name="state">Object state instance</param>
public CallTargetState(Activity? activity, object state)
public CallTargetState(Activity? activity, object? state)
{
_previousActivity = null;
_activity = activity;
_state = state;
_startTime = null;
}
/// <summary>
@ -57,11 +60,12 @@ public readonly struct CallTargetState
/// <param name="activity">Activity instance</param>
/// <param name="state">Object state instance</param>
/// <param name="startTime">The intended start time of the activity, intended for activities created in the OnMethodEnd handler</param>
public CallTargetState(Activity activity, object state, DateTimeOffset? startTime)
public CallTargetState(Activity? activity, object? state, DateTimeOffset? startTime)
{
_previousActivity = null;
_activity = activity;
_state = state;
_startTime = startTime;
}
internal CallTargetState(Activity? previousActivity, CallTargetState state)
@ -69,6 +73,7 @@ public readonly struct CallTargetState
_previousActivity = previousActivity;
_activity = state._activity;
_state = state._state;
_startTime = state.StartTime;
}
/// <summary>
@ -81,6 +86,11 @@ public readonly struct CallTargetState
/// </summary>
public object? State => _state;
/// <summary>
/// Gets the start time.
/// </summary>
public DateTimeOffset? StartTime => _startTime;
internal Activity? PreviousActivity => _previousActivity;
/// <summary>

View File

@ -131,4 +131,9 @@ internal enum TracerInstrumentation
/// Elastic.Transport instrumentation.
/// </summary>
ElasticTransport = 19,
/// <summary>
/// Kafka client instrumentation
/// </summary>
Kafka = 20
}

View File

@ -19,7 +19,7 @@ internal static partial class InstrumentationDefinitions
private static NativeCallTargetDefinition[] GetDefinitionsArray()
{
var nativeCallTargetDefinitions = new List<NativeCallTargetDefinition>(9);
var nativeCallTargetDefinitions = new List<NativeCallTargetDefinition>(15);
// Traces
var tracerSettings = Instrumentation.TracerSettings.Value;
if (tracerSettings.TracesEnabled)
@ -30,6 +30,17 @@ internal static partial class InstrumentationDefinitions
nativeCallTargetDefinitions.Add(new("System.Web", "System.Web.Compilation.BuildManager", "InvokePreStartInitMethodsCore", new[] {"System.Void", "System.Collections.Generic.ICollection`1[System.Reflection.MethodInfo]", "System.Func`1[System.IDisposable]"}, 4, 0, 0, 4, 65535, 65535, AssemblyFullName, "OpenTelemetry.AutoInstrumentation.Instrumentations.AspNet.HttpModuleIntegration"));
}
// Kafka
if (tracerSettings.EnabledInstrumentations.Contains(TracerInstrumentation.Kafka))
{
nativeCallTargetDefinitions.Add(new("Confluent.Kafka", "Confluent.Kafka.Consumer`2", "Close", new[] {"System.Void"}, 1, 4, 0, 2, 65535, 65535, AssemblyFullName, "OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ConsumerCloseIntegration"));
nativeCallTargetDefinitions.Add(new("Confluent.Kafka", "Confluent.Kafka.Consumer`2", ".ctor", new[] {"System.Void", "Confluent.Kafka.ConsumerBuilder`2[!0,!1]"}, 1, 4, 0, 2, 65535, 65535, AssemblyFullName, "OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ConsumerConstructorIntegration"));
nativeCallTargetDefinitions.Add(new("Confluent.Kafka", "Confluent.Kafka.Consumer`2", "Consume", new[] {"Confluent.Kafka.ConsumeResult`2[!0,!1]", "System.Int32"}, 1, 4, 0, 2, 65535, 65535, AssemblyFullName, "OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ConsumerConsumeSyncIntegration"));
nativeCallTargetDefinitions.Add(new("Confluent.Kafka", "Confluent.Kafka.Consumer`2", "Dispose", new[] {"System.Void"}, 1, 4, 0, 2, 65535, 65535, AssemblyFullName, "OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ConsumerDisposeIntegration"));
nativeCallTargetDefinitions.Add(new("Confluent.Kafka", "Confluent.Kafka.Producer`2+TypedDeliveryHandlerShim_Action", ".ctor", new[] {"System.Void", "System.String", "!0", "!1", "System.Action`1[Confluent.Kafka.DeliveryReport`2[!0,!1]]"}, 1, 4, 0, 2, 65535, 65535, AssemblyFullName, "OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ProducerDeliveryHandlerActionIntegration"));
nativeCallTargetDefinitions.Add(new("Confluent.Kafka", "Confluent.Kafka.Producer`2", "Produce", new[] {"System.Void", "Confluent.Kafka.TopicPartition", "Confluent.Kafka.Message`2[!0,!1]", "System.Action`1[Confluent.Kafka.DeliveryReport`2[!0,!1]]"}, 1, 4, 0, 2, 65535, 65535, AssemblyFullName, "OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ProducerProduceSyncIntegration"));
}
// MongoDB
if (tracerSettings.EnabledInstrumentations.Contains(TracerInstrumentation.MongoDB))
{

View File

@ -19,11 +19,22 @@ internal static partial class InstrumentationDefinitions
private static NativeCallTargetDefinition[] GetDefinitionsArray()
{
var nativeCallTargetDefinitions = new List<NativeCallTargetDefinition>(17);
var nativeCallTargetDefinitions = new List<NativeCallTargetDefinition>(23);
// Traces
var tracerSettings = Instrumentation.TracerSettings.Value;
if (tracerSettings.TracesEnabled)
{
// Kafka
if (tracerSettings.EnabledInstrumentations.Contains(TracerInstrumentation.Kafka))
{
nativeCallTargetDefinitions.Add(new("Confluent.Kafka", "Confluent.Kafka.Consumer`2", "Close", new[] {"System.Void"}, 1, 4, 0, 2, 65535, 65535, AssemblyFullName, "OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ConsumerCloseIntegration"));
nativeCallTargetDefinitions.Add(new("Confluent.Kafka", "Confluent.Kafka.Consumer`2", ".ctor", new[] {"System.Void", "Confluent.Kafka.ConsumerBuilder`2[!0,!1]"}, 1, 4, 0, 2, 65535, 65535, AssemblyFullName, "OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ConsumerConstructorIntegration"));
nativeCallTargetDefinitions.Add(new("Confluent.Kafka", "Confluent.Kafka.Consumer`2", "Consume", new[] {"Confluent.Kafka.ConsumeResult`2[!0,!1]", "System.Int32"}, 1, 4, 0, 2, 65535, 65535, AssemblyFullName, "OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ConsumerConsumeSyncIntegration"));
nativeCallTargetDefinitions.Add(new("Confluent.Kafka", "Confluent.Kafka.Consumer`2", "Dispose", new[] {"System.Void"}, 1, 4, 0, 2, 65535, 65535, AssemblyFullName, "OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ConsumerDisposeIntegration"));
nativeCallTargetDefinitions.Add(new("Confluent.Kafka", "Confluent.Kafka.Producer`2+TypedDeliveryHandlerShim_Action", ".ctor", new[] {"System.Void", "System.String", "!0", "!1", "System.Action`1[Confluent.Kafka.DeliveryReport`2[!0,!1]]"}, 1, 4, 0, 2, 65535, 65535, AssemblyFullName, "OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ProducerDeliveryHandlerActionIntegration"));
nativeCallTargetDefinitions.Add(new("Confluent.Kafka", "Confluent.Kafka.Producer`2", "Produce", new[] {"System.Void", "Confluent.Kafka.TopicPartition", "Confluent.Kafka.Message`2[!0,!1]", "System.Action`1[Confluent.Kafka.DeliveryReport`2[!0,!1]]"}, 1, 4, 0, 2, 65535, 65535, AssemblyFullName, "OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations.ProducerProduceSyncIntegration"));
}
// MongoDB
if (tracerSettings.EnabledInstrumentations.Contains(TracerInstrumentation.MongoDB))
{

View File

@ -323,6 +323,8 @@ internal static class Instrumentation
break;
case TracerInstrumentation.MySqlConnector:
break;
case TracerInstrumentation.Kafka:
break;
default:
Logger.Warning($"Configured trace instrumentation type is not supported: {instrumentation}");
if (FailFastSettings.Value.FailFast)

View File

@ -0,0 +1,43 @@
// <copyright file="ConsumerCache.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
using System.Runtime.CompilerServices;
namespace OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka;
internal static class ConsumerCache
{
private static readonly ConditionalWeakTable<object, string> ConsumerGroupMap = new();
public static void Add(object consumer, string groupId)
{
// from the docs:
// If the key does not exist in the table, the method invokes a callback method to create a value that is bound to the specified key.
// AddOrUpdate not available for all of the targets
ConsumerGroupMap.GetValue(consumer, createValueCallback: _ => groupId);
}
public static bool TryGet(object consumer, out string? groupId)
{
groupId = null;
return ConsumerGroupMap.TryGetValue(consumer, out groupId);
}
public static void Remove(object instance)
{
ConsumerGroupMap.Remove(instance);
}
}
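
The `GetValue` call above acts as an add-if-absent; a small standalone sketch of the `ConditionalWeakTable` semantics being relied on (the key and values here are placeholders):

```csharp
using System.Runtime.CompilerServices;

var table = new ConditionalWeakTable<object, string>();
var consumer = new object(); // stands in for a Confluent.Kafka consumer instance

table.GetValue(consumer, _ => "group-a"); // key absent: callback runs, "group-a" is stored
table.GetValue(consumer, _ => "group-b"); // key present: callback not invoked, value stays "group-a"

table.TryGetValue(consumer, out var groupId); // groupId == "group-a"
table.Remove(consumer);                       // explicit removal; entries also vanish once the key is unreachable
```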

View File

@ -0,0 +1,29 @@
// <copyright file="IConsumeResult.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
namespace OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.DuckTypes;
// wraps https://github.com/confluentinc/confluent-kafka-dotnet/blob/07de95ed647af80a0db39ce6a8891a630423b952/src/Confluent.Kafka/ConsumeResult.cs
internal interface IConsumeResult
{
public IKafkaMessage? Message { get; }
public string? Topic { get; set; }
public Offset Offset { get; set; }
public Partition Partition { get; set; }
}

View File

@ -0,0 +1,23 @@
// <copyright file="IConsumerBuilder.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
namespace OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.DuckTypes;
// wraps https://github.com/confluentinc/confluent-kafka-dotnet/blob/07de95ed647af80a0db39ce6a8891a630423b952/src/Confluent.Kafka/ConsumerBuilder.cs
internal interface IConsumerBuilder
{
public IEnumerable<KeyValuePair<string, string>>? Config { get; set; }
}

View File

@ -0,0 +1,26 @@
// <copyright file="IDeliveryHandlerActionShim.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
using OpenTelemetry.AutoInstrumentation.DuckTyping;
namespace OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.DuckTypes;
// wraps https://github.com/confluentinc/confluent-kafka-dotnet/blob/07de95ed647af80a0db39ce6a8891a630423b952/src/Confluent.Kafka/Producer.cs#L997
internal interface IDeliveryHandlerActionShim
{
[DuckField]
public object Handler { set; }
}

View File

@ -0,0 +1,27 @@
// <copyright file="IDeliveryReport.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
namespace OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.DuckTypes;
// wraps https://github.com/confluentinc/confluent-kafka-dotnet/blob/07de95ed647af80a0db39ce6a8891a630423b952/src/Confluent.Kafka/DeliveryReport.cs
internal interface IDeliveryReport
{
IError? Error { get; set; }
public Partition Partition { get; set; }
public Offset Offset { get; set; }
}

View File

@ -0,0 +1,25 @@
// <copyright file="IError.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
namespace OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.DuckTypes;
// wraps https://github.com/confluentinc/confluent-kafka-dotnet/blob/07de95ed647af80a0db39ce6a8891a630423b952/src/Confluent.Kafka/Error.cs
internal interface IError
{
public bool IsError { get; }
public string ToString();
}

View File

@ -0,0 +1,27 @@
// <copyright file="IHeaders.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
namespace OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.DuckTypes;
// wraps https://github.com/confluentinc/confluent-kafka-dotnet/blob/07de95ed647af80a0db39ce6a8891a630423b952/src/Confluent.Kafka/Headers.cs
internal interface IHeaders
{
public void Add(string name, byte[] value);
public void Remove(string name);
public bool TryGetLastBytes(string key, out byte[] lastHeader);
}

View File

@ -0,0 +1,27 @@
// <copyright file="IKafkaMessage.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
namespace OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.DuckTypes;
// wraps https://github.com/confluentinc/confluent-kafka-dotnet/blob/07de95ed647af80a0db39ce6a8891a630423b952/src/Confluent.Kafka/Message.cs
internal interface IKafkaMessage
{
public object? Key { get; }
public object? Value { get; set; }
public IHeaders? Headers { get; set; }
}

View File

@ -0,0 +1,23 @@
// <copyright file="INamedClient.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
namespace OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.DuckTypes;
// wraps https://github.com/confluentinc/confluent-kafka-dotnet/blob/07de95ed647af80a0db39ce6a8891a630423b952/src/Confluent.Kafka/IClient.cs
internal interface INamedClient
{
public string Name { get; }
}

View File

@ -0,0 +1,23 @@
// <copyright file="IResultException.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
namespace OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.DuckTypes;
// wraps https://github.com/confluentinc/confluent-kafka-dotnet/blob/07de95ed647af80a0db39ce6a8891a630423b952/src/Confluent.Kafka/ConsumeException.cs
internal interface IResultException
{
public IConsumeResult? ConsumerRecord { get; set; }
}

View File

@ -0,0 +1,25 @@
// <copyright file="ITopicPartition.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
namespace OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.DuckTypes;
// wraps https://github.com/confluentinc/confluent-kafka-dotnet/blob/07de95ed647af80a0db39ce6a8891a630423b952/src/Confluent.Kafka/TopicPartition.cs
internal interface ITopicPartition
{
string? Topic { get; }
Partition Partition { get; }
}

View File

@ -0,0 +1,27 @@
// <copyright file="Offset.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
using OpenTelemetry.AutoInstrumentation.DuckTyping;
#pragma warning disable CS0649 // Field is never assigned to, and will always have its default value
namespace OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.DuckTypes;
// wraps https://github.com/confluentinc/confluent-kafka-dotnet/blob/07de95ed647af80a0db39ce6a8891a630423b952/src/Confluent.Kafka/Offset.cs
[DuckCopy]
internal struct Offset
{
public long Value;
}

View File

@ -0,0 +1,27 @@
// <copyright file="Partition.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
using OpenTelemetry.AutoInstrumentation.DuckTyping;
#pragma warning disable CS0649 // Field is never assigned to, and will always have its default value
namespace OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.DuckTypes;
// wraps https://github.com/confluentinc/confluent-kafka-dotnet/blob/07de95ed647af80a0db39ce6a8891a630423b952/src/Confluent.Kafka/Partition.cs
[DuckCopy]
internal struct Partition
{
public int Value;
}

View File

@ -0,0 +1,37 @@
// <copyright file="IntegrationConstants.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
namespace OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka;
internal static class IntegrationConstants
{
public const string IntegrationName = "Kafka";
public const string MinVersion = "1.4.0";
public const string MaxVersion = "2.*.*";
public const string ConfluentKafkaAssemblyName = "Confluent.Kafka";
public const string ProducerTypeName = "Confluent.Kafka.Producer`2";
public const string ProducerDeliveryHandlerShimTypeName = "Confluent.Kafka.Producer`2+TypedDeliveryHandlerShim_Action";
public const string ConsumerTypeName = "Confluent.Kafka.Consumer`2";
public const string ConsumerBuilderTypeName = "Confluent.Kafka.ConsumerBuilder`2[!0,!1]";
public const string ProduceSyncMethodName = "Produce";
public const string ConsumeSyncMethodName = "Consume";
public const string DisposeMethodName = "Dispose";
public const string CloseMethodName = "Close";
public const string TopicPartitionTypeName = "Confluent.Kafka.TopicPartition";
public const string MessageTypeName = "Confluent.Kafka.Message`2[!0,!1]";
public const string ActionOfDeliveryReportTypeName = "System.Action`1[Confluent.Kafka.DeliveryReport`2[!0,!1]]";
public const string ConsumeResultTypeName = "Confluent.Kafka.ConsumeResult`2[!0,!1]";
}

View File

@ -0,0 +1,41 @@
// <copyright file="ConsumerCloseIntegration.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
using OpenTelemetry.AutoInstrumentation.CallTarget;
namespace OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations;
/// <summary>
/// Kafka consumer close instrumentation
/// </summary>
[InstrumentMethod(
assemblyName: IntegrationConstants.ConfluentKafkaAssemblyName,
typeName: IntegrationConstants.ConsumerTypeName,
methodName: IntegrationConstants.CloseMethodName,
returnTypeName: ClrNames.Void,
parameterTypeNames: new string[0],
minimumVersion: IntegrationConstants.MinVersion,
maximumVersion: IntegrationConstants.MaxVersion,
integrationName: IntegrationConstants.IntegrationName,
type: InstrumentationType.Trace)]
public static class ConsumerCloseIntegration
{
internal static CallTargetReturn OnMethodEnd<TTarget>(TTarget instance, Exception? exception, in CallTargetState state)
{
ConsumerCache.Remove(instance!);
return CallTargetReturn.GetDefault();
}
}

View File

@ -0,0 +1,86 @@
// <copyright file="ConsumerConstructorIntegration.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
using OpenTelemetry.AutoInstrumentation.CallTarget;
using OpenTelemetry.AutoInstrumentation.DuckTyping;
using OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.DuckTypes;
// ReSharper disable ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
namespace OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations;
/// <summary>
/// Kafka consumer ctor instrumentation
/// </summary>
[InstrumentMethod(
assemblyName: IntegrationConstants.ConfluentKafkaAssemblyName,
typeName: IntegrationConstants.ConsumerTypeName,
methodName: ".ctor",
returnTypeName: ClrNames.Void,
parameterTypeNames: new[] { IntegrationConstants.ConsumerBuilderTypeName },
minimumVersion: IntegrationConstants.MinVersion,
maximumVersion: IntegrationConstants.MaxVersion,
integrationName: IntegrationConstants.IntegrationName,
type: InstrumentationType.Trace)]
public static class ConsumerConstructorIntegration
{
internal static CallTargetState OnMethodBegin<TTarget, TConsumerBuilder>(TTarget instance, TConsumerBuilder consumerBuilder)
where TConsumerBuilder : IConsumerBuilder, IDuckType
{
// duck type created for consumer builder is a struct
if (consumerBuilder.Instance is null)
{
// invalid parameters, exit early
return CallTargetState.GetDefault();
}
string? consumerGroupId = null;
if (consumerBuilder.Config is not null)
{
foreach (var keyValuePair in consumerBuilder.Config)
{
if (string.Equals(
keyValuePair.Key,
KafkaCommon.ConsumerGroupIdConfigKey,
StringComparison.OrdinalIgnoreCase))
{
consumerGroupId = keyValuePair.Value;
break;
}
}
}
// https://github.com/confluentinc/confluent-kafka-dotnet/wiki/Consumer#misc-points states GroupId is required
if (consumerGroupId is not null)
{
// Store the association between consumer instance and "group.id" from configuration,
// will be used to populate "messaging.kafka.consumer.group" attribute value
ConsumerCache.Add(instance!, consumerGroupId);
}
return CallTargetState.GetDefault();
}
internal static CallTargetReturn OnMethodEnd<TTarget>(TTarget instance, Exception? exception, in CallTargetState state)
{
if (exception is not null)
{
ConsumerCache.Remove(instance!);
}
return CallTargetReturn.GetDefault();
}
}

View File

@ -0,0 +1,105 @@
// <copyright file="ConsumerConsumeSyncIntegration.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
using System.Diagnostics;
using OpenTelemetry.AutoInstrumentation.CallTarget;
using OpenTelemetry.AutoInstrumentation.DuckTyping;
using OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.DuckTypes;
using OpenTelemetry.Context.Propagation;
namespace OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations;
/// <summary>
/// Kafka consumer sync consume instrumentation
/// </summary>
[InstrumentMethod(
assemblyName: IntegrationConstants.ConfluentKafkaAssemblyName,
typeName: IntegrationConstants.ConsumerTypeName,
methodName: IntegrationConstants.ConsumeSyncMethodName,
returnTypeName: IntegrationConstants.ConsumeResultTypeName,
parameterTypeNames: new[] { ClrNames.Int32 },
minimumVersion: IntegrationConstants.MinVersion,
maximumVersion: IntegrationConstants.MaxVersion,
integrationName: IntegrationConstants.IntegrationName,
type: InstrumentationType.Trace)]
public static class ConsumerConsumeSyncIntegration
{
internal static CallTargetState OnMethodBegin<TTarget>(TTarget instance, int timeout)
{
// Preferably, activity would be started here,
// and link to propagated context later;
// this is currently not possible, so capture start time
// to use it when starting activity to get approximate
// activity duration.
// TODO: accurate on .NET, but not .NET Fx
return new CallTargetState(null, null, DateTime.UtcNow);
}
internal static CallTargetReturn<TResponse> OnMethodEnd<TTarget, TResponse>(TTarget instance, TResponse response, Exception? exception, in CallTargetState state)
{
IConsumeResult? consumeResult;
if (exception is not null && exception.TryDuckCast<IResultException>(out var resultException))
{
consumeResult = resultException.ConsumerRecord;
}
else
{
consumeResult = response == null ? null : response.DuckAs<IConsumeResult>();
}
if (consumeResult is null)
{
return new CallTargetReturn<TResponse>(response);
}
var propagatedContext = Propagators.DefaultTextMapPropagator.Extract(default, consumeResult, KafkaCommon.MessageHeaderValueGetter);
string? spanName = null;
if (!string.IsNullOrEmpty(consumeResult.Topic))
{
spanName = $"{consumeResult.Topic} {MessagingAttributes.Values.ReceiveOperationName}";
}
spanName ??= MessagingAttributes.Values.ReceiveOperationName;
var activityLinks = propagatedContext.ActivityContext.IsValid()
? new[] { new ActivityLink(propagatedContext.ActivityContext) }
: Array.Empty<ActivityLink>();
var startTime = (DateTimeOffset)state.StartTime!;
var activity = KafkaCommon.Source.StartActivity(name: spanName, kind: ActivityKind.Consumer, links: activityLinks, startTime: startTime);
if (activity is { IsAllDataRequested: true })
{
KafkaCommon.SetCommonAttributes(
activity,
MessagingAttributes.Values.ReceiveOperationName,
consumeResult.Topic,
consumeResult.Partition,
consumeResult.Message?.Key,
instance.DuckCast<INamedClient>());
activity.SetTag(MessagingAttributes.Keys.Kafka.PartitionOffset, consumeResult.Offset.Value);
if (ConsumerCache.TryGet(instance!, out var groupId))
{
activity.SetTag(MessagingAttributes.Keys.Kafka.ConsumerGroupId, groupId);
}
}
activity?.Stop();
return new CallTargetReturn<TResponse>(response);
}
}
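
The start-time workaround in `OnMethodBegin` leans on `ActivitySource.StartActivity` accepting an explicit `startTime` and `links`; a standalone sketch of that overload, with a placeholder source name and an empty link list:

```csharp
using System;
using System.Diagnostics;

var source = new ActivitySource("Example.Kafka"); // placeholder source name
var callStart = DateTimeOffset.UtcNow;            // captured before the blocking call

// ... a blocking Consume(timeout) would run here ...

using var activity = source.StartActivity(
    "example-topic receive",
    ActivityKind.Consumer,
    parentContext: default,
    links: Array.Empty<ActivityLink>(), // links to the propagated context would go here
    startTime: callStart);              // duration ends up roughly equal to the time since callStart

activity?.Stop();
```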

View File

@ -0,0 +1,41 @@
// <copyright file="ConsumerDisposeIntegration.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
using OpenTelemetry.AutoInstrumentation.CallTarget;
namespace OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations;
/// <summary>
/// Kafka consumer dispose instrumentation
/// </summary>
[InstrumentMethod(
assemblyName: IntegrationConstants.ConfluentKafkaAssemblyName,
typeName: IntegrationConstants.ConsumerTypeName,
methodName: IntegrationConstants.DisposeMethodName,
returnTypeName: ClrNames.Void,
parameterTypeNames: new string[0],
minimumVersion: IntegrationConstants.MinVersion,
maximumVersion: IntegrationConstants.MaxVersion,
integrationName: IntegrationConstants.IntegrationName,
type: InstrumentationType.Trace)]
public static class ConsumerDisposeIntegration
{
internal static CallTargetReturn OnMethodEnd<TTarget>(TTarget instance, Exception? exception, in CallTargetState state)
{
ConsumerCache.Remove(instance!);
return CallTargetReturn.GetDefault();
}
}

View File

@ -0,0 +1,146 @@
// <copyright file="ProducerDeliveryHandlerActionIntegration.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
using System.Diagnostics;
using System.Reflection;
using OpenTelemetry.AutoInstrumentation.CallTarget;
using OpenTelemetry.AutoInstrumentation.DuckTyping;
using OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.DuckTypes;
using OpenTelemetry.AutoInstrumentation.Util;
namespace OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations;
/// <summary>
/// Kafka producer delivery handler instrumentation
/// </summary>
[InstrumentMethod(
assemblyName: IntegrationConstants.ConfluentKafkaAssemblyName,
typeName: IntegrationConstants.ProducerDeliveryHandlerShimTypeName,
methodName: ".ctor",
returnTypeName: ClrNames.Void,
parameterTypeNames: new[] { ClrNames.String, "!0", "!1", IntegrationConstants.ActionOfDeliveryReportTypeName },
minimumVersion: IntegrationConstants.MinVersion,
maximumVersion: IntegrationConstants.MaxVersion,
integrationName: IntegrationConstants.IntegrationName,
type: InstrumentationType.Trace)]
public static class ProducerDeliveryHandlerActionIntegration
{
internal static CallTargetState OnMethodBegin<TTarget, TKey, TValue, TActionOfDeliveryReport>(
TTarget instance,
string topic,
TKey key,
TValue value,
TActionOfDeliveryReport? handler)
{
// If handler is not set,
// activity will be stopped at the end
// of Produce method, there is nothing to do
if (handler == null)
{
return CallTargetState.GetDefault();
}
try
{
var currentActivity = Activity.Current;
if (currentActivity is not null)
{
var activityCompletingHandler = WrapperCache<TActionOfDeliveryReport>.Create(handler, currentActivity);
// Store the action to set activity handler as state,
// run at the end of the ctor.
Action<IDeliveryHandlerActionShim> setActivityCompletingHandler = shim => shim.Handler = activityCompletingHandler!;
return new CallTargetState(currentActivity, setActivityCompletingHandler);
}
return CallTargetState.GetDefault();
}
catch (Exception)
{
return CallTargetState.GetDefault();
}
}
internal static CallTargetReturn OnMethodEnd<TTarget>(TTarget instance, Exception? exception, in CallTargetState state)
{
if (state.State is Action<IDeliveryHandlerActionShim> action && instance.TryDuckCast<IDeliveryHandlerActionShim>(out var inst))
{
try
{
action.Invoke(inst);
}
catch (Exception)
{
state.Activity?.Stop();
}
}
return CallTargetReturn.GetDefault();
}
internal static Action<TDeliveryReport> WrapWithActivityCompletion<TDeliveryReport>(Action<TDeliveryReport> initialCallback, Activity activity)
{
return report =>
{
if (report.TryDuckCast<IDeliveryReport>(out var deliveryReport))
{
if (deliveryReport.Error is { IsError: true })
{
activity.SetException(new Exception(deliveryReport.Error.ToString()));
}
// Set the final partition message was delivered to.
activity.SetTag(MessagingAttributes.Keys.Kafka.Partition, deliveryReport.Partition.Value);
activity.SetTag(
MessagingAttributes.Keys.Kafka.PartitionOffset,
deliveryReport.Offset.Value);
}
try
{
initialCallback.Invoke(report);
}
finally
{
activity.Stop();
}
};
}
private static class WrapperCache<TActionOfDeliveryReport>
{
private static readonly CreateWrapperDelegate Delegate;
static WrapperCache()
{
var method =
typeof(ProducerDeliveryHandlerActionIntegration).GetMethod(
nameof(WrapWithActivityCompletion),
BindingFlags.Static | BindingFlags.NonPublic)!;
var constructedMethod = method.MakeGenericMethod(typeof(TActionOfDeliveryReport).GetGenericArguments());
Delegate = (CreateWrapperDelegate)constructedMethod.CreateDelegate(typeof(CreateWrapperDelegate));
}
private delegate TActionOfDeliveryReport CreateWrapperDelegate(TActionOfDeliveryReport action, Activity activity);
public static TActionOfDeliveryReport Create(TActionOfDeliveryReport action, Activity activity)
{
return Delegate(action, activity);
}
}
}

View File

@ -0,0 +1,135 @@
// <copyright file="ProducerProduceSyncIntegration.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
using System.Diagnostics;
using OpenTelemetry.AutoInstrumentation.CallTarget;
using OpenTelemetry.AutoInstrumentation.DuckTyping;
using OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.DuckTypes;
using OpenTelemetry.AutoInstrumentation.Util;
using OpenTelemetry.Context.Propagation;
// ReSharper disable ConditionIsAlwaysTrueOrFalseAccordingToNullableAPIContract
namespace OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.Integrations;
/// <summary>
/// Kafka sync produce instrumentation
/// </summary>
[InstrumentMethod(
assemblyName: IntegrationConstants.ConfluentKafkaAssemblyName,
typeName: IntegrationConstants.ProducerTypeName,
methodName: IntegrationConstants.ProduceSyncMethodName,
returnTypeName: ClrNames.Void,
parameterTypeNames: new[] { IntegrationConstants.TopicPartitionTypeName, IntegrationConstants.MessageTypeName, IntegrationConstants.ActionOfDeliveryReportTypeName },
minimumVersion: IntegrationConstants.MinVersion,
maximumVersion: IntegrationConstants.MaxVersion,
integrationName: IntegrationConstants.IntegrationName,
type: InstrumentationType.Trace)]
public static class ProducerProduceSyncIntegration
{
internal static CallTargetState OnMethodBegin<TTarget, TTopicPartition, TMessage, TDeliveryHandler>(
TTarget instance, TTopicPartition topicPartition, TMessage message, TDeliveryHandler deliveryHandler)
where TMessage : IKafkaMessage, IDuckType
{
// duck type created for message is a struct
if (message.Instance is null || topicPartition is null || !topicPartition.TryDuckCast<ITopicPartition>(out var duckTypedTopicPartition))
{
// invalid parameters, exit early
return CallTargetState.GetDefault();
}
string? spanName = null;
if (!string.IsNullOrEmpty(duckTypedTopicPartition.Topic))
{
spanName = $"{duckTypedTopicPartition.Topic} {MessagingAttributes.Values.PublishOperationName}";
}
spanName ??= MessagingAttributes.Values.PublishOperationName;
var activity = KafkaCommon.Source.StartActivity(name: spanName, ActivityKind.Producer);
if (activity is not null)
{
message.Headers ??= MessageHeadersHelper<TTopicPartition>.Create();
Propagators.DefaultTextMapPropagator.Inject<IKafkaMessage>(
new PropagationContext(activity.Context, Baggage.Current),
message,
KafkaCommon.MessageHeaderValueSetter);
if (activity.IsAllDataRequested)
{
KafkaCommon.SetCommonAttributes(
activity,
MessagingAttributes.Values.PublishOperationName,
duckTypedTopicPartition.Topic,
duckTypedTopicPartition.Partition,
message.Key,
instance.DuckCast<INamedClient>());
activity.SetTag(MessagingAttributes.Keys.Kafka.IsTombstone, message.Value is null);
}
// Store as state information if delivery handler was set
return new CallTargetState(activity, deliveryHandler);
}
return CallTargetState.GetDefault();
}
internal static CallTargetReturn OnMethodEnd<TTarget>(TTarget instance, Exception? exception, in CallTargetState state)
{
var activity = state.Activity;
if (activity is null)
{
return CallTargetReturn.GetDefault();
}
if (exception is not null)
{
activity.SetException(exception);
}
// If delivery handler was not set, stop the activity
if (state.State is null)
{
activity.Stop();
}
else
{
// If delivery handler was set,
// only set parent as a current activity.
// Activity will be stopped inside updated
// delivery handler
var current = Activity.Current;
Activity.Current = current?.Parent;
}
return CallTargetReturn.GetDefault();
}
private static class MessageHeadersHelper<TTypeMarker>
{
// ReSharper disable once StaticMemberInGenericType
private static readonly Type HeadersType;
static MessageHeadersHelper()
{
HeadersType = typeof(TTypeMarker).Assembly.GetType("Confluent.Kafka.Headers")!;
}
public static IHeaders? Create()
{
return Activator.CreateInstance(HeadersType).DuckCast<IHeaders>();
}
}
}

View File

@ -0,0 +1,72 @@
// <copyright file="KafkaCommon.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
using System.Diagnostics;
using System.Text;
using OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.DuckTypes;
namespace OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka;
internal static class KafkaCommon
{
public const string ConsumerGroupIdConfigKey = "group.id";
public static ActivitySource Source { get; } = new("OpenTelemetry.AutoInstrumentation.Kafka");
public static void MessageHeaderValueSetter(IKafkaMessage msg, string key, string val)
{
msg.Headers?.Remove(key);
msg.Headers?.Add(key, Encoding.UTF8.GetBytes(val));
}
public static IEnumerable<string> MessageHeaderValueGetter(IConsumeResult? message, string key)
{
if (message?.Message?.Headers is not null && message.Message.Headers.TryGetLastBytes(key, out var bytes))
{
return new[] { Encoding.UTF8.GetString(bytes) };
}
return Enumerable.Empty<string>();
}
public static void SetCommonAttributes(
Activity activity,
string operationName,
string? topic,
Partition partition,
object? key,
INamedClient? client)
{
activity.SetTag(MessagingAttributes.Keys.MessagingOperation, operationName);
activity.SetTag(MessagingAttributes.Keys.MessagingSystem, MessagingAttributes.Values.KafkaMessagingSystemName);
if (!string.IsNullOrEmpty(topic))
{
activity.SetTag(MessagingAttributes.Keys.DestinationName, topic);
}
if (client is not null)
{
activity.SetTag(MessagingAttributes.Keys.ClientId, client.Name);
}
if (key is not null)
{
activity.SetTag(MessagingAttributes.Keys.Kafka.MessageKey, key);
}
activity.SetTag(MessagingAttributes.Keys.Kafka.Partition, partition.Value);
}
}
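
For reference, MessageHeaderValueGetter is the mirror image of MessageHeaderValueSetter. Below is a minimal sketch of how a consumer-side integration could use it, assuming a duck-typed IConsumeResult is already at hand; the real consumer integration is not part of this excerpt, and ConsumeResultActivitySketch is a hypothetical name used only for illustration.

// Minimal sketch, not part of this commit: extracts the context injected by
// MessageHeaderValueSetter and starts a consumer activity linked to the producer span.
using System.Diagnostics;
using OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka.DuckTypes;
using OpenTelemetry.Context.Propagation;

namespace OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka;

internal static class ConsumeResultActivitySketch
{
    public static Activity? Start(IConsumeResult consumeResult, string? topic)
    {
        // Read the trace context and baggage headers written by the producer integration.
        PropagationContext propagatedContext = Propagators.DefaultTextMapPropagator.Extract(
            default,
            consumeResult,
            KafkaCommon.MessageHeaderValueGetter);

        var spanName = string.IsNullOrEmpty(topic)
            ? MessagingAttributes.Values.ReceiveOperationName
            : $"{topic} {MessagingAttributes.Values.ReceiveOperationName}";

        // Link the consumer span to the producer span instead of parenting it,
        // which is what the propagation test below verifies via span links.
        return KafkaCommon.Source.StartActivity(
            name: spanName,
            kind: ActivityKind.Consumer,
            parentContext: default,
            links: new[] { new ActivityLink(propagatedContext.ActivityContext) });
    }
}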

View File

@ -0,0 +1,46 @@
// <copyright file="MessagingAttributes.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
namespace OpenTelemetry.AutoInstrumentation.Instrumentations.Kafka;
// https://github.com/open-telemetry/semantic-conventions/blob/v1.23.0/docs/messaging/messaging-spans.md#messaging-attributes
internal static class MessagingAttributes
{
internal static class Keys
{
public const string MessagingSystem = "messaging.system";
public const string MessagingOperation = "messaging.operation";
public const string DestinationName = "messaging.destination.name";
public const string ClientId = "messaging.client_id";
// https://github.com/open-telemetry/semantic-conventions/blob/v1.23.0/docs/messaging/kafka.md#span-attributes
internal static class Kafka
{
public const string ConsumerGroupId = "messaging.kafka.consumer.group";
public const string Partition = "messaging.kafka.destination.partition";
public const string MessageKey = "messaging.kafka.message.key";
public const string PartitionOffset = "messaging.kafka.message.offset";
public const string IsTombstone = "messaging.kafka.message.tombstone";
}
}
internal static class Values
{
public const string KafkaMessagingSystemName = "kafka";
public const string PublishOperationName = "publish";
public const string ReceiveOperationName = "receive";
}
}

View File

@ -3,6 +3,7 @@
<ItemGroup>
<PackageVersion Include="Azure.Storage.Blobs" Version="12.19.1" />
<PackageVersion Include="Elastic.Clients.Elasticsearch" Version="8.11.0" />
<PackageVersion Include="Confluent.Kafka" Version="2.3.0" />
<PackageVersion Include="FluentAssertions" Version="6.12.0" />
<PackageVersion Include="Google.Protobuf" Version="3.25.1" />
<PackageVersion Include="GraphQL" Version="7.6.1" />

View File

@ -41,6 +41,8 @@
<Content Include="docker\postgres.Dockerfile" CopyToOutputDirectory="PreserveNewest" />
<Content Include="docker\redis.Dockerfile" CopyToOutputDirectory="PreserveNewest" />
<Content Include="docker\sql-server.Dockerfile" CopyToOutputDirectory="PreserveNewest" />
<Content Include="docker\kafka.Dockerfile" CopyToOutputDirectory="PreserveNewest" />
<Content Include="docker\zookeeper.Dockerfile" CopyToOutputDirectory="PreserveNewest" />
<Content Include="xunit.runner.json" CopyToOutputDirectory="PreserveNewest" />
</ItemGroup>

View File

@ -0,0 +1,111 @@
// <copyright file="KafkaCollection.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
using DotNet.Testcontainers.Builders;
using DotNet.Testcontainers.Containers;
using DotNet.Testcontainers.Networks;
using static IntegrationTests.Helpers.DockerFileHelper;
namespace IntegrationTests;
[CollectionDefinition(Name)]
public class KafkaCollection : ICollectionFixture<KafkaFixture>
{
public const string Name = nameof(KafkaCollection);
}
/// <summary>
/// Container setup based on https://github.com/confluentinc/kafka-images/blob/83f57e511aead515822334ef28da6872d127c6a2/examples/kafka-single-node/docker-compose.yml
/// </summary>
public class KafkaFixture : IAsyncLifetime
{
private const int KafkaPort = 9092;
private const int ZookeeperClientPort = 2181;
private const string KafkaContainerName = "integration-test-kafka";
private const string TestNetworkName = $"{KafkaContainerName}-network";
private const string ZookeeperContainerName = $"{KafkaContainerName}-zookeeper";
private static readonly string ZooKeeperImage = ReadImageFrom("zookeeper.Dockerfile");
private static readonly string KafkaImage = ReadImageFrom("kafka.Dockerfile");
private IContainer? _kafkaContainer;
private IContainer? _zooKeeperContainer;
private INetwork? _containerNetwork;
public async Task InitializeAsync()
{
_containerNetwork = new NetworkBuilder()
.WithName(TestNetworkName)
.Build();
await _containerNetwork.CreateAsync();
_zooKeeperContainer = await LaunchZookeeper(_containerNetwork);
_kafkaContainer = await LaunchKafkaContainer(_containerNetwork, _zooKeeperContainer);
}
public async Task DisposeAsync()
{
if (_kafkaContainer != null)
{
await _kafkaContainer.DisposeAsync();
}
if (_zooKeeperContainer != null)
{
await _zooKeeperContainer.DisposeAsync();
}
if (_containerNetwork != null)
{
await _containerNetwork.DisposeAsync();
}
}
private static async Task<IContainer?> LaunchZookeeper(INetwork? containerNetwork)
{
var container = new ContainerBuilder()
.WithImage(ZooKeeperImage)
.WithName(ZookeeperContainerName)
.WithEnvironment("ZOOKEEPER_CLIENT_PORT", ZookeeperClientPort.ToString())
.WithEnvironment("ZOOKEEPER_TICK_TIME", "2000")
.WithNetwork(containerNetwork)
.Build();
await container.StartAsync();
return container;
}
private static async Task<IContainer?> LaunchKafkaContainer(
INetwork? containerNetwork,
IContainer? zooKeeperContainer)
{
        // The container name returned by Testcontainers is prefixed with '/', so strip it
var zookeeperContainerName = zooKeeperContainer?.Name.Substring(1);
var container = new ContainerBuilder()
.WithImage(KafkaImage)
.WithName(KafkaContainerName)
.WithPortBinding(KafkaPort)
.WithEnvironment("KAFKA_BROKER_ID", "1")
.WithEnvironment("KAFKA_ZOOKEEPER_CONNECT", $"{zookeeperContainerName}:{ZookeeperClientPort}")
.WithEnvironment("KAFKA_ADVERTISED_LISTENERS", $"PLAINTEXT://{KafkaContainerName}:29092,PLAINTEXT_HOST://localhost:{KafkaPort}")
.WithEnvironment("KAFKA_LISTENER_SECURITY_PROTOCOL_MAP", "PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT")
.WithEnvironment("KAFKA_INTER_BROKER_LISTENER_NAME", "PLAINTEXT")
.WithEnvironment("KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR", "1")
.WithNetwork(containerNetwork)
.Build();
await container.StartAsync();
return container;
}
}
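
StartAsync returns once the containers are running, and the tests rely on the broker becoming reachable shortly afterwards. If broker readiness ever proves racy in CI, one possible variation, not part of this commit, is an explicit Testcontainers wait strategy on the broker port; LaunchKafkaContainerWithWait below is a hypothetical helper sketched for illustration, with most KAFKA_* environment variables omitted for brevity.

    // Hypothetical variation of LaunchKafkaContainer (not in this commit):
    // blocks StartAsync until the mapped broker port accepts connections.
    private static async Task<IContainer> LaunchKafkaContainerWithWait(INetwork containerNetwork, string zookeeperConnect)
    {
        var container = new ContainerBuilder()
            .WithImage(KafkaImage)
            .WithName(KafkaContainerName)
            .WithPortBinding(KafkaPort)
            .WithEnvironment("KAFKA_ZOOKEEPER_CONNECT", zookeeperConnect)
            .WithNetwork(containerNetwork)
            // Wait for the broker port before handing the container back to the fixture.
            .WithWaitStrategy(Wait.ForUnixContainer().UntilPortIsAvailable(KafkaPort))
            .Build();

        await container.StartAsync();
        return container;
    }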

View File

@ -0,0 +1,128 @@
// <copyright file="KafkaTests.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
using Google.Protobuf.Collections;
using IntegrationTests.Helpers;
using OpenTelemetry.Proto.Common.V1;
using OpenTelemetry.Proto.Trace.V1;
using Xunit.Abstractions;
namespace IntegrationTests;
[Collection(KafkaCollection.Name)]
public class KafkaTests : TestHelper
{
public KafkaTests(ITestOutputHelper testOutputHelper)
: base("Kafka", testOutputHelper)
{
}
[Theory]
[Trait("Category", "EndToEnd")]
[Trait("Containers", "Linux")]
[MemberData(nameof(LibraryVersion.Kafka), MemberType = typeof(LibraryVersion))]
public void SubmitsTraces(string packageVersion)
{
var topicName = $"test-topic-{packageVersion}";
using var collector = new MockSpansCollector(Output);
SetExporter(collector);
        collector.Expect("OpenTelemetry.AutoInstrumentation.Kafka", span => span.Kind == Span.Types.SpanKind.Producer && ValidateProducerSpan(span, topicName, -1), "Produced without a delivery handler.");
        collector.Expect("OpenTelemetry.AutoInstrumentation.Kafka", span => span.Kind == Span.Types.SpanKind.Producer && ValidateProducerSpan(span, topicName, 0), "Produced with a delivery handler.");
collector.Expect("OpenTelemetry.AutoInstrumentation.Kafka", span => span.Kind == Span.Types.SpanKind.Producer && ValidateProducerSpan(span, topicName, -1, true), "Produced a tombstone.");
        collector.Expect("OpenTelemetry.AutoInstrumentation.Kafka", span => span.Kind == Span.Types.SpanKind.Consumer && ValidateConsumerSpan(span, topicName, 0), "Consumed the first message.");
        collector.Expect("OpenTelemetry.AutoInstrumentation.Kafka", span => span.Kind == Span.Types.SpanKind.Consumer && ValidateConsumerSpan(span, topicName, 1), "Consumed the second message.");
collector.ExpectCollected(collection => ValidatePropagation(collection, topicName));
EnableBytecodeInstrumentation();
RunTestApplication(new TestSettings
{
PackageVersion = packageVersion,
Arguments = topicName
});
collector.AssertExpectations();
}
private static bool ValidateConsumerSpan(Span span, string topicName, int messageOffset)
{
var kafkaMessageOffset = span.Attributes.Single(kv => kv.Key == "messaging.kafka.message.offset").Value.IntValue;
var consumerGroupId = span.Attributes.Single(kv => kv.Key == "messaging.kafka.consumer.group").Value.StringValue;
return ValidateCommonTags(span.Attributes, topicName, "rdkafka#consumer-2", "receive", 0) &&
kafkaMessageOffset == messageOffset &&
consumerGroupId == $"test-consumer-group-{topicName}";
}
private static bool ValidateProducerSpan(Span span, string topicName, int partition, bool tombstoneExpected = false)
{
var isTombstone = span.Attributes.Single(kv => kv.Key == "messaging.kafka.message.tombstone").Value.BoolValue;
return ValidateCommonTags(span.Attributes, topicName, "rdkafka#producer-1", "publish", partition) &&
isTombstone == tombstoneExpected;
}
private static bool ValidateCommonTags(IReadOnlyCollection<KeyValue> attributes, string topicName, string clientName, string operationName, int partition)
{
var messagingSystem = attributes.Single(kv => kv.Key == "messaging.system").Value.StringValue;
var messagingDestinationName = attributes.Single(kv => kv.Key == "messaging.destination.name").Value.StringValue;
var messagingOperation = attributes.Single(kv => kv.Key == "messaging.operation").Value.StringValue;
var messagingClientId = attributes.Single(kv => kv.Key == "messaging.client_id").Value.StringValue;
var kafkaMessageKey = attributes.Single(kv => kv.Key == "messaging.kafka.message.key").Value.StringValue;
var kafkaPartition = attributes.Single(kv => kv.Key == "messaging.kafka.destination.partition").Value.IntValue;
return messagingSystem == "kafka" &&
messagingDestinationName == topicName &&
messagingOperation == operationName &&
messagingClientId == clientName &&
kafkaMessageKey == "testkey" &&
kafkaPartition == partition;
}
private static bool ValidatePropagation(ICollection<MockSpansCollector.Collected> collectedSpans, string topicName)
{
var expectedReceiveOperationName = $"{topicName} receive";
var expectedPublishOperationName = $"{topicName} publish";
var producerSpans = collectedSpans
.Where(span =>
span.Span.Name == expectedPublishOperationName &&
!span.Span.Attributes.Single(attr => attr.Key == "messaging.kafka.message.tombstone").Value.BoolValue)
.ToList();
var firstProducerSpan = producerSpans[0].Span;
var firstConsumerSpan = GetMatchingConsumerSpan(collectedSpans, firstProducerSpan, expectedReceiveOperationName);
var secondProducerSpan = producerSpans[1].Span;
var secondConsumerSpan = GetMatchingConsumerSpan(collectedSpans, secondProducerSpan, expectedReceiveOperationName);
return firstConsumerSpan is not null && secondConsumerSpan is not null;
}
private static MockSpansCollector.Collected? GetMatchingConsumerSpan(ICollection<MockSpansCollector.Collected> collectedSpans, Span producerSpan, string expectedReceiveOperationName)
{
return collectedSpans
.SingleOrDefault(span =>
{
var parentLinksCount = span.Span.Links.Count(
link =>
link.TraceId == producerSpan.TraceId &&
link.SpanId == producerSpan.SpanId);
return span.Span.Name == expectedReceiveOperationName &&
parentLinksCount == 1;
});
}
}

View File

@ -170,6 +170,15 @@ public static class LibraryVersion
#else
new object[] { "4.10.2" },
new object[] { "6.2.0" },
#endif
};
public static readonly IReadOnlyCollection<object[]> Kafka = new List<object[]>
{
#if DEFAULT_TEST_PACKAGE_VERSIONS
new object[] { string.Empty }
#else
new object[] { "1.4.0" },
new object[] { "2.3.0" },
#endif
};
}

View File

@ -0,0 +1 @@
FROM confluentinc/cp-kafka:7.5.1

View File

@ -0,0 +1 @@
FROM confluentinc/cp-zookeeper:7.5.1

View File

@ -250,6 +250,7 @@ public class SettingsTests : IDisposable
[InlineData("MYSQLCONNECTOR", TracerInstrumentation.MySqlConnector)]
[InlineData("AZURE", TracerInstrumentation.Azure)]
[InlineData("ELASTICTRANSPORT", TracerInstrumentation.ElasticTransport)]
[InlineData("KAFKA", TracerInstrumentation.Kafka)]
internal void TracerSettings_Instrumentations_SupportedValues(string tracerInstrumentation, TracerInstrumentation expectedTracerInstrumentation)
{
Environment.SetEnvironmentVariable(ConfigurationKeys.Traces.TracesInstrumentationEnabled, "false");

View File

@ -0,0 +1,76 @@
// <copyright file="Program.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
using Confluent.Kafka;
var config = new ProducerConfig { BootstrapServers = "localhost:9092" };
var topicName = args[0];
using var p = new ProducerBuilder<string, string>(config).Build();
try
{
// produce a message without a delivery handler set
p.Produce(topicName, new Message<string, string> { Key = "testkey", Value = "test" });
// produce a message and set a delivery handler
p.Produce(topicName, new Message<string, string> { Key = "testkey", Value = "test" }, report =>
{
Console.WriteLine($"Finished sending msg, offset: {report.Offset.Value}.");
});
p.Flush();
Console.WriteLine("Delivered messages.");
}
catch (ProduceException<Null, string> e)
{
Console.WriteLine($"Delivery failed: {e.Error.Reason}.");
}
var conf = new ConsumerConfig
{
GroupId = $"test-consumer-group-{topicName}",
BootstrapServers = "localhost:9092",
    // Note: The AutoOffsetReset property determines the start offset in the event
    // there are not yet any committed offsets for the consumer group for the
    // topic/partitions of interest. Offsets are committed automatically by default,
    // so consumption starts from the earliest message in the topic only the first
    // time this program runs for a given consumer group.
AutoOffsetReset = AutoOffsetReset.Earliest,
EnableAutoCommit = true
};
using (var consumer = new ConsumerBuilder<string, string>(conf).Build())
{
consumer.Subscribe(topicName);
try
{
ConsumeMessage(consumer);
ConsumeMessage(consumer);
}
catch (ConsumeException e)
{
        Console.WriteLine($"Error occurred: {e.Error.Reason}.");
}
}
// produce a tombstone message (null value)
p.Produce(topicName, new Message<string, string> { Key = "testkey", Value = null! });
return;
void ConsumeMessage(IConsumer<string, string> consumer)
{
var cr = consumer.Consume(TimeSpan.FromSeconds(10));
Console.WriteLine($"Consumed message '{cr?.Message?.Value}' at: '{cr?.TopicPartitionOffset}'.");
}

View File

@ -0,0 +1,24 @@
{
"$schema": "http://json.schemastore.org/launchsettings.json",
"profiles": {
"instrumented": {
"commandName": "Project",
"commandLineArgs": "test-topic",
"environmentVariables": {
"COR_ENABLE_PROFILING": "1",
"COR_PROFILER": "{918728DD-259F-4A6A-AC2B-B85E1B658318}",
"COR_PROFILER_PATH": "$(SolutionDir)bin\\tracer-home\\win-x64\\OpenTelemetry.AutoInstrumentation.Native.dll",
"CORECLR_ENABLE_PROFILING": "1",
"CORECLR_PROFILER": "{918728DD-259F-4A6A-AC2B-B85E1B658318}",
"CORECLR_PROFILER_PATH": "$(SolutionDir)bin\\tracer-home\\win-x64\\OpenTelemetry.AutoInstrumentation.Native.dll",
"DOTNET_ADDITIONAL_DEPS": "$(SolutionDir)bin\\tracer-home\\AdditionalDeps",
"DOTNET_SHARED_STORE": "$(SolutionDir)bin\\tracer-home\\store",
"DOTNET_STARTUP_HOOKS": "$(SolutionDir)bin\\tracer-home\\net\\OpenTelemetry.AutoInstrumentation.StartupHook.dll",
"OTEL_DOTNET_AUTO_HOME": "$(SolutionDir)bin\\tracer-home",
"OTEL_SERVICE_NAME": "TestApplication.Kafka",
"OTEL_LOG_LEVEL": "debug",
"OTEL_DOTNET_AUTO_TRACES_CONSOLE_EXPORTER_ENABLED": "true"
}
}
}
}

View File

@ -0,0 +1,5 @@
<Project Sdk="Microsoft.NET.Sdk">
<ItemGroup>
<PackageReference Include="Confluent.Kafka" VersionOverride="$(LibraryVersion)" />
</ItemGroup>
</Project>

View File

@ -217,6 +217,17 @@ internal static class PackageVersionDefinitions
new("4.10.2"),
new("*")
}
},
new()
{
IntegrationName = "Kafka",
NugetPackageName = "Confluent.Kafka",
TestApplicationName = "TestApplication.Kafka",
Versions = new List<PackageVersion>
{
new("1.4.0"),
new("*")
}
}
};