ZipkinExporter performance improvements. (#596)

* ZipkinExporter performance improvements.

* Bug fixes.

* Added a Zipkin integration test.

* Bug fixes.

* Fixed HttpClientCollectorOptions not filtering HttpWebRequest events.

* Removed LangVersion from project file, it wasn't needed.

* Fixed unit tests failing when IPv6 isn't available on host.

Co-authored-by: Sergey Kanzhelev <S.Kanzhelev@live.com>
This commit is contained in:
Mikel Blanchard 2020-04-14 14:09:18 -07:00 committed by GitHub
parent a1ce4ebec9
commit b35311e75d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 535 additions and 334 deletions

View File

@ -14,7 +14,9 @@
// limitations under the License.
// </copyright>
using System;
using System.Net;
using System.Net.Http;
using OpenTelemetry.Collector.Dependencies.Implementation;
using OpenTelemetry.Context.Propagation;
namespace OpenTelemetry.Collector.Dependencies
@ -63,12 +65,9 @@ namespace OpenTelemetry.Collector.Dependencies
// TODO: there is some preliminary consensus that we should introduce 'terminal' spans or context.
// exporters should ensure they set it
if (activityName == "System.Net.Http.HttpRequestOut" &&
arg1 is HttpRequestMessage request &&
request.RequestUri != null &&
request.Method == HttpMethod.Post)
if (IsHttpOutgoingPostRequest(activityName, arg1, out Uri requestUri))
{
var originalString = request.RequestUri.OriginalString;
var originalString = requestUri.OriginalString;
// zipkin
if (originalString.Contains(":9411/api/v2/spans"))
@ -89,5 +88,34 @@ namespace OpenTelemetry.Collector.Dependencies
return true;
}
private static bool IsHttpOutgoingPostRequest(string activityName, object arg1, out Uri requestUri)
{
if (activityName == "System.Net.Http.HttpRequestOut")
{
if (arg1 is HttpRequestMessage request &&
request.RequestUri != null &&
request.Method == HttpMethod.Post)
{
requestUri = request.RequestUri;
return true;
}
}
#if NET461
else if (activityName == HttpWebRequestDiagnosticSource.ActivityName)
{
if (arg1 is HttpWebRequest request &&
request.RequestUri != null &&
request.Method == "POST")
{
requestUri = request.RequestUri;
return true;
}
}
#endif
requestUri = null;
return false;
}
}
}

View File

@ -16,6 +16,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using OpenTelemetry.Internal;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
using OpenTelemetry.Trace.Export;

View File

@ -13,10 +13,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using OpenTelemetry.Internal;
using Thrift.Protocol;
using Thrift.Protocol.Entities;

View File

@ -16,6 +16,7 @@
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using OpenTelemetry.Internal;
using Thrift.Protocol;
using Thrift.Protocol.Entities;

View File

@ -13,9 +13,13 @@
<NoWarn>$(NoWarn),1591</NoWarn>
</PropertyGroup>
<ItemGroup>
<Compile Include="..\OpenTelemetry\Internal\EnumerationHelper.cs" Link="Implementation\EnumerationHelper.cs" />
<Compile Include="..\OpenTelemetry\Internal\PooledList.cs" Link="Implementation\PooledList.cs" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="ApacheThrift" Version="0.13.0.1" IncludeAssets="all" ExcludeAssets="compile;runtime" GeneratePathProperty="true" />
<PackageReference Include="System.Reflection.Emit.Lightweight" Version="4.7.0" />
<PackageReference Include="System.Threading.Tasks.Extensions" Version="4.5.3" Condition="'$(TargetFramework)' != 'netstandard2.1'" />
</ItemGroup>

View File

@ -15,10 +15,18 @@
// </copyright>
namespace OpenTelemetry.Exporter.Zipkin.Implementation
{
internal class ZipkinAnnotation
internal readonly struct ZipkinAnnotation
{
public long Timestamp { get; set; }
public ZipkinAnnotation(
long timestamp,
string value)
{
this.Timestamp = timestamp;
this.Value = value;
}
public string Value { get; set; }
public long Timestamp { get; }
public string Value { get; }
}
}

View File

@ -17,6 +17,7 @@ using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using OpenTelemetry.Internal;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
using OpenTelemetry.Trace.Export;
@ -40,6 +41,11 @@ namespace OpenTelemetry.Exporter.Zipkin.Implementation
private static readonly ConcurrentDictionary<string, ZipkinEndpoint> LocalEndpointCache = new ConcurrentDictionary<string, ZipkinEndpoint>();
private static readonly ConcurrentDictionary<string, ZipkinEndpoint> RemoteEndpointCache = new ConcurrentDictionary<string, ZipkinEndpoint>();
private static readonly ConcurrentDictionary<CanonicalCode, string> CanonicalCodeCache = new ConcurrentDictionary<CanonicalCode, string>();
private static readonly DictionaryEnumerator<string, object, AttributeEnumerationState>.ForEachDelegate ProcessAttributesRef = ProcessAttributes;
private static readonly DictionaryEnumerator<string, object, AttributeEnumerationState>.ForEachDelegate ProcessLibraryResourcesRef = ProcessLibraryResources;
private static readonly ListEnumerator<Event, PooledList<ZipkinAnnotation>>.ForEachDelegate ProcessEventsRef = ProcessEvents;
internal static ZipkinSpan ToZipkinSpan(this SpanData otelSpan, ZipkinEndpoint defaultLocalEndpoint, bool useShortTraceIds = false)
{
@ -47,109 +53,88 @@ namespace OpenTelemetry.Exporter.Zipkin.Implementation
var startTimestamp = ToEpochMicroseconds(otelSpan.StartTimestamp);
var endTimestamp = ToEpochMicroseconds(otelSpan.EndTimestamp);
var spanBuilder =
ZipkinSpan.NewBuilder()
.TraceId(EncodeTraceId(context.TraceId, useShortTraceIds))
.Id(EncodeSpanId(context.SpanId))
.Kind(ToSpanKind(otelSpan))
.Name(otelSpan.Name)
.Timestamp(ToEpochMicroseconds(otelSpan.StartTimestamp))
.Duration(endTimestamp - startTimestamp);
string parentId = null;
if (otelSpan.ParentSpanId != default)
{
spanBuilder.ParentId(EncodeSpanId(otelSpan.ParentSpanId));
parentId = EncodeSpanId(otelSpan.ParentSpanId);
}
Tuple<string, int> remoteEndpointServiceName = null;
foreach (var label in otelSpan.Attributes)
var attributeEnumerationState = new AttributeEnumerationState
{
string key = label.Key;
string strVal = label.Value.ToString();
Tags = PooledList<KeyValuePair<string, string>>.Create(),
};
if (strVal != null
&& RemoteEndpointServiceNameKeyResolutionDictionary.TryGetValue(key, out int priority)
&& (remoteEndpointServiceName == null || priority < remoteEndpointServiceName.Item2))
{
remoteEndpointServiceName = new Tuple<string, int>(strVal, priority);
}
DictionaryEnumerator<string, object, AttributeEnumerationState>.AllocationFreeForEach(otelSpan.Attributes, ref attributeEnumerationState, ProcessAttributesRef);
DictionaryEnumerator<string, object, AttributeEnumerationState>.AllocationFreeForEach(otelSpan.LibraryResource.Attributes, ref attributeEnumerationState, ProcessLibraryResourcesRef);
spanBuilder.PutTag(key, strVal);
}
var localEndpoint = defaultLocalEndpoint;
// See https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/data-resource-semantic-conventions.md
string serviceName = string.Empty;
string serviceNamespace = string.Empty;
foreach (var label in otelSpan.LibraryResource.Attributes)
{
string key = label.Key;
object val = label.Value;
string strVal = val as string;
if (key == Resource.ServiceNameKey && strVal != null)
{
serviceName = strVal;
}
else if (key == Resource.ServiceNamespaceKey && strVal != null)
{
serviceNamespace = strVal;
}
else
{
spanBuilder.PutTag(key, strVal ?? val?.ToString());
}
}
if (serviceNamespace != string.Empty)
{
serviceName = serviceNamespace + "." + serviceName;
}
var endpoint = defaultLocalEndpoint;
var serviceName = attributeEnumerationState.ServiceName;
// override default service name
if (serviceName != string.Empty)
if (!string.IsNullOrWhiteSpace(serviceName))
{
endpoint = LocalEndpointCache.GetOrAdd(serviceName, _ => new ZipkinEndpoint()
if (!string.IsNullOrWhiteSpace(attributeEnumerationState.ServiceNamespace))
{
Ipv4 = defaultLocalEndpoint.Ipv4,
Ipv6 = defaultLocalEndpoint.Ipv6,
Port = defaultLocalEndpoint.Port,
ServiceName = serviceName,
});
serviceName = attributeEnumerationState.ServiceNamespace + "." + serviceName;
}
if (!LocalEndpointCache.TryGetValue(serviceName, out localEndpoint))
{
localEndpoint = defaultLocalEndpoint.Clone(serviceName);
LocalEndpointCache.TryAdd(serviceName, localEndpoint);
}
}
spanBuilder.LocalEndpoint(endpoint);
if ((otelSpan.Kind == SpanKind.Client || otelSpan.Kind == SpanKind.Producer) && remoteEndpointServiceName != null)
ZipkinEndpoint remoteEndpoint = null;
if ((otelSpan.Kind == SpanKind.Client || otelSpan.Kind == SpanKind.Producer) && attributeEnumerationState.RemoteEndpointServiceName != null)
{
spanBuilder.RemoteEndpoint(RemoteEndpointCache.GetOrAdd(remoteEndpointServiceName.Item1, _ => new ZipkinEndpoint
{
ServiceName = remoteEndpointServiceName.Item1,
}));
remoteEndpoint = RemoteEndpointCache.GetOrAdd(attributeEnumerationState.RemoteEndpointServiceName, ZipkinEndpoint.Create);
}
var status = otelSpan.Status;
if (status.IsValid)
{
spanBuilder.PutTag(StatusCode, status.CanonicalCode.ToString());
if (!CanonicalCodeCache.TryGetValue(status.CanonicalCode, out string canonicalCode))
{
canonicalCode = status.CanonicalCode.ToString();
CanonicalCodeCache.TryAdd(status.CanonicalCode, canonicalCode);
}
PooledList<KeyValuePair<string, string>>.Add(ref attributeEnumerationState.Tags, new KeyValuePair<string, string>(StatusCode, canonicalCode));
if (status.Description != null)
{
spanBuilder.PutTag(StatusDescription, status.Description);
PooledList<KeyValuePair<string, string>>.Add(ref attributeEnumerationState.Tags, new KeyValuePair<string, string>(StatusDescription, status.Description));
}
}
foreach (var annotation in otelSpan.Events)
{
spanBuilder.AddAnnotation(ToEpochMicroseconds(annotation.Timestamp), annotation.Name);
}
var annotations = PooledList<ZipkinAnnotation>.Create();
ListEnumerator<Event, PooledList<ZipkinAnnotation>>.AllocationFreeForEach(otelSpan.Events, ref annotations, ProcessEventsRef);
return spanBuilder.Build();
return new ZipkinSpan(
EncodeTraceId(context.TraceId, useShortTraceIds),
parentId,
EncodeSpanId(context.SpanId),
ToSpanKind(otelSpan),
otelSpan.Name,
ToEpochMicroseconds(otelSpan.StartTimestamp),
duration: endTimestamp - startTimestamp,
localEndpoint,
remoteEndpoint,
annotations,
attributeEnumerationState.Tags,
null,
null);
}
private static long ToEpochMicroseconds(DateTimeOffset timestamp)
internal static string EncodeSpanId(ActivitySpanId spanId)
{
return spanId.ToHexString();
}
internal static long ToEpochMicroseconds(DateTimeOffset timestamp)
{
return timestamp.ToUnixTimeMilliseconds() * 1000;
}
@ -166,23 +151,83 @@ namespace OpenTelemetry.Exporter.Zipkin.Implementation
return id;
}
private static string EncodeSpanId(ActivitySpanId spanId)
private static string ToSpanKind(SpanData otelSpan)
{
return spanId.ToHexString();
switch (otelSpan.Kind)
{
case SpanKind.Server:
return "SERVER";
case SpanKind.Producer:
return "PRODUCER";
case SpanKind.Consumer:
return "CONSUMER";
default:
return "CLIENT";
}
}
private static ZipkinSpanKind ToSpanKind(SpanData otelSpan)
private static bool ProcessEvents(ref PooledList<ZipkinAnnotation> annotations, Event @event)
{
if (otelSpan.Kind == SpanKind.Server)
PooledList<ZipkinAnnotation>.Add(ref annotations, new ZipkinAnnotation(ToEpochMicroseconds(@event.Timestamp), @event.Name));
return true;
}
private static bool ProcessAttributes(ref AttributeEnumerationState state, KeyValuePair<string, object> attribute)
{
string key = attribute.Key;
if (!(attribute.Value is string strVal))
{
return ZipkinSpanKind.SERVER;
}
else if (otelSpan.Kind == SpanKind.Client)
{
return ZipkinSpanKind.CLIENT;
strVal = attribute.Value?.ToString();
}
return ZipkinSpanKind.CLIENT;
if (strVal != null
&& RemoteEndpointServiceNameKeyResolutionDictionary.TryGetValue(key, out int priority)
&& (state.RemoteEndpointServiceName == null || priority < state.RemoteEndpointServiceNamePriority))
{
state.RemoteEndpointServiceName = strVal;
state.RemoteEndpointServiceNamePriority = priority;
}
PooledList<KeyValuePair<string, string>>.Add(ref state.Tags, new KeyValuePair<string, string>(key, strVal));
return true;
}
private static bool ProcessLibraryResources(ref AttributeEnumerationState state, KeyValuePair<string, object> label)
{
// See https://github.com/open-telemetry/opentelemetry-specification/blob/master/specification/data-resource-semantic-conventions.md
string key = label.Key;
object val = label.Value;
string strVal = val as string;
if (key == Resource.ServiceNameKey && strVal != null)
{
state.ServiceName = strVal;
}
else if (key == Resource.ServiceNamespaceKey && strVal != null)
{
state.ServiceNamespace = strVal;
}
else
{
PooledList<KeyValuePair<string, string>>.Add(ref state.Tags, new KeyValuePair<string, string>(key, strVal ?? val?.ToString()));
}
return true;
}
private struct AttributeEnumerationState
{
public PooledList<KeyValuePair<string, string>> Tags;
public string RemoteEndpointServiceName;
public int RemoteEndpointServiceNamePriority;
public string ServiceName;
public string ServiceNamespace;
}
}
}

View File

@ -13,16 +13,76 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
using System.Text.Json;
namespace OpenTelemetry.Exporter.Zipkin.Implementation
{
internal class ZipkinEndpoint
{
public string ServiceName { get; set; }
public ZipkinEndpoint(string serviceName)
: this(serviceName, null, null, null)
{
}
public string Ipv4 { get; set; }
public ZipkinEndpoint(
string serviceName,
string ipv4,
string ipv6,
int? port)
{
this.ServiceName = serviceName;
this.Ipv4 = ipv4;
this.Ipv6 = ipv6;
this.Port = port;
}
public string Ipv6 { get; set; }
public string ServiceName { get; }
public int Port { get; set; }
public string Ipv4 { get; }
public string Ipv6 { get; }
public int? Port { get; }
public static ZipkinEndpoint Create(string serviceName)
{
return new ZipkinEndpoint(serviceName);
}
public ZipkinEndpoint Clone(string serviceName)
{
return new ZipkinEndpoint(
serviceName,
this.Ipv4,
this.Ipv6,
this.Port);
}
public void Write(Utf8JsonWriter writer)
{
writer.WriteStartObject();
if (this.ServiceName != null)
{
writer.WriteString("serviceName", this.ServiceName);
}
if (this.Ipv4 != null)
{
writer.WriteString("ipv4", this.Ipv4);
}
if (this.Ipv6 != null)
{
writer.WriteString("ipv6", this.Ipv6);
}
if (this.Port.HasValue)
{
writer.WriteNumber("port", this.Port.Value);
}
writer.WriteEndObject();
}
}
}

View File

@ -15,157 +15,170 @@
// </copyright>
using System;
using System.Collections.Generic;
using System.Text.Json.Serialization;
using System.Text.Json;
using OpenTelemetry.Internal;
namespace OpenTelemetry.Exporter.Zipkin.Implementation
{
internal class ZipkinSpan
internal readonly struct ZipkinSpan
{
public string TraceId { get; set; }
public string ParentId { get; set; }
public string Id { get; set; }
[JsonConverter(typeof(JsonStringEnumConverter))]
public ZipkinSpanKind Kind { get; set; }
public string Name { get; set; }
public long Timestamp { get; set; }
public long Duration { get; set; }
public ZipkinEndpoint LocalEndpoint { get; set; }
public ZipkinEndpoint RemoteEndpoint { get; set; }
public IList<ZipkinAnnotation> Annotations { get; set; }
public Dictionary<string, string> Tags { get; set; }
public bool Debug { get; set; }
public bool Shared { get; set; }
public static Builder NewBuilder()
public ZipkinSpan(
string traceId,
string parentId,
string id,
string kind,
string name,
long? timestamp,
long? duration,
ZipkinEndpoint localEndpoint,
ZipkinEndpoint remoteEndpoint,
in PooledList<ZipkinAnnotation>? annotations,
in PooledList<KeyValuePair<string, string>>? tags,
bool? debug,
bool? shared)
{
return new Builder();
if (string.IsNullOrWhiteSpace(traceId))
{
throw new ArgumentNullException(nameof(traceId));
}
if (string.IsNullOrWhiteSpace(id))
{
throw new ArgumentNullException(nameof(id));
}
this.TraceId = traceId;
this.ParentId = parentId;
this.Id = id;
this.Kind = kind;
this.Name = name;
this.Timestamp = timestamp;
this.Duration = duration;
this.LocalEndpoint = localEndpoint;
this.RemoteEndpoint = remoteEndpoint;
this.Annotations = annotations;
this.Tags = tags;
this.Debug = debug;
this.Shared = shared;
}
public class Builder
public string TraceId { get; }
public string ParentId { get; }
public string Id { get; }
public string Kind { get; }
public string Name { get; }
public long? Timestamp { get; }
public long? Duration { get; }
public ZipkinEndpoint LocalEndpoint { get; }
public ZipkinEndpoint RemoteEndpoint { get; }
public PooledList<ZipkinAnnotation>? Annotations { get; }
public PooledList<KeyValuePair<string, string>>? Tags { get; }
public bool? Debug { get; }
public bool? Shared { get; }
public void Return()
{
private readonly ZipkinSpan result = new ZipkinSpan();
this.Annotations?.Return();
this.Tags?.Return();
}
internal Builder TraceId(string val)
public void Write(Utf8JsonWriter writer)
{
writer.WriteStartObject();
writer.WriteString("traceId", this.TraceId);
if (this.Name != null)
{
this.result.TraceId = val;
return this;
writer.WriteString("name", this.Name);
}
internal Builder Id(string val)
if (this.ParentId != null)
{
this.result.Id = val;
return this;
writer.WriteString("parentId", this.ParentId);
}
internal Builder ParentId(string val)
writer.WriteString("id", this.Id);
writer.WriteString("kind", this.Kind);
if (this.Timestamp.HasValue)
{
this.result.ParentId = val;
return this;
writer.WriteNumber("timestamp", this.Timestamp.Value);
}
internal Builder Kind(ZipkinSpanKind val)
if (this.Duration.HasValue)
{
this.result.Kind = val;
return this;
writer.WriteNumber("duration", this.Duration.Value);
}
internal Builder Name(string val)
if (this.Debug.HasValue)
{
this.result.Name = val;
return this;
writer.WriteBoolean("debug", this.Debug.Value);
}
internal Builder Timestamp(long val)
if (this.Shared.HasValue)
{
this.result.Timestamp = val;
return this;
writer.WriteBoolean("shared", this.Shared.Value);
}
internal Builder Duration(long val)
if (this.LocalEndpoint != null)
{
this.result.Duration = val;
return this;
writer.WritePropertyName("localEndpoint");
this.LocalEndpoint.Write(writer);
}
internal Builder LocalEndpoint(ZipkinEndpoint val)
if (this.RemoteEndpoint != null)
{
this.result.LocalEndpoint = val;
return this;
writer.WritePropertyName("remoteEndpoint");
this.RemoteEndpoint.Write(writer);
}
internal Builder RemoteEndpoint(ZipkinEndpoint val)
if (this.Annotations.HasValue)
{
this.result.RemoteEndpoint = val;
return this;
}
writer.WritePropertyName("annotations");
writer.WriteStartArray();
internal Builder Debug(bool val)
{
this.result.Debug = val;
return this;
}
internal Builder Shared(bool val)
{
this.result.Shared = val;
return this;
}
internal Builder PutTag(string key, string value)
{
if (this.result.Tags == null)
foreach (var annotation in this.Annotations.Value)
{
this.result.Tags = new Dictionary<string, string>();
writer.WriteStartObject();
writer.WriteNumber("timestamp", annotation.Timestamp);
writer.WriteString("value", annotation.Value);
writer.WriteEndObject();
}
if (key == null)
{
throw new ArgumentNullException(nameof(key));
}
this.result.Tags[key] = value ?? throw new ArgumentNullException(nameof(value));
return this;
writer.WriteEndArray();
}
internal Builder AddAnnotation(long timestamp, string value)
if (this.Tags.HasValue)
{
if (this.result.Annotations == null)
writer.WritePropertyName("tags");
writer.WriteStartObject();
foreach (var tag in this.Tags.Value)
{
this.result.Annotations = new List<ZipkinAnnotation>(2);
writer.WriteString(tag.Key, tag.Value);
}
this.result.Annotations.Add(new ZipkinAnnotation() { Timestamp = timestamp, Value = value });
return this;
writer.WriteEndObject();
}
internal ZipkinSpan Build()
{
if (this.result.TraceId == null)
{
throw new ArgumentException("Trace ID should not be null");
}
if (this.result.Id == null)
{
throw new ArgumentException("ID should not be null");
}
return this.result;
}
writer.WriteEndObject();
}
}
}

View File

@ -1,41 +0,0 @@
// <copyright file="ZipkinSpanKind.cs" company="OpenTelemetry Authors">
// Copyright 2018, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
namespace OpenTelemetry.Exporter.Zipkin.Implementation
{
internal enum ZipkinSpanKind
{
/// <summary>
/// Client span.
/// </summary>
CLIENT,
/// <summary>
/// Server span.
/// </summary>
SERVER,
/// <summary>
/// Producer.
/// </summary>
PRODUCER,
/// <summary>
/// Consumer.
/// </summary>
CONSUMER,
}
}

View File

@ -4,6 +4,10 @@
<Description>Zipkin exporter for OpenTelemetry</Description>
<PackageTags>$(PackageTags);Zipkin;distributed-tracing</PackageTags>
</PropertyGroup>
<ItemGroup>
<Compile Include="..\OpenTelemetry\Internal\EnumerationHelper.cs" Link="Implementation\EnumerationHelper.cs" />
<Compile Include="..\OpenTelemetry\Internal\PooledList.cs" Link="Implementation\PooledList.cs" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\OpenTelemetry\OpenTelemetry.csproj" />

View File

@ -33,19 +33,8 @@ namespace OpenTelemetry.Exporter.Zipkin
/// </summary>
public class ZipkinTraceExporter : SpanExporter
{
private const long MillisPerSecond = 1000L;
private const long NanosPerMillisecond = 1000 * 1000;
private const long NanosPerSecond = NanosPerMillisecond * MillisPerSecond;
private static readonly JsonSerializerOptions Options = new JsonSerializerOptions
{
PropertyNamingPolicy = JsonNamingPolicy.CamelCase,
};
private readonly ZipkinTraceExporterOptions options;
private readonly ZipkinEndpoint localEndpoint;
private readonly HttpClient httpClient;
private readonly string serviceEndpoint;
/// <summary>
/// Initializes a new instance of the <see cref="ZipkinTraceExporter"/> class.
@ -55,48 +44,18 @@ namespace OpenTelemetry.Exporter.Zipkin
public ZipkinTraceExporter(ZipkinTraceExporterOptions options, HttpClient client = null)
{
this.options = options;
this.localEndpoint = this.GetLocalZipkinEndpoint();
this.LocalEndpoint = this.GetLocalZipkinEndpoint();
this.httpClient = client ?? new HttpClient();
this.serviceEndpoint = options.Endpoint?.ToString();
}
internal ZipkinEndpoint LocalEndpoint { get; }
/// <inheritdoc/>
public override async Task<ExportResult> ExportAsync(IEnumerable<SpanData> otelSpanList, CancellationToken cancellationToken)
public override async Task<ExportResult> ExportAsync(IEnumerable<SpanData> batch, CancellationToken cancellationToken)
{
var zipkinSpans = new List<ZipkinSpan>();
foreach (var data in otelSpanList)
{
bool shouldExport = true;
foreach (var label in data.Attributes)
{
if (label.Key == "http.url")
{
if (label.Value is string urlStr && urlStr == this.serviceEndpoint)
{
// do not track calls to Zipkin
shouldExport = false;
}
break;
}
}
if (shouldExport)
{
var zipkinSpan = data.ToZipkinSpan(this.localEndpoint, this.options.UseShortTraceIds);
zipkinSpans.Add(zipkinSpan);
}
}
if (zipkinSpans.Count == 0)
{
return ExportResult.Success;
}
try
{
await this.SendSpansAsync(zipkinSpans, cancellationToken);
await this.SendSpansAsync(batch).ConfigureAwait(false);
return ExportResult.Success;
}
catch (Exception)
@ -112,51 +71,37 @@ namespace OpenTelemetry.Exporter.Zipkin
return Task.CompletedTask;
}
private Task SendSpansAsync(IEnumerable<ZipkinSpan> spans, CancellationToken cancellationToken)
private Task SendSpansAsync(IEnumerable<SpanData> spans)
{
var requestUri = this.options.Endpoint;
var request = this.GetHttpRequestMessage(HttpMethod.Post, requestUri);
request.Content = this.GetRequestContent(spans);
var request = new HttpRequestMessage(HttpMethod.Post, requestUri)
{
Content = new JsonContent(this, spans),
};
// avoid cancelling here: this is no return point: if we reached this point
// and cancellation is requested, it's better if we try to finish sending spans rather than drop it
return this.DoPostAsync(this.httpClient, request);
}
private async Task DoPostAsync(HttpClient client, HttpRequestMessage request)
{
await client.SendAsync(request).ConfigureAwait(false);
}
private HttpRequestMessage GetHttpRequestMessage(HttpMethod method, Uri requestUri)
{
var request = new HttpRequestMessage(method, requestUri);
return request;
}
private HttpContent GetRequestContent(IEnumerable<ZipkinSpan> toSerialize)
{
return new JsonContent(toSerialize, Options);
return this.httpClient.SendAsync(request);
}
private ZipkinEndpoint GetLocalZipkinEndpoint()
{
var result = new ZipkinEndpoint()
{
ServiceName = this.options.ServiceName,
};
var hostName = this.ResolveHostName();
string ipv4 = null;
string ipv6 = null;
if (!string.IsNullOrEmpty(hostName))
{
result.Ipv4 = this.ResolveHostAddress(hostName, AddressFamily.InterNetwork);
result.Ipv6 = this.ResolveHostAddress(hostName, AddressFamily.InterNetworkV6);
ipv4 = this.ResolveHostAddress(hostName, AddressFamily.InterNetwork);
ipv6 = this.ResolveHostAddress(hostName, AddressFamily.InterNetworkV6);
}
return result;
return new ZipkinEndpoint(
this.options.ServiceName,
ipv4,
ipv6,
null);
}
private string ResolveHostAddress(string hostName, AddressFamily family)
@ -222,19 +167,45 @@ namespace OpenTelemetry.Exporter.Zipkin
CharSet = "utf-8",
};
private readonly IEnumerable<ZipkinSpan> spans;
private readonly JsonSerializerOptions options;
private static Utf8JsonWriter writer;
public JsonContent(IEnumerable<ZipkinSpan> spans, JsonSerializerOptions options)
private readonly ZipkinTraceExporter exporter;
private readonly IEnumerable<SpanData> spans;
public JsonContent(ZipkinTraceExporter exporter, IEnumerable<SpanData> spans)
{
this.exporter = exporter;
this.spans = spans;
this.options = options;
this.Headers.ContentType = JsonHeader;
}
protected override async Task SerializeToStreamAsync(Stream stream, TransportContext context)
=> await JsonSerializer.SerializeAsync(stream, this.spans, this.options).ConfigureAwait(false);
protected override Task SerializeToStreamAsync(Stream stream, TransportContext context)
{
if (writer == null)
{
writer = new Utf8JsonWriter(stream);
}
else
{
writer.Reset(stream);
}
writer.WriteStartArray();
foreach (var span in this.spans)
{
var zipkinSpan = span.ToZipkinSpan(this.exporter.LocalEndpoint, this.exporter.options.UseShortTraceIds);
zipkinSpan.Write(writer);
zipkinSpan.Return();
}
writer.WriteEndArray();
return writer.FlushAsync();
}
protected override bool TryComputeLength(out long length)
{

View File

@ -22,7 +22,7 @@ using System.Diagnostics;
using System.Reflection;
using System.Reflection.Emit;
namespace OpenTelemetry.Exporter.Jaeger.Implementation
namespace OpenTelemetry.Internal
{
internal class DictionaryEnumerator<TKey, TValue, TState> : Enumerator
<IEnumerable<KeyValuePair<TKey, TValue>>,

View File

@ -18,7 +18,7 @@ using System.Buffers;
using System.Collections;
using System.Collections.Generic;
namespace OpenTelemetry.Exporter.Jaeger.Implementation
namespace OpenTelemetry.Internal
{
internal readonly struct PooledList<T> : IEnumerable<T>, ICollection
{
@ -92,7 +92,12 @@ namespace OpenTelemetry.Exporter.Jaeger.Implementation
void ICollection.CopyTo(Array array, int index) => throw new NotSupportedException();
public IEnumerator<T> GetEnumerator()
public Enumerator GetEnumerator()
{
return new Enumerator(in this);
}
IEnumerator<T> IEnumerable<T>.GetEnumerator()
{
return new Enumerator(in this);
}
@ -102,7 +107,7 @@ namespace OpenTelemetry.Exporter.Jaeger.Implementation
return new Enumerator(in this);
}
private struct Enumerator : IEnumerator<T>, IEnumerator
public struct Enumerator : IEnumerator<T>, IEnumerator
{
private readonly T[] buffer;
private readonly int count;

View File

@ -25,6 +25,7 @@
<ItemGroup>
<PackageReference Include="System.Collections.Immutable" Version="1.4.0" />
<PackageReference Include="System.Reflection.Emit.Lightweight" Version="4.7.0" />
<PackageReference Condition="'$(TargetFramework)' == 'netstandard2.0'" Include="System.Runtime.CompilerServices.Unsafe" Version="4.7.0" />
</ItemGroup>

View File

@ -25,10 +25,7 @@ namespace OpenTelemetry.Exporter.Zipkin.Tests.Implementation
{
public class ZipkinTraceExporterRemoteEndpointTests
{
private static readonly ZipkinEndpoint DefaultZipkinEndpoint = new ZipkinEndpoint
{
ServiceName = "TestService",
};
private static readonly ZipkinEndpoint DefaultZipkinEndpoint = new ZipkinEndpoint("TestService");
[Fact]
public void ZipkinSpanConverterTest_GenerateSpan_RemoteEndpointOmittedByDefault()
@ -78,7 +75,7 @@ namespace OpenTelemetry.Exporter.Zipkin.Tests.Implementation
Assert.Equal("RemoteServiceName", zipkinSpan.RemoteEndpoint.ServiceName);
}
internal SpanData CreateTestSpan(
internal static SpanData CreateTestSpan(
bool setAttributes = true,
Dictionary<string, object> additionalAttributes = null,
bool addEvents = true,

View File

@ -5,6 +5,9 @@
<TargetFrameworks Condition="$(OS) == 'Windows_NT'">$(TargetFrameworks);net461</TargetFrameworks>
<IsPackable>false</IsPackable>
</PropertyGroup>
<ItemGroup>
<Compile Include="..\OpenTelemetry.Collector.Dependencies.Tests\TestServer.cs" Link="TestServer.cs" />
</ItemGroup>
<ItemGroup>
<Content Include="xunit.runner.json">

View File

@ -0,0 +1,101 @@
// <copyright file="ZipkinTraceExporterTests.cs" company="OpenTelemetry Authors">
// Copyright 2018, OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using OpenTelemetry.Collector.Dependencies.Tests;
using OpenTelemetry.Exporter.Zipkin.Implementation;
using OpenTelemetry.Exporter.Zipkin.Tests.Implementation;
using OpenTelemetry.Trace.Export;
using Xunit;
namespace OpenTelemetry.Exporter.Zipkin.Tests
{
public class ZipkinTraceExporterTests : IDisposable
{
private static readonly ConcurrentDictionary<Guid, string> Responses = new ConcurrentDictionary<Guid, string>();
private readonly IDisposable testServer;
private readonly string testServerHost;
private readonly int testServerPort;
public ZipkinTraceExporterTests()
{
this.testServer = TestServer.RunServer(
ctx => ProcessServerRequest(ctx),
out this.testServerHost,
out this.testServerPort);
static void ProcessServerRequest(HttpListenerContext context)
{
context.Response.StatusCode = 200;
using StreamReader readStream = new StreamReader(context.Request.InputStream);
string requestContent = readStream.ReadToEnd();
Responses.TryAdd(
Guid.Parse(context.Request.QueryString["requestId"]),
requestContent);
context.Response.OutputStream.Close();
}
}
public void Dispose()
{
this.testServer.Dispose();
}
[Fact]
public async Task ZipkinExporterIntegrationTest()
{
var spans = new List<SpanData> { ZipkinTraceExporterRemoteEndpointTests.CreateTestSpan() };
Guid requestId = Guid.NewGuid();
ZipkinTraceExporter exporter = new ZipkinTraceExporter(
new ZipkinTraceExporterOptions
{
Endpoint = new Uri($"http://{this.testServerHost}:{this.testServerPort}/api/v2/spans?requestId={requestId}"),
});
await exporter.ExportAsync(spans, CancellationToken.None).ConfigureAwait(false);
await exporter.ShutdownAsync(CancellationToken.None).ConfigureAwait(false);
var span = spans[0];
var context = span.Context;
var timestamp = ZipkinConversionExtensions.ToEpochMicroseconds(span.StartTimestamp);
StringBuilder ipInformation = new StringBuilder();
if (!string.IsNullOrEmpty(exporter.LocalEndpoint.Ipv4))
ipInformation.Append($@",""ipv4"":""{exporter.LocalEndpoint.Ipv4}""");
if (!string.IsNullOrEmpty(exporter.LocalEndpoint.Ipv6))
ipInformation.Append($@",""ipv6"":""{exporter.LocalEndpoint.Ipv6}""");
Assert.Equal(
$@"[{{""traceId"":""e8ea7e9ac72de94e91fabc613f9686b2"",""name"":""Name"",""parentId"":""{ZipkinConversionExtensions.EncodeSpanId(span.ParentSpanId)}"",""id"":""{ZipkinConversionExtensions.EncodeSpanId(context.SpanId)}"",""kind"":""CLIENT"",""timestamp"":{timestamp},""duration"":60000000,""localEndpoint"":{{""serviceName"":""Open Telemetry Exporter""{ipInformation}}},""annotations"":[{{""timestamp"":{timestamp},""value"":""Event1""}},{{""timestamp"":{timestamp},""value"":""Event2""}}],""tags"":{{""stringKey"":""value"",""longKey"":""1"",""longKey2"":""1"",""doubleKey"":""1"",""doubleKey2"":""1"",""boolKey"":""True"",""ot.status_code"":""Ok""}}}}]",
Responses[requestId]);
}
}
}