[otlp] Replace the current trace implementation with the new one (#6003)

This commit is contained in:
Rajkumar Rangaraj 2024-11-27 14:28:38 -08:00 committed by GitHub
parent 7eeddf5f22
commit 84e6afbeba
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 183 additions and 899 deletions

View File

@ -1,445 +0,0 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
using System.Collections.Concurrent;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using Google.Protobuf;
using OpenTelemetry.Internal;
using OpenTelemetry.Proto.Collector.Trace.V1;
using OpenTelemetry.Proto.Common.V1;
using OpenTelemetry.Proto.Resource.V1;
using OpenTelemetry.Proto.Trace.V1;
using OpenTelemetry.Trace;
using OtlpTrace = OpenTelemetry.Proto.Trace.V1;
namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation;
internal static class ActivityExtensions
{
private static readonly ConcurrentBag<ScopeSpans> SpanListPool = new();
internal static void AddBatch(
this ExportTraceServiceRequest request,
SdkLimitOptions sdkLimitOptions,
Resource processResource,
in Batch<Activity> activityBatch)
{
Dictionary<string, ScopeSpans> spansByLibrary = new Dictionary<string, ScopeSpans>();
ResourceSpans resourceSpans = new ResourceSpans
{
Resource = processResource,
};
request.ResourceSpans.Add(resourceSpans);
var maxTags = sdkLimitOptions.AttributeCountLimit ?? int.MaxValue;
foreach (var activity in activityBatch)
{
Span? span = activity.ToOtlpSpan(sdkLimitOptions);
if (span == null)
{
OpenTelemetryProtocolExporterEventSource.Log.CouldNotTranslateActivity(
nameof(ActivityExtensions),
nameof(AddBatch));
continue;
}
var activitySourceName = activity.Source.Name;
if (!spansByLibrary.TryGetValue(activitySourceName, out var scopeSpans))
{
scopeSpans = GetSpanListFromPool(activity.Source, maxTags, sdkLimitOptions.AttributeValueLengthLimit);
spansByLibrary.Add(activitySourceName, scopeSpans);
resourceSpans.ScopeSpans.Add(scopeSpans);
}
scopeSpans.Spans.Add(span);
}
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static void Return(this ExportTraceServiceRequest request)
{
var resourceSpans = request.ResourceSpans.FirstOrDefault();
if (resourceSpans == null)
{
return;
}
foreach (var scopeSpan in resourceSpans.ScopeSpans)
{
scopeSpan.Spans.Clear();
scopeSpan.Scope.Attributes.Clear();
SpanListPool.Add(scopeSpan);
}
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static ScopeSpans GetSpanListFromPool(ActivitySource activitySource, int maxTags, int? attributeValueLengthLimit)
{
if (!SpanListPool.TryTake(out var scopeSpans))
{
scopeSpans = new ScopeSpans
{
Scope = new InstrumentationScope
{
Name = activitySource.Name, // Name is enforced to not be null, but it can be empty.
Version = activitySource.Version ?? string.Empty, // NRE throw by proto
},
};
}
else
{
scopeSpans.Scope.Name = activitySource.Name; // Name is enforced to not be null, but it can be empty.
scopeSpans.Scope.Version = activitySource.Version ?? string.Empty; // NRE throw by proto
}
if (activitySource.Tags != null)
{
var scopeAttributes = scopeSpans.Scope.Attributes;
if (activitySource.Tags is IReadOnlyList<KeyValuePair<string, object?>> activitySourceTagsList)
{
for (int i = 0; i < activitySourceTagsList.Count; i++)
{
if (scopeAttributes.Count < maxTags)
{
OtlpTagWriter.Instance.TryWriteTag(ref scopeAttributes, activitySourceTagsList[i], attributeValueLengthLimit);
}
else
{
scopeSpans.Scope.DroppedAttributesCount++;
}
}
}
else
{
foreach (var tag in activitySource.Tags)
{
if (scopeAttributes.Count < maxTags)
{
OtlpTagWriter.Instance.TryWriteTag(ref scopeAttributes, tag, attributeValueLengthLimit);
}
else
{
scopeSpans.Scope.DroppedAttributesCount++;
}
}
}
}
return scopeSpans;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
internal static Span? ToOtlpSpan(this Activity activity, SdkLimitOptions sdkLimitOptions)
{
if (activity.IdFormat != ActivityIdFormat.W3C)
{
// Only ActivityIdFormat.W3C is supported, in principle this should never be
// hit under the OpenTelemetry SDK.
return null;
}
byte[] traceIdBytes = new byte[16];
byte[] spanIdBytes = new byte[8];
activity.TraceId.CopyTo(traceIdBytes);
activity.SpanId.CopyTo(spanIdBytes);
var parentSpanIdString = ByteString.Empty;
if (activity.ParentSpanId != default)
{
byte[] parentSpanIdBytes = new byte[8];
activity.ParentSpanId.CopyTo(parentSpanIdBytes);
parentSpanIdString = UnsafeByteOperations.UnsafeWrap(parentSpanIdBytes);
}
var startTimeUnixNano = activity.StartTimeUtc.ToUnixTimeNanoseconds();
var otlpSpan = new Span
{
Name = activity.DisplayName,
// There is an offset of 1 on the OTLP enum.
Kind = (Span.Types.SpanKind)(activity.Kind + 1),
TraceId = UnsafeByteOperations.UnsafeWrap(traceIdBytes),
SpanId = UnsafeByteOperations.UnsafeWrap(spanIdBytes),
ParentSpanId = parentSpanIdString,
TraceState = activity.TraceStateString ?? string.Empty,
StartTimeUnixNano = (ulong)startTimeUnixNano,
EndTimeUnixNano = (ulong)(startTimeUnixNano + activity.Duration.ToNanoseconds()),
};
TagEnumerationState otlpTags = new()
{
SdkLimitOptions = sdkLimitOptions,
Span = otlpSpan,
};
otlpTags.EnumerateTags(activity, sdkLimitOptions.SpanAttributeCountLimit ?? int.MaxValue);
if (activity.Kind == ActivityKind.Client || activity.Kind == ActivityKind.Producer)
{
PeerServiceResolver.Resolve(ref otlpTags, out string? peerServiceName, out bool addAsTag);
if (peerServiceName != null && addAsTag)
{
otlpSpan.Attributes.Add(
new KeyValue
{
Key = SemanticConventions.AttributePeerService,
Value = new AnyValue { StringValue = peerServiceName },
});
}
}
otlpSpan.Status = activity.ToOtlpStatus(ref otlpTags);
EventEnumerationState otlpEvents = new()
{
SdkLimitOptions = sdkLimitOptions,
Span = otlpSpan,
};
otlpEvents.EnumerateEvents(activity, sdkLimitOptions.SpanEventCountLimit ?? int.MaxValue);
LinkEnumerationState otlpLinks = new()
{
SdkLimitOptions = sdkLimitOptions,
Span = otlpSpan,
};
otlpLinks.EnumerateLinks(activity, sdkLimitOptions.SpanLinkCountLimit ?? int.MaxValue);
otlpSpan.Flags = ToOtlpSpanFlags(activity.Context.TraceFlags, activity.HasRemoteParent);
return otlpSpan;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static OtlpTrace.Status? ToOtlpStatus(this Activity activity, ref TagEnumerationState otlpTags)
{
var statusCodeForTagValue = StatusHelper.GetStatusCodeForTagValue(otlpTags.StatusCode);
if (activity.Status == ActivityStatusCode.Unset && statusCodeForTagValue == null)
{
return null;
}
OtlpTrace.Status.Types.StatusCode otlpActivityStatusCode = OtlpTrace.Status.Types.StatusCode.Unset;
string? otlpStatusDescription = null;
if (activity.Status != ActivityStatusCode.Unset)
{
// The numerical values of the two enumerations match, a simple cast is enough.
otlpActivityStatusCode = (OtlpTrace.Status.Types.StatusCode)(int)activity.Status;
if (activity.Status == ActivityStatusCode.Error && !string.IsNullOrEmpty(activity.StatusDescription))
{
otlpStatusDescription = activity.StatusDescription;
}
}
else
{
if (statusCodeForTagValue != StatusCode.Unset)
{
// The numerical values of the two enumerations match, a simple cast is enough.
otlpActivityStatusCode = (OtlpTrace.Status.Types.StatusCode)(int)statusCodeForTagValue!;
if (statusCodeForTagValue == StatusCode.Error && !string.IsNullOrEmpty(otlpTags.StatusDescription))
{
otlpStatusDescription = otlpTags.StatusDescription;
}
}
}
var otlpStatus = new OtlpTrace.Status { Code = otlpActivityStatusCode };
if (!string.IsNullOrEmpty(otlpStatusDescription))
{
otlpStatus.Message = otlpStatusDescription;
}
return otlpStatus;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static Span.Types.Link ToOtlpLink(in ActivityLink activityLink, SdkLimitOptions sdkLimitOptions)
{
byte[] traceIdBytes = new byte[16];
byte[] spanIdBytes = new byte[8];
activityLink.Context.TraceId.CopyTo(traceIdBytes);
activityLink.Context.SpanId.CopyTo(spanIdBytes);
var otlpLink = new Span.Types.Link
{
TraceId = UnsafeByteOperations.UnsafeWrap(traceIdBytes),
SpanId = UnsafeByteOperations.UnsafeWrap(spanIdBytes),
};
int maxTags = sdkLimitOptions.SpanLinkAttributeCountLimit ?? int.MaxValue;
var otlpLinkAttributes = otlpLink.Attributes;
foreach (ref readonly var tag in activityLink.EnumerateTagObjects())
{
if (otlpLinkAttributes.Count == maxTags)
{
otlpLink.DroppedAttributesCount++;
continue;
}
OtlpTagWriter.Instance.TryWriteTag(ref otlpLinkAttributes, tag, sdkLimitOptions.AttributeValueLengthLimit);
}
otlpLink.Flags = ToOtlpSpanFlags(activityLink.Context.TraceFlags, activityLink.Context.IsRemote);
return otlpLink;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static Span.Types.Event ToOtlpEvent(in ActivityEvent activityEvent, SdkLimitOptions sdkLimitOptions)
{
var otlpEvent = new Span.Types.Event
{
Name = activityEvent.Name,
TimeUnixNano = (ulong)activityEvent.Timestamp.ToUnixTimeNanoseconds(),
};
int maxTags = sdkLimitOptions.SpanEventAttributeCountLimit ?? int.MaxValue;
var otlpEventAttributes = otlpEvent.Attributes;
foreach (ref readonly var tag in activityEvent.EnumerateTagObjects())
{
if (otlpEventAttributes.Count == maxTags)
{
otlpEvent.DroppedAttributesCount++;
continue;
}
OtlpTagWriter.Instance.TryWriteTag(ref otlpEventAttributes, tag, sdkLimitOptions.AttributeValueLengthLimit);
}
return otlpEvent;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private static uint ToOtlpSpanFlags(ActivityTraceFlags activityTraceFlags, bool isRemote)
{
SpanFlags flags = (SpanFlags)activityTraceFlags;
flags |= SpanFlags.ContextHasIsRemoteMask;
if (isRemote)
{
flags |= SpanFlags.ContextIsRemoteMask;
}
return (uint)flags;
}
private struct TagEnumerationState : PeerServiceResolver.IPeerServiceState
{
public SdkLimitOptions SdkLimitOptions;
public Span Span;
public string? StatusCode;
public string? StatusDescription;
public string? PeerService { get; set; }
public int? PeerServicePriority { get; set; }
public string? HostName { get; set; }
public string? IpAddress { get; set; }
public long Port { get; set; }
public void EnumerateTags(Activity activity, int maxTags)
{
var otlpSpanAttributes = this.Span.Attributes;
foreach (ref readonly var tag in activity.EnumerateTagObjects())
{
if (tag.Value == null)
{
continue;
}
var key = tag.Key;
switch (key)
{
case SpanAttributeConstants.StatusCodeKey:
this.StatusCode = tag.Value as string;
continue;
case SpanAttributeConstants.StatusDescriptionKey:
this.StatusDescription = tag.Value as string;
continue;
}
if (otlpSpanAttributes.Count == maxTags)
{
this.Span.DroppedAttributesCount++;
}
else
{
OtlpTagWriter.Instance.TryWriteTag(ref otlpSpanAttributes, tag, this.SdkLimitOptions.AttributeValueLengthLimit);
}
if (tag.Value is string tagStringValue)
{
PeerServiceResolver.InspectTag(ref this, key, tagStringValue);
}
else if (tag.Value is int tagIntValue)
{
PeerServiceResolver.InspectTag(ref this, key, tagIntValue);
}
}
}
}
private struct EventEnumerationState
{
public SdkLimitOptions SdkLimitOptions;
public Span Span;
public void EnumerateEvents(Activity activity, int maxEvents)
{
foreach (ref readonly var @event in activity.EnumerateEvents())
{
if (this.Span.Events.Count < maxEvents)
{
this.Span.Events.Add(ToOtlpEvent(in @event, this.SdkLimitOptions));
}
else
{
this.Span.DroppedEventsCount++;
}
}
}
}
private struct LinkEnumerationState
{
public SdkLimitOptions SdkLimitOptions;
public Span Span;
public void EnumerateLinks(Activity activity, int maxLinks)
{
foreach (ref readonly var link in activity.EnumerateLinks())
{
if (this.Span.Links.Count < maxLinks)
{
this.Span.Links.Add(ToOtlpLink(in link, this.SdkLimitOptions));
}
else
{
this.Span.DroppedLinksCount++;
}
}
}
}
}

View File

@ -1,45 +0,0 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
using Grpc.Core;
using OtlpCollector = OpenTelemetry.Proto.Collector.Trace.V1;
namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;
/// <summary>Class for sending OTLP trace export request over gRPC.</summary>
internal sealed class OtlpGrpcTraceExportClient : BaseOtlpGrpcExportClient<OtlpCollector.ExportTraceServiceRequest>
{
private readonly OtlpCollector.TraceService.TraceServiceClient traceClient;
public OtlpGrpcTraceExportClient(OtlpExporterOptions options, OtlpCollector.TraceService.TraceServiceClient? traceServiceClient = null)
: base(options)
{
if (traceServiceClient != null)
{
this.traceClient = traceServiceClient;
}
else
{
this.Channel = options.CreateChannel();
this.traceClient = new OtlpCollector.TraceService.TraceServiceClient(this.Channel);
}
}
/// <inheritdoc/>
public override ExportClientResponse SendExportRequest(OtlpCollector.ExportTraceServiceRequest request, DateTime deadlineUtc, CancellationToken cancellationToken = default)
{
try
{
this.traceClient.Export(request, headers: this.Headers, deadline: deadlineUtc, cancellationToken: cancellationToken);
// We do not need to return back response and deadline for successful response so using cached value.
return SuccessExportResponse;
}
catch (RpcException ex)
{
OpenTelemetryProtocolExporterEventSource.Log.FailedToReachCollector(this.Endpoint, ex);
return new ExportClientGrpcResponse(false, deadlineUtc, ex, null, null);
}
}
}

View File

@ -1,69 +0,0 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
using System.Net;
#if NETFRAMEWORK
using System.Net.Http;
#endif
using System.Net.Http.Headers;
using System.Runtime.CompilerServices;
using Google.Protobuf;
using OtlpCollector = OpenTelemetry.Proto.Collector.Trace.V1;
namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;
/// <summary>Class for sending OTLP trace export request over HTTP.</summary>
internal sealed class OtlpHttpTraceExportClient : BaseOtlpHttpExportClient<OtlpCollector.ExportTraceServiceRequest>
{
internal const string MediaContentType = "application/x-protobuf";
private const string TracesExportPath = "v1/traces";
public OtlpHttpTraceExportClient(OtlpExporterOptions options, HttpClient httpClient)
: base(options, httpClient, TracesExportPath)
{
}
protected override HttpContent CreateHttpContent(OtlpCollector.ExportTraceServiceRequest exportRequest)
{
return new ExportRequestContent(exportRequest);
}
internal sealed class ExportRequestContent : HttpContent
{
private static readonly MediaTypeHeaderValue ProtobufMediaTypeHeader = new(MediaContentType);
private readonly OtlpCollector.ExportTraceServiceRequest exportRequest;
public ExportRequestContent(OtlpCollector.ExportTraceServiceRequest exportRequest)
{
this.exportRequest = exportRequest;
this.Headers.ContentType = ProtobufMediaTypeHeader;
}
#if NET
protected override void SerializeToStream(Stream stream, TransportContext? context, CancellationToken cancellationToken)
{
this.SerializeToStreamInternal(stream);
}
#endif
protected override Task SerializeToStreamAsync(Stream stream, TransportContext? context)
{
this.SerializeToStreamInternal(stream);
return Task.CompletedTask;
}
protected override bool TryComputeLength(out long length)
{
// We can't know the length of the content being pushed to the output stream.
length = -1;
return false;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void SerializeToStreamInternal(Stream stream)
{
this.exportRequest.WriteTo(stream);
}
}
}

View File

@ -11,7 +11,7 @@ namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClie
/// <summary>Class for sending OTLP trace export request over HTTP.</summary>
internal sealed class ProtobufOtlpHttpExportClient : ProtobufOtlpExportClient
{
private static readonly MediaTypeHeaderValue MediaHeaderValue = new("application/x-protobuf");
internal static readonly MediaTypeHeaderValue MediaHeaderValue = new("application/x-protobuf");
private static readonly ExportClientHttpResponse SuccessExportResponse = new(success: true, deadlineUtc: default, response: null, exception: null);
internal ProtobufOtlpHttpExportClient(OtlpExporterOptions options, HttpClient httpClient, string signalPath)

View File

@ -16,7 +16,6 @@ using Google.Protobuf;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Transmission;
using LogOtlpCollector = OpenTelemetry.Proto.Collector.Logs.V1;
using MetricsOtlpCollector = OpenTelemetry.Proto.Collector.Metrics.V1;
using TraceOtlpCollector = OpenTelemetry.Proto.Collector.Trace.V1;
namespace OpenTelemetry.Exporter;
@ -99,42 +98,6 @@ internal static class OtlpExporterOptionsExtensions
return headers;
}
public static OtlpExporterTransmissionHandler<TraceOtlpCollector.ExportTraceServiceRequest> GetTraceExportTransmissionHandler(this OtlpExporterOptions options, ExperimentalOptions experimentalOptions)
{
var exportClient = GetTraceExportClient(options);
// `HttpClient.Timeout.TotalMilliseconds` would be populated with the correct timeout value for both the exporter configuration cases:
// 1. User provides their own HttpClient. This case is straightforward as the user wants to use their `HttpClient` and thereby the same client's timeout value.
// 2. If the user configures timeout via the exporter options, then the timeout set for the `HttpClient` initialized by the exporter will be set to user provided value.
double timeoutMilliseconds = exportClient is OtlpHttpTraceExportClient httpTraceExportClient
? httpTraceExportClient.HttpClient.Timeout.TotalMilliseconds
: options.TimeoutMilliseconds;
if (experimentalOptions.EnableInMemoryRetry)
{
return new OtlpExporterRetryTransmissionHandler<TraceOtlpCollector.ExportTraceServiceRequest>(exportClient, timeoutMilliseconds);
}
else if (experimentalOptions.EnableDiskRetry)
{
Debug.Assert(!string.IsNullOrEmpty(experimentalOptions.DiskRetryDirectoryPath), $"{nameof(experimentalOptions.DiskRetryDirectoryPath)} is null or empty");
return new OtlpExporterPersistentStorageTransmissionHandler<TraceOtlpCollector.ExportTraceServiceRequest>(
exportClient,
timeoutMilliseconds,
(byte[] data) =>
{
var request = new TraceOtlpCollector.ExportTraceServiceRequest();
request.MergeFrom(data);
return request;
},
Path.Combine(experimentalOptions.DiskRetryDirectoryPath, "traces"));
}
else
{
return new OtlpExporterTransmissionHandler<TraceOtlpCollector.ExportTraceServiceRequest>(exportClient, timeoutMilliseconds);
}
}
public static ProtobufOtlpExporterTransmissionHandler GetProtobufExportTransmissionHandler(this OtlpExporterOptions options, ExperimentalOptions experimentalOptions, OtlpSignalType otlpSignalType)
{
var exportClient = GetProtobufExportClient(options, otlpSignalType);
@ -169,6 +132,11 @@ internal static class OtlpExporterOptionsExtensions
{
var httpClient = options.HttpClientFactory?.Invoke() ?? throw new InvalidOperationException("OtlpExporterOptions was missing HttpClientFactory or it returned null.");
if (options.Protocol != OtlpExportProtocol.Grpc && options.Protocol != OtlpExportProtocol.HttpProtobuf)
{
throw new NotSupportedException($"Protocol {options.Protocol} is not supported.");
}
return otlpSignalType switch
{
OtlpSignalType.Traces => options.Protocol == OtlpExportProtocol.Grpc
@ -255,16 +223,6 @@ internal static class OtlpExporterOptionsExtensions
}
}
public static IExportClient<TraceOtlpCollector.ExportTraceServiceRequest> GetTraceExportClient(this OtlpExporterOptions options) =>
options.Protocol switch
{
OtlpExportProtocol.Grpc => new OtlpGrpcTraceExportClient(options),
OtlpExportProtocol.HttpProtobuf => new OtlpHttpTraceExportClient(
options,
options.HttpClientFactory?.Invoke() ?? throw new InvalidOperationException("OtlpExporterOptions was missing HttpClientFactory or it returned null.")),
_ => throw new NotSupportedException($"Protocol {options.Protocol} is not supported."),
};
public static IExportClient<MetricsOtlpCollector.ExportMetricsServiceRequest> GetMetricsExportClient(this OtlpExporterOptions options) =>
options.Protocol switch
{

View File

@ -1,11 +1,12 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
using System.Buffers.Binary;
using System.Diagnostics;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Serializer;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Transmission;
using OtlpCollector = OpenTelemetry.Proto.Collector.Trace.V1;
using OtlpResource = OpenTelemetry.Proto.Resource.V1;
using OpenTelemetry.Resources;
namespace OpenTelemetry.Exporter;
@ -16,9 +17,15 @@ namespace OpenTelemetry.Exporter;
public class OtlpTraceExporter : BaseExporter<Activity>
{
private readonly SdkLimitOptions sdkLimitOptions;
private readonly OtlpExporterTransmissionHandler<OtlpCollector.ExportTraceServiceRequest> transmissionHandler;
private readonly ProtobufOtlpExporterTransmissionHandler transmissionHandler;
private readonly int startWritePosition;
private OtlpResource.Resource? processResource;
private Resource? resource;
// Initial buffer size set to ~732KB.
// This choice allows us to gradually grow the buffer while targeting a final capacity of around 100 MB,
// by the 7th doubling to maintain efficient allocation without frequent resizing.
private byte[] buffer = new byte[750000];
/// <summary>
/// Initializes a new instance of the <see cref="OtlpTraceExporter"/> class.
@ -40,17 +47,17 @@ public class OtlpTraceExporter : BaseExporter<Activity>
OtlpExporterOptions exporterOptions,
SdkLimitOptions sdkLimitOptions,
ExperimentalOptions experimentalOptions,
OtlpExporterTransmissionHandler<OtlpCollector.ExportTraceServiceRequest>? transmissionHandler = null)
ProtobufOtlpExporterTransmissionHandler? transmissionHandler = null)
{
Debug.Assert(exporterOptions != null, "exporterOptions was null");
Debug.Assert(sdkLimitOptions != null, "sdkLimitOptions was null");
this.sdkLimitOptions = sdkLimitOptions!;
this.transmissionHandler = transmissionHandler ?? exporterOptions!.GetTraceExportTransmissionHandler(experimentalOptions);
this.startWritePosition = exporterOptions!.Protocol == OtlpExportProtocol.Grpc ? 5 : 0;
this.transmissionHandler = transmissionHandler ?? exporterOptions!.GetProtobufExportTransmissionHandler(experimentalOptions, OtlpSignalType.Traces);
}
internal OtlpResource.Resource ProcessResource => this.processResource ??= this.ParentProvider.GetResource().ToOtlpResource();
internal Resource Resource => this.resource ??= this.ParentProvider.GetResource();
/// <inheritdoc/>
public override ExportResult Export(in Batch<Activity> activityBatch)
@ -58,13 +65,22 @@ public class OtlpTraceExporter : BaseExporter<Activity>
// Prevents the exporter's gRPC and HTTP operations from being instrumented.
using var scope = SuppressInstrumentationScope.Begin();
var request = new OtlpCollector.ExportTraceServiceRequest();
try
{
request.AddBatch(this.sdkLimitOptions, this.ProcessResource, activityBatch);
int writePosition = ProtobufOtlpTraceSerializer.WriteTraceData(this.buffer, this.startWritePosition, this.sdkLimitOptions, this.Resource, activityBatch);
if (!this.transmissionHandler.TrySubmitRequest(request))
if (this.startWritePosition == 5)
{
// Grpc payload consists of 3 parts
// byte 0 - Specifying if the payload is compressed.
// 1-4 byte - Specifies the length of payload in big endian format.
// 5 and above - Protobuf serialized data.
Span<byte> data = new Span<byte>(this.buffer, 1, 4);
var dataLength = writePosition - 5;
BinaryPrimitives.WriteUInt32BigEndian(data, (uint)dataLength);
}
if (!this.transmissionHandler.TrySubmitRequest(this.buffer, writePosition))
{
return ExportResult.Failure;
}
@ -74,17 +90,10 @@ public class OtlpTraceExporter : BaseExporter<Activity>
OpenTelemetryProtocolExporterEventSource.Log.ExportMethodException(ex);
return ExportResult.Failure;
}
finally
{
request.Return();
}
return ExportResult.Success;
}
/// <inheritdoc />
protected override bool OnShutdown(int timeoutMilliseconds)
{
return this.transmissionHandler.Shutdown(timeoutMilliseconds);
}
protected override bool OnShutdown(int timeoutMilliseconds) => this.transmissionHandler.Shutdown(timeoutMilliseconds);
}

View File

@ -136,16 +136,7 @@ public static class OtlpTraceExporterHelperExtensions
exporterOptions!.TryEnableIHttpClientFactoryIntegration(serviceProvider!, "OtlpTraceExporter");
BaseExporter<Activity> otlpExporter;
if (experimentalOptions != null && experimentalOptions.UseCustomProtobufSerializer)
{
otlpExporter = new ProtobufOtlpTraceExporter(exporterOptions!, sdkLimitOptions!, experimentalOptions!);
}
else
{
otlpExporter = new OtlpTraceExporter(exporterOptions!, sdkLimitOptions!, experimentalOptions!);
}
BaseExporter<Activity> otlpExporter = new OtlpTraceExporter(exporterOptions!, sdkLimitOptions!, experimentalOptions!);
if (configureExporterInstance != null)
{

View File

@ -1,102 +0,0 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
using System.Buffers.Binary;
using System.Diagnostics;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Serializer;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Transmission;
using OpenTelemetry.Resources;
namespace OpenTelemetry.Exporter;
/// <summary>
/// Exporter consuming <see cref="Activity"/> and exporting the data using
/// the OpenTelemetry protocol (OTLP).
/// </summary>
internal sealed class ProtobufOtlpTraceExporter : BaseExporter<Activity>
{
private readonly SdkLimitOptions sdkLimitOptions;
private readonly ProtobufOtlpExporterTransmissionHandler transmissionHandler;
private readonly int startWritePosition;
private Resource? resource;
// Initial buffer size set to ~732KB.
// This choice allows us to gradually grow the buffer while targeting a final capacity of around 100 MB,
// by the 7th doubling to maintain efficient allocation without frequent resizing.
private byte[] buffer = new byte[750000];
/// <summary>
/// Initializes a new instance of the <see cref="ProtobufOtlpTraceExporter"/> class.
/// </summary>
/// <param name="options">Configuration options for the export.</param>
public ProtobufOtlpTraceExporter(OtlpExporterOptions options)
: this(options, sdkLimitOptions: new(), experimentalOptions: new(), transmissionHandler: null)
{
}
/// <summary>
/// Initializes a new instance of the <see cref="ProtobufOtlpTraceExporter"/> class.
/// </summary>
/// <param name="exporterOptions"><see cref="OtlpExporterOptions"/>.</param>
/// <param name="sdkLimitOptions"><see cref="SdkLimitOptions"/>.</param>
/// <param name="experimentalOptions"><see cref="ExperimentalOptions"/>.</param>
/// <param name="transmissionHandler"><see cref="OtlpExporterTransmissionHandler{T}"/>.</param>
internal ProtobufOtlpTraceExporter(
OtlpExporterOptions exporterOptions,
SdkLimitOptions sdkLimitOptions,
ExperimentalOptions experimentalOptions,
ProtobufOtlpExporterTransmissionHandler? transmissionHandler = null)
{
Debug.Assert(exporterOptions != null, "exporterOptions was null");
Debug.Assert(sdkLimitOptions != null, "sdkLimitOptions was null");
this.sdkLimitOptions = sdkLimitOptions!;
this.startWritePosition = exporterOptions!.Protocol == OtlpExportProtocol.Grpc ? 5 : 0;
this.transmissionHandler = transmissionHandler ?? exporterOptions!.GetProtobufExportTransmissionHandler(experimentalOptions, OtlpSignalType.Traces);
}
internal Resource Resource => this.resource ??= this.ParentProvider.GetResource();
/// <inheritdoc/>
public override ExportResult Export(in Batch<Activity> activityBatch)
{
// Prevents the exporter's gRPC and HTTP operations from being instrumented.
using var scope = SuppressInstrumentationScope.Begin();
try
{
int writePosition = ProtobufOtlpTraceSerializer.WriteTraceData(this.buffer, this.startWritePosition, this.sdkLimitOptions, this.Resource, activityBatch);
if (this.startWritePosition == 5)
{
// Grpc payload consists of 3 parts
// byte 0 - Specifying if the payload is compressed.
// 1-4 byte - Specifies the length of payload in big endian format.
// 5 and above - Protobuf serialized data.
Span<byte> data = new Span<byte>(this.buffer, 1, 4);
var dataLength = writePosition - 5;
BinaryPrimitives.WriteUInt32BigEndian(data, (uint)dataLength);
}
if (!this.transmissionHandler.TrySubmitRequest(this.buffer, writePosition))
{
return ExportResult.Failure;
}
}
catch (Exception ex)
{
OpenTelemetryProtocolExporterEventSource.Log.ExportMethodException(ex);
return ExportResult.Failure;
}
return ExportResult.Success;
}
/// <inheritdoc />
protected override bool OnShutdown(int timeoutMilliseconds)
{
return this.transmissionHandler.Shutdown(timeoutMilliseconds);
}
}

View File

@ -12,7 +12,6 @@ using OpenTelemetryProtocol::OpenTelemetry.Exporter;
using OpenTelemetryProtocol::OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation;
using OpenTelemetryProtocol::OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;
using OpenTelemetryProtocol::OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Transmission;
using OpenTelemetryProtocol::OpenTelemetry.Proto.Collector.Trace.V1;
namespace Benchmarks.Exporter;
@ -36,7 +35,7 @@ public class OtlpGrpcExporterBenchmarks
options,
new SdkLimitOptions(),
new ExperimentalOptions(),
new OtlpExporterTransmissionHandler<ExportTraceServiceRequest>(new OtlpGrpcTraceExportClient(options, new TestTraceServiceClient()), options.TimeoutMilliseconds));
new ProtobufOtlpExporterTransmissionHandler(new ProtobufOtlpGrpcExportClient(options, options.HttpClientFactory(), "opentelemetry.proto.collector.trace.v1.TraceService/Export"), options.TimeoutMilliseconds));
this.activity = ActivityHelper.CreateTestActivity();
this.activityBatch = new CircularBuffer<Activity>(this.NumberOfSpans);

View File

@ -13,7 +13,6 @@ using OpenTelemetryProtocol::OpenTelemetry.Exporter;
using OpenTelemetryProtocol::OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation;
using OpenTelemetryProtocol::OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;
using OpenTelemetryProtocol::OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Transmission;
using OpenTelemetryProtocol::OpenTelemetry.Proto.Collector.Trace.V1;
namespace Benchmarks.Exporter;
@ -64,7 +63,7 @@ public class OtlpHttpExporterBenchmarks
options,
new SdkLimitOptions(),
new ExperimentalOptions(),
new OtlpExporterTransmissionHandler<ExportTraceServiceRequest>(new OtlpHttpTraceExportClient(options, options.HttpClientFactory()), options.TimeoutMilliseconds));
new ProtobufOtlpExporterTransmissionHandler(new ProtobufOtlpHttpExportClient(options, options.HttpClientFactory(), "v1/traces"), options.TimeoutMilliseconds));
this.activity = ActivityHelper.CreateTestActivity();
this.activityBatch = new CircularBuffer<Activity>(this.NumberOfSpans);

View File

@ -7,6 +7,7 @@ using System.Net.Http;
#endif
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.Serializer;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
using Xunit;
@ -43,7 +44,7 @@ public class OtlpHttpTraceExportClientTests
Headers = $"{header1.Name}={header1.Value}, {header2.Name} = {header2.Value}",
};
var client = new OtlpHttpTraceExportClient(options, options.HttpClientFactory());
var client = new ProtobufOtlpHttpExportClient(options, options.HttpClientFactory(), "/v1/traces");
Assert.NotNull(client.HttpClient);
@ -85,7 +86,7 @@ public class OtlpHttpTraceExportClientTests
var httpClient = new HttpClient(testHttpHandler);
var exportClient = new OtlpHttpTraceExportClient(options, httpClient);
var exportClient = new ProtobufOtlpHttpExportClient(options, httpClient, string.Empty);
var resourceBuilder = ResourceBuilder.CreateEmpty();
if (includeServiceNameInResource)
@ -131,10 +132,10 @@ public class OtlpHttpTraceExportClientTests
var deadlineUtc = DateTime.UtcNow.AddMilliseconds(httpClient.Timeout.TotalMilliseconds);
var request = new OtlpCollector.ExportTraceServiceRequest();
request.AddBatch(DefaultSdkLimitOptions, resourceBuilder.Build().ToOtlpResource(), batch);
var (buffer, contentLength) = CreateTraceExportRequest(DefaultSdkLimitOptions, batch, resourceBuilder.Build());
// Act
var result = exportClient.SendExportRequest(request, deadlineUtc);
var result = exportClient.SendExportRequest(buffer, contentLength, deadlineUtc);
var httpRequest = testHttpHandler.HttpRequestMessage;
@ -154,8 +155,11 @@ public class OtlpHttpTraceExportClientTests
}
Assert.NotNull(testHttpHandler.HttpRequestContent);
Assert.IsType<OtlpHttpTraceExportClient.ExportRequestContent>(httpRequest.Content);
Assert.Contains(httpRequest.Content.Headers, h => h.Key == "Content-Type" && h.Value.First() == OtlpHttpTraceExportClient.MediaContentType);
// TODO: Revisit once the HttpClient part is overridden.
// Assert.IsType<ProtobufOtlpHttpExportClient.ExportRequestContent>(httpRequest.Content);
Assert.NotNull(httpRequest.Content);
Assert.Contains(httpRequest.Content.Headers, h => h.Key == "Content-Type" && h.Value.First() == ProtobufOtlpHttpExportClient.MediaHeaderValue.ToString());
var exportTraceRequest = OtlpCollector.ExportTraceServiceRequest.Parser.ParseFrom(testHttpHandler.HttpRequestContent);
Assert.NotNull(exportTraceRequest);
@ -173,4 +177,11 @@ public class OtlpHttpTraceExportClientTests
}
}
}
private static (byte[] Buffer, int ContentLength) CreateTraceExportRequest(SdkLimitOptions sdkOptions, in Batch<Activity> batch, Resource resource)
{
var buffer = new byte[4096];
var writePosition = ProtobufOtlpTraceSerializer.WriteTraceData(buffer, 0, sdkOptions, resource, batch);
return (buffer, writePosition);
}
}

View File

@ -74,7 +74,7 @@ public sealed class MockCollectorIntegrationTests
await httpClient.GetAsync($"/MockCollector/SetResponseCodes/{string.Join(",", codes.Select(x => (int)x))}");
var exportResults = new List<ExportResult>();
using var otlpExporter = new ProtobufOtlpTraceExporter(new OtlpExporterOptions() { Endpoint = new Uri($"http://localhost:{testGrpcPort}") });
using var otlpExporter = new OtlpTraceExporter(new OtlpExporterOptions() { Endpoint = new Uri($"http://localhost:{testGrpcPort}") });
var delegatingExporter = new DelegatingExporter<Activity>
{
OnExportFunc = (batch) =>
@ -186,7 +186,7 @@ public sealed class MockCollectorIntegrationTests
})
.Build();
using var otlpExporter = new ProtobufOtlpTraceExporter(exporterOptions, new SdkLimitOptions(), new ExperimentalOptions(configuration));
using var otlpExporter = new OtlpTraceExporter(exporterOptions, new SdkLimitOptions(), new ExperimentalOptions(configuration));
var activitySourceName = "otel.grpc.retry.test";
using var source = new ActivitySource(activitySourceName);
@ -275,7 +275,7 @@ public sealed class MockCollectorIntegrationTests
})
.Build();
using var otlpExporter = new ProtobufOtlpTraceExporter(exporterOptions, new SdkLimitOptions(), new ExperimentalOptions(configuration));
using var otlpExporter = new OtlpTraceExporter(exporterOptions, new SdkLimitOptions(), new ExperimentalOptions(configuration));
var activitySourceName = "otel.http.retry.test";
using var source = new ActivitySource(activitySourceName);
@ -379,7 +379,7 @@ public sealed class MockCollectorIntegrationTests
})
.Build();
using var otlpExporter = new ProtobufOtlpTraceExporter(exporterOptions, new(), new(configuration), transmissionHandler);
using var otlpExporter = new OtlpTraceExporter(exporterOptions, new(), new(configuration), transmissionHandler);
var activitySourceName = "otel.http.persistent.storage.retry.test";
using var source = new ActivitySource(activitySourceName);
@ -519,7 +519,7 @@ public sealed class MockCollectorIntegrationTests
})
.Build();
using var otlpExporter = new ProtobufOtlpTraceExporter(exporterOptions, new(), new(configuration), transmissionHandler);
using var otlpExporter = new OtlpTraceExporter(exporterOptions, new(), new(configuration), transmissionHandler);
var activitySourceName = "otel.grpc.persistent.storage.retry.test";
using var source = new ActivitySource(activitySourceName);

View File

@ -91,8 +91,8 @@ public class OtlpExporterOptionsExtensionsTests
}
[Theory]
[InlineData(OtlpExportProtocol.Grpc, typeof(OtlpGrpcTraceExportClient))]
[InlineData(OtlpExportProtocol.HttpProtobuf, typeof(OtlpHttpTraceExportClient))]
[InlineData(OtlpExportProtocol.Grpc, typeof(ProtobufOtlpGrpcExportClient))]
[InlineData(OtlpExportProtocol.HttpProtobuf, typeof(ProtobufOtlpHttpExportClient))]
public void GetTraceExportClient_SupportedProtocol_ReturnsCorrectExportClient(OtlpExportProtocol protocol, Type expectedExportClientType)
{
var options = new OtlpExporterOptions
@ -100,7 +100,7 @@ public class OtlpExporterOptionsExtensionsTests
Protocol = protocol,
};
var exportClient = options.GetTraceExportClient();
var exportClient = options.GetProtobufExportClient(OtlpSignalType.Traces);
Assert.Equal(expectedExportClientType, exportClient.GetType());
}
@ -113,7 +113,7 @@ public class OtlpExporterOptionsExtensionsTests
Protocol = (OtlpExportProtocol)123,
};
Assert.Throws<NotSupportedException>(() => options.GetTraceExportClient());
Assert.Throws<NotSupportedException>(() => options.GetProtobufExportClient(OtlpSignalType.Traces));
}
[Theory]
@ -131,27 +131,27 @@ public class OtlpExporterOptionsExtensionsTests
}
[Theory]
[InlineData(OtlpExportProtocol.Grpc, typeof(OtlpGrpcTraceExportClient), false, 10000, null)]
[InlineData(OtlpExportProtocol.HttpProtobuf, typeof(OtlpHttpTraceExportClient), false, 10000, null)]
[InlineData(OtlpExportProtocol.HttpProtobuf, typeof(OtlpHttpTraceExportClient), true, 8000, null)]
[InlineData(OtlpExportProtocol.Grpc, typeof(ProtobufOtlpGrpcExportClient), false, 10000, null)]
[InlineData(OtlpExportProtocol.HttpProtobuf, typeof(ProtobufOtlpHttpExportClient), false, 10000, null)]
[InlineData(OtlpExportProtocol.HttpProtobuf, typeof(ProtobufOtlpHttpExportClient), true, 8000, null)]
[InlineData(OtlpExportProtocol.Grpc, typeof(OtlpGrpcMetricsExportClient), false, 10000, null)]
[InlineData(OtlpExportProtocol.HttpProtobuf, typeof(OtlpHttpMetricsExportClient), false, 10000, null)]
[InlineData(OtlpExportProtocol.HttpProtobuf, typeof(OtlpHttpMetricsExportClient), true, 8000, null)]
[InlineData(OtlpExportProtocol.Grpc, typeof(OtlpGrpcLogExportClient), false, 10000, null)]
[InlineData(OtlpExportProtocol.HttpProtobuf, typeof(OtlpHttpLogExportClient), false, 10000, null)]
[InlineData(OtlpExportProtocol.HttpProtobuf, typeof(OtlpHttpLogExportClient), true, 8000, null)]
[InlineData(OtlpExportProtocol.Grpc, typeof(OtlpGrpcTraceExportClient), false, 10000, "in_memory")]
[InlineData(OtlpExportProtocol.HttpProtobuf, typeof(OtlpHttpTraceExportClient), false, 10000, "in_memory")]
[InlineData(OtlpExportProtocol.HttpProtobuf, typeof(OtlpHttpTraceExportClient), true, 8000, "in_memory")]
[InlineData(OtlpExportProtocol.Grpc, typeof(ProtobufOtlpGrpcExportClient), false, 10000, "in_memory")]
[InlineData(OtlpExportProtocol.HttpProtobuf, typeof(ProtobufOtlpHttpExportClient), false, 10000, "in_memory")]
[InlineData(OtlpExportProtocol.HttpProtobuf, typeof(ProtobufOtlpHttpExportClient), true, 8000, "in_memory")]
[InlineData(OtlpExportProtocol.Grpc, typeof(OtlpGrpcMetricsExportClient), false, 10000, "in_memory")]
[InlineData(OtlpExportProtocol.HttpProtobuf, typeof(OtlpHttpMetricsExportClient), false, 10000, "in_memory")]
[InlineData(OtlpExportProtocol.HttpProtobuf, typeof(OtlpHttpMetricsExportClient), true, 8000, "in_memory")]
[InlineData(OtlpExportProtocol.Grpc, typeof(OtlpGrpcLogExportClient), false, 10000, "in_memory")]
[InlineData(OtlpExportProtocol.HttpProtobuf, typeof(OtlpHttpLogExportClient), false, 10000, "in_memory")]
[InlineData(OtlpExportProtocol.HttpProtobuf, typeof(OtlpHttpLogExportClient), true, 8000, "in_memory")]
[InlineData(OtlpExportProtocol.Grpc, typeof(OtlpGrpcTraceExportClient), false, 10000, "disk")]
[InlineData(OtlpExportProtocol.HttpProtobuf, typeof(OtlpHttpTraceExportClient), false, 10000, "disk")]
[InlineData(OtlpExportProtocol.HttpProtobuf, typeof(OtlpHttpTraceExportClient), true, 8000, "disk")]
[InlineData(OtlpExportProtocol.Grpc, typeof(ProtobufOtlpGrpcExportClient), false, 10000, "disk")]
[InlineData(OtlpExportProtocol.HttpProtobuf, typeof(ProtobufOtlpHttpExportClient), false, 10000, "disk")]
[InlineData(OtlpExportProtocol.HttpProtobuf, typeof(ProtobufOtlpHttpExportClient), true, 8000, "disk")]
[InlineData(OtlpExportProtocol.Grpc, typeof(OtlpGrpcMetricsExportClient), false, 10000, "disk")]
[InlineData(OtlpExportProtocol.HttpProtobuf, typeof(OtlpHttpMetricsExportClient), false, 10000, "disk")]
[InlineData(OtlpExportProtocol.HttpProtobuf, typeof(OtlpHttpMetricsExportClient), true, 8000, "disk")]
@ -173,9 +173,9 @@ public class OtlpExporterOptionsExtensionsTests
.AddInMemoryCollection(new Dictionary<string, string?> { [ExperimentalOptions.OtlpRetryEnvVar] = retryStrategy })
.Build();
if (exportClientType == typeof(OtlpGrpcTraceExportClient) || exportClientType == typeof(OtlpHttpTraceExportClient))
if (exportClientType == typeof(ProtobufOtlpGrpcExportClient) || exportClientType == typeof(ProtobufOtlpHttpExportClient))
{
var transmissionHandler = exporterOptions.GetTraceExportTransmissionHandler(new ExperimentalOptions(configuration));
var transmissionHandler = exporterOptions.GetProtobufExportTransmissionHandler(new ExperimentalOptions(configuration), OtlpSignalType.Traces);
AssertTransmissionHandler(transmissionHandler, exportClientType, expectedTimeoutMilliseconds, retryStrategy);
}
@ -212,4 +212,24 @@ public class OtlpExporterOptionsExtensionsTests
Assert.Equal(expectedTimeoutMilliseconds, transmissionHandler.TimeoutMilliseconds);
}
private static void AssertTransmissionHandler(ProtobufOtlpExporterTransmissionHandler transmissionHandler, Type exportClientType, int expectedTimeoutMilliseconds, string? retryStrategy)
{
if (retryStrategy == "in_memory")
{
Assert.True(transmissionHandler is ProtobufOtlpExporterRetryTransmissionHandler);
}
else if (retryStrategy == "disk")
{
Assert.True(transmissionHandler is ProtobufOtlpExporterPersistentStorageTransmissionHandler);
}
else
{
Assert.True(transmissionHandler is ProtobufOtlpExporterTransmissionHandler);
}
Assert.Equal(exportClientType, transmissionHandler.ExportClient.GetType());
Assert.Equal(expectedTimeoutMilliseconds, transmissionHandler.TimeoutMilliseconds);
}
}

View File

@ -128,11 +128,9 @@ public class OtlpTraceExporterTests
}
[Theory]
[InlineData(true, true)]
[InlineData(false, true)]
[InlineData(true, false)]
[InlineData(false, false)]
public void ToOtlpResourceSpansTest(bool includeServiceNameInResource, bool useCustomSerializer)
[InlineData(true)]
[InlineData(false)]
public void ToOtlpResourceSpansTest(bool includeServiceNameInResource)
{
var evenTags = new[] { new KeyValuePair<string, object?>("k0", "v0") };
var oddTags = new[] { new KeyValuePair<string, object?>("k1", "v1") };
@ -175,16 +173,7 @@ public class OtlpTraceExporterTests
void RunTest(SdkLimitOptions sdkOptions, Batch<Activity> batch)
{
var request = new OtlpCollector.ExportTraceServiceRequest();
if (useCustomSerializer)
{
request = CreateTraceExportRequest(sdkOptions, batch, resourceBuilder.Build());
}
else
{
request.AddBatch(sdkOptions, resourceBuilder.Build().ToOtlpResource(), batch);
}
var request = CreateTraceExportRequest(sdkOptions, batch, resourceBuilder.Build());
Assert.Single(request.ResourceSpans);
var otlpResource = request.ResourceSpans.First().Resource;
@ -231,10 +220,8 @@ public class OtlpTraceExporterTests
}
}
[Theory]
[InlineData(true)]
[InlineData(false)]
public void ScopeAttributesRemainConsistentAcrossMultipleBatches(bool useCustomSerializer)
[Fact]
public void ScopeAttributesRemainConsistentAcrossMultipleBatches()
{
var activitySourceTags = new TagList
{
@ -275,16 +262,7 @@ public class OtlpTraceExporterTests
void RunTest(SdkLimitOptions sdkOptions, Batch<Activity> batch, ActivitySource activitySource)
{
var request = new OtlpCollector.ExportTraceServiceRequest();
if (useCustomSerializer)
{
request = CreateTraceExportRequest(sdkOptions, batch, resourceBuilder.Build());
}
else
{
request.AddBatch(sdkOptions, resourceBuilder.Build().ToOtlpResource(), batch);
}
var request = CreateTraceExportRequest(sdkOptions, batch, resourceBuilder.Build());
var resourceSpans = request.ResourceSpans.First();
Assert.NotNull(request.ResourceSpans.First());
@ -305,8 +283,7 @@ public class OtlpTraceExporterTests
}
// Return and re-add batch to simulate reuse
request.Return();
request.AddBatch(DefaultSdkLimitOptions, ResourceBuilder.CreateDefault().Build().ToOtlpResource(), batch);
request = CreateTraceExportRequest(DefaultSdkLimitOptions, batch, ResourceBuilder.CreateDefault().Build());
resourceSpans = request.ResourceSpans.First();
scopeSpans = resourceSpans.ScopeSpans.First();
@ -320,16 +297,11 @@ public class OtlpTraceExporterTests
{
Assert.Contains(scope.Attributes, (kvp) => kvp.Key == tag.Key && kvp.Value.StringValue == (string?)tag.Value);
}
// Return and re-add batch to simulate reuse
request.Return();
}
}
[Theory]
[InlineData(true)]
[InlineData(false)]
public void ScopeAttributesLimitsTest(bool useCustomSerializer)
[Fact]
public void ScopeAttributesLimitsTest()
{
var sdkOptions = new SdkLimitOptions()
{
@ -367,16 +339,7 @@ public class OtlpTraceExporterTests
void RunTest(SdkLimitOptions sdkOptions, Batch<Activity> batch)
{
var request = new OtlpCollector.ExportTraceServiceRequest();
if (useCustomSerializer)
{
request = CreateTraceExportRequest(sdkOptions, batch, resourceBuilder.Build());
}
else
{
request.AddBatch(sdkOptions, resourceBuilder.Build().ToOtlpResource(), batch);
}
var request = CreateTraceExportRequest(sdkOptions, batch, resourceBuilder.Build());
var resourceSpans = request.ResourceSpans.First();
Assert.NotNull(request.ResourceSpans.First());
@ -392,19 +355,11 @@ public class OtlpTraceExporterTests
Assert.Equal("1234", scope.Attributes[0].Value.StringValue);
this.ArrayValueAsserts(scope.Attributes[1].Value.ArrayValue.Values);
Assert.Equal(new object().ToString()!.Substring(0, 4), scope.Attributes[2].Value.StringValue);
// Return and re-add batch to simulate reuse
if (!useCustomSerializer)
{
request.Return();
}
}
}
[Theory]
[InlineData(true)]
[InlineData(false)]
public void SpanLimitsTest(bool useCustomSerializer)
[Fact]
public void SpanLimitsTest()
{
var sdkOptions = new SdkLimitOptions()
{
@ -439,7 +394,7 @@ public class OtlpTraceExporterTests
activity.AddEvent(event1);
activity.AddEvent(event2);
var otlpSpan = useCustomSerializer ? ToOtlpSpan(sdkOptions, activity) : activity.ToOtlpSpan(sdkOptions);
var otlpSpan = ToOtlpSpan(sdkOptions, activity);
Assert.NotNull(otlpSpan);
Assert.Equal(3, otlpSpan.Attributes.Count);
@ -465,10 +420,8 @@ public class OtlpTraceExporterTests
Assert.Equal(new object().ToString()!.Substring(0, 4), otlpSpan.Links[0].Attributes[2].Value.StringValue);
}
[Theory]
[InlineData(true)]
[InlineData(false)]
public void ToOtlpSpanTest(bool useCustomSerializer)
[Fact]
public void ToOtlpSpanTest()
{
using var activitySource = new ActivitySource(nameof(this.ToOtlpSpanTest));
@ -510,7 +463,7 @@ public class OtlpTraceExporterTests
rootActivity.TraceId.CopyTo(traceIdSpan);
var traceId = traceIdSpan.ToArray();
var otlpSpan = useCustomSerializer ? ToOtlpSpan(DefaultSdkLimitOptions, rootActivity) : rootActivity.ToOtlpSpan(DefaultSdkLimitOptions);
var otlpSpan = ToOtlpSpan(DefaultSdkLimitOptions, rootActivity);
Assert.NotNull(otlpSpan);
Assert.Equal("root", otlpSpan.Name);
@ -546,7 +499,7 @@ public class OtlpTraceExporterTests
rootActivity.Context.SpanId.CopyTo(parentIdSpan);
var parentId = parentIdSpan.ToArray();
otlpSpan = useCustomSerializer ? ToOtlpSpan(DefaultSdkLimitOptions, childActivity) : childActivity.ToOtlpSpan(DefaultSdkLimitOptions);
otlpSpan = ToOtlpSpan(DefaultSdkLimitOptions, childActivity);
Assert.NotNull(otlpSpan);
Assert.Equal("child", otlpSpan.Name);
@ -581,10 +534,8 @@ public class OtlpTraceExporterTests
Assert.False(flags.HasFlag(OtlpTrace.SpanFlags.ContextIsRemoteMask));
}
[Theory]
[InlineData(true)]
[InlineData(false)]
public void ToOtlpSpanActivitiesWithNullArrayTest(bool useCustomSerializer)
[Fact]
public void ToOtlpSpanActivitiesWithNullArrayTest()
{
using var activitySource = new ActivitySource(nameof(this.ToOtlpSpanTest));
@ -594,7 +545,7 @@ public class OtlpTraceExporterTests
var stringArr = new string?[] { "test", string.Empty, null };
rootActivity.SetTag("stringArray", stringArr);
var otlpSpan = useCustomSerializer ? ToOtlpSpan(DefaultSdkLimitOptions, rootActivity) : rootActivity.ToOtlpSpan(DefaultSdkLimitOptions);
var otlpSpan = ToOtlpSpan(DefaultSdkLimitOptions, rootActivity);
Assert.NotNull(otlpSpan);
@ -607,20 +558,17 @@ public class OtlpTraceExporterTests
}
[Theory]
[InlineData(ActivityStatusCode.Unset, "Description will be ignored if status is Unset.", true)]
[InlineData(ActivityStatusCode.Ok, "Description will be ignored if status is Okay.", true)]
[InlineData(ActivityStatusCode.Error, "Description will be kept if status is Error.", true)]
[InlineData(ActivityStatusCode.Unset, "Description will be ignored if status is Unset.", false)]
[InlineData(ActivityStatusCode.Ok, "Description will be ignored if status is Okay.", false)]
[InlineData(ActivityStatusCode.Error, "Description will be kept if status is Error.", false)]
public void ToOtlpSpanNativeActivityStatusTest(ActivityStatusCode expectedStatusCode, string statusDescription, bool useCustomSerializer)
[InlineData(ActivityStatusCode.Unset, "Description will be ignored if status is Unset.")]
[InlineData(ActivityStatusCode.Ok, "Description will be ignored if status is Okay.")]
[InlineData(ActivityStatusCode.Error, "Description will be kept if status is Error.")]
public void ToOtlpSpanNativeActivityStatusTest(ActivityStatusCode expectedStatusCode, string statusDescription)
{
using var activitySource = new ActivitySource(nameof(this.ToOtlpSpanTest));
using var activity = activitySource.StartActivity("Name");
Assert.NotNull(activity);
activity.SetStatus(expectedStatusCode, statusDescription);
var otlpSpan = useCustomSerializer ? ToOtlpSpan(DefaultSdkLimitOptions, activity) : activity.ToOtlpSpan(DefaultSdkLimitOptions);
var otlpSpan = ToOtlpSpan(DefaultSdkLimitOptions, activity);
Assert.NotNull(otlpSpan);
if (expectedStatusCode == ActivityStatusCode.Unset)
{
@ -655,7 +603,7 @@ public class OtlpTraceExporterTests
activity.SetTag(SpanAttributeConstants.StatusCodeKey, statusCodeTagValue);
activity.SetTag(SpanAttributeConstants.StatusDescriptionKey, statusDescription);
var otlpSpan = activity.ToOtlpSpan(DefaultSdkLimitOptions);
var otlpSpan = ToOtlpSpan(DefaultSdkLimitOptions, activity);
Assert.NotNull(otlpSpan);
Assert.NotNull(otlpSpan.Status);
@ -683,7 +631,7 @@ public class OtlpTraceExporterTests
Assert.NotNull(activity);
activity.SetTag(SpanAttributeConstants.StatusCodeKey, statusCodeTagValue);
var otlpSpan = activity.ToOtlpSpan(DefaultSdkLimitOptions);
var otlpSpan = ToOtlpSpan(DefaultSdkLimitOptions, activity);
Assert.NotNull(otlpSpan);
Assert.NotNull(otlpSpan.Status);
@ -702,7 +650,7 @@ public class OtlpTraceExporterTests
activity.SetTag(SpanAttributeConstants.StatusCodeKey, "ERROR");
activity.SetTag(SpanAttributeConstants.StatusDescriptionKey, tagDescriptionOnError);
var otlpSpan = activity.ToOtlpSpan(DefaultSdkLimitOptions);
var otlpSpan = ToOtlpSpan(DefaultSdkLimitOptions, activity);
Assert.NotNull(otlpSpan);
Assert.NotNull(otlpSpan.Status);
@ -721,7 +669,7 @@ public class OtlpTraceExporterTests
activity.SetStatus(ActivityStatusCode.Error, statusDescriptionOnError);
activity.SetTag(SpanAttributeConstants.StatusCodeKey, "OK");
var otlpSpan = activity.ToOtlpSpan(DefaultSdkLimitOptions);
var otlpSpan = ToOtlpSpan(DefaultSdkLimitOptions, activity);
Assert.NotNull(otlpSpan);
Assert.NotNull(otlpSpan.Status);
@ -730,11 +678,9 @@ public class OtlpTraceExporterTests
}
[Theory]
[InlineData(true, true)]
[InlineData(false, true)]
[InlineData(true, false)]
[InlineData(false, false)]
public void ToOtlpSpanTraceStateTest(bool traceStateWasSet, bool useCustomSerializer)
[InlineData(true)]
[InlineData(false)]
public void ToOtlpSpanTraceStateTest(bool traceStateWasSet)
{
using var activitySource = new ActivitySource(nameof(this.ToOtlpSpanTest));
using var activity = activitySource.StartActivity("Name");
@ -745,7 +691,7 @@ public class OtlpTraceExporterTests
activity.TraceStateString = tracestate;
}
var otlpSpan = useCustomSerializer ? ToOtlpSpan(DefaultSdkLimitOptions, activity) : activity.ToOtlpSpan(DefaultSdkLimitOptions);
var otlpSpan = ToOtlpSpan(DefaultSdkLimitOptions, activity);
Assert.NotNull(otlpSpan);
if (traceStateWasSet)
@ -759,26 +705,6 @@ public class OtlpTraceExporterTests
}
}
[Fact]
public void ToOtlpSpanPeerServiceTest()
{
using var activitySource = new ActivitySource(nameof(this.ToOtlpSpanTest));
using var rootActivity = activitySource.StartActivity("root", ActivityKind.Client);
Assert.NotNull(rootActivity);
rootActivity.SetTag(SemanticConventions.AttributeHttpHost, "opentelemetry.io");
var otlpSpan = rootActivity.ToOtlpSpan(DefaultSdkLimitOptions);
Assert.NotNull(otlpSpan);
var peerService = otlpSpan.Attributes.FirstOrDefault(kvp => kvp.Key == SemanticConventions.AttributePeerService);
Assert.NotNull(peerService);
Assert.Equal("opentelemetry.io", peerService.Value.StringValue);
}
[Fact]
public void UseOpenTelemetryProtocolActivityExporterWithCustomActivityProcessor()
{
@ -817,10 +743,10 @@ public class OtlpTraceExporterTests
[Fact]
public void Shutdown_ClientShutdownIsCalled()
{
var exportClientMock = new TestExportClient<OtlpCollector.ExportTraceServiceRequest>();
var exportClientMock = new TestProtobufExportClient();
var exporterOptions = new OtlpExporterOptions();
var transmissionHandler = new OtlpExporterTransmissionHandler<OtlpCollector.ExportTraceServiceRequest>(exportClientMock, exporterOptions.TimeoutMilliseconds);
var transmissionHandler = new ProtobufOtlpExporterTransmissionHandler(exportClientMock, exporterOptions.TimeoutMilliseconds);
using var exporter = new OtlpTraceExporter(new OtlpExporterOptions(), DefaultSdkLimitOptions, DefaultExperimentalOptions, transmissionHandler);
exporter.Shutdown();
@ -934,15 +860,11 @@ public class OtlpTraceExporterTests
}
[Theory]
[InlineData(true, true, true)]
[InlineData(true, false, true)]
[InlineData(false, true, true)]
[InlineData(false, false, true)]
[InlineData(true, true, false)]
[InlineData(true, false, false)]
[InlineData(false, true, false)]
[InlineData(false, false, false)]
public void SpanFlagsTest(bool isRecorded, bool isRemote, bool useCustomSerializer)
[InlineData(true, true)]
[InlineData(true, false)]
[InlineData(false, true)]
[InlineData(false, false)]
public void SpanFlagsTest(bool isRecorded, bool isRemote)
{
using var activitySource = new ActivitySource(nameof(this.SpanFlagsTest));
@ -955,7 +877,7 @@ public class OtlpTraceExporterTests
using var rootActivity = activitySource.StartActivity("root", ActivityKind.Server, ctx);
Assert.NotNull(rootActivity);
var otlpSpan = useCustomSerializer ? ToOtlpSpan(DefaultSdkLimitOptions, rootActivity) : rootActivity.ToOtlpSpan(DefaultSdkLimitOptions);
var otlpSpan = ToOtlpSpan(DefaultSdkLimitOptions, rootActivity);
Assert.NotNull(otlpSpan);
var flags = (OtlpTrace.SpanFlags)otlpSpan.Flags;
@ -984,15 +906,11 @@ public class OtlpTraceExporterTests
}
[Theory]
[InlineData(true, true, true)]
[InlineData(true, false, true)]
[InlineData(false, true, true)]
[InlineData(false, false, true)]
[InlineData(true, true, false)]
[InlineData(true, false, false)]
[InlineData(false, true, false)]
[InlineData(false, false, false)]
public void SpanLinkFlagsTest(bool isRecorded, bool isRemote, bool useCustomSerializer)
[InlineData(true, true)]
[InlineData(true, false)]
[InlineData(false, true)]
[InlineData(false, false)]
public void SpanLinkFlagsTest(bool isRecorded, bool isRemote)
{
using var activitySource = new ActivitySource(nameof(this.SpanLinkFlagsTest));
@ -1010,7 +928,7 @@ public class OtlpTraceExporterTests
using var rootActivity = activitySource.StartActivity("root", ActivityKind.Server, default(ActivityContext), links: links);
Assert.NotNull(rootActivity);
var otlpSpan = useCustomSerializer ? ToOtlpSpan(DefaultSdkLimitOptions, rootActivity) : rootActivity.ToOtlpSpan(DefaultSdkLimitOptions);
var otlpSpan = ToOtlpSpan(DefaultSdkLimitOptions, rootActivity);
Assert.NotNull(otlpSpan);
var spanLink = Assert.Single(otlpSpan.Links);

View File

@ -0,0 +1,40 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;
namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests;
internal class TestProtobufExportClient(bool throwException = false) : IProtobufExportClient
{
public bool SendExportRequestCalled { get; private set; }
public bool ShutdownCalled { get; private set; }
public bool ThrowException { get; set; } = throwException;
public ExportClientResponse SendExportRequest(byte[] buffer, int contentLength, DateTime deadlineUtc, CancellationToken cancellationToken = default)
{
if (this.ThrowException)
{
throw new Exception("Exception thrown from SendExportRequest");
}
this.SendExportRequestCalled = true;
return new TestExportClientResponse(true, deadlineUtc, null);
}
public bool Shutdown(int timeoutMilliseconds)
{
this.ShutdownCalled = true;
return true;
}
private class TestExportClientResponse : ExportClientResponse
{
public TestExportClientResponse(bool success, DateTime deadline, Exception? exception)
: base(success, deadline, exception)
{
}
}
}