Add ActivityExporter for Zipkin (#738)

* Adding Zipkin activity exporter

* Added activitysource to tags

* Revert ZipkinTraceExporterOptions name change

* Refactored ProcessTags

* Removing sample files

* Modified UseZipkinActivityExporter to use AddProcessorPipeline

* Refactored based on Reiley's comments

* Added UseShortTraceIds to test

* Added useShortTraceIds to test

Co-authored-by: Cijo Thomas <cithomas@microsoft.com>
This commit is contained in:
Rajkumar Rangaraj 2020-06-19 16:41:16 -07:00 committed by GitHub
parent ce09b59405
commit bbf68f6d31
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 864 additions and 1 deletions

View File

@ -0,0 +1,229 @@
// <copyright file="ZipkinActivityConversionExtensions.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using OpenTelemetry.Internal;
using OpenTelemetry.Resources;
using OpenTelemetry.Trace;
namespace OpenTelemetry.Exporter.Zipkin.Implementation
{
internal static class ZipkinActivityConversionExtensions
{
private const long TicksPerMicrosecond = TimeSpan.TicksPerMillisecond / 1000;
private static readonly Dictionary<string, int> RemoteEndpointServiceNameKeyResolutionDictionary = new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase)
{
[SpanAttributeConstants.PeerServiceKey] = 0, // RemoteEndpoint.ServiceName primary.
["net.peer.name"] = 1, // RemoteEndpoint.ServiceName first alternative.
["peer.hostname"] = 2, // RemoteEndpoint.ServiceName second alternative.
["peer.address"] = 2, // RemoteEndpoint.ServiceName second alternative.
["http.host"] = 3, // RemoteEndpoint.ServiceName for Http.
["db.instance"] = 4, // RemoteEndpoint.ServiceName for Redis.
};
private static readonly string InvalidSpanId = default(ActivitySpanId).ToHexString();
private static readonly ConcurrentDictionary<string, ZipkinEndpoint> LocalEndpointCache = new ConcurrentDictionary<string, ZipkinEndpoint>();
private static readonly ConcurrentDictionary<string, ZipkinEndpoint> RemoteEndpointCache = new ConcurrentDictionary<string, ZipkinEndpoint>();
private static readonly DictionaryEnumerator<string, string, AttributeEnumerationState>.ForEachDelegate ProcessTagsRef = ProcessTags;
private static readonly ListEnumerator<ActivityEvent, PooledList<ZipkinAnnotation>>.ForEachDelegate ProcessActivityEventsRef = ProcessActivityEvents;
internal static ZipkinSpan ToZipkinSpan(this Activity activity, ZipkinEndpoint defaultLocalEndpoint, bool useShortTraceIds = false)
{
var context = activity.Context;
var startTimestamp = activity.StartTimeUtc.ToEpochMicroseconds();
string parentId = EncodeSpanId(activity.ParentSpanId);
if (string.Equals(parentId, InvalidSpanId, StringComparison.Ordinal))
{
parentId = null;
}
var attributeEnumerationState = new AttributeEnumerationState
{
Tags = PooledList<KeyValuePair<string, string>>.Create(),
};
DictionaryEnumerator<string, string, AttributeEnumerationState>.AllocationFreeForEach(activity.Tags, ref attributeEnumerationState, ProcessTagsRef);
var activitySource = activity.Source;
if (!string.IsNullOrEmpty(activitySource.Name))
{
PooledList<KeyValuePair<string, string>>.Add(ref attributeEnumerationState.Tags, new KeyValuePair<string, string>("library.name", activitySource.Name));
if (!string.IsNullOrEmpty(activitySource.Version))
{
PooledList<KeyValuePair<string, string>>.Add(ref attributeEnumerationState.Tags, new KeyValuePair<string, string>("library.version", activitySource.Version));
}
}
var localEndpoint = defaultLocalEndpoint;
var serviceName = attributeEnumerationState.ServiceName;
// override default service name
if (!string.IsNullOrWhiteSpace(serviceName))
{
if (!string.IsNullOrWhiteSpace(attributeEnumerationState.ServiceNamespace))
{
serviceName = attributeEnumerationState.ServiceNamespace + "." + serviceName;
}
if (!LocalEndpointCache.TryGetValue(serviceName, out localEndpoint))
{
localEndpoint = defaultLocalEndpoint.Clone(serviceName);
LocalEndpointCache.TryAdd(serviceName, localEndpoint);
}
}
ZipkinEndpoint remoteEndpoint = null;
if ((activity.Kind == ActivityKind.Client || activity.Kind == ActivityKind.Producer) && attributeEnumerationState.RemoteEndpointServiceName != null)
{
remoteEndpoint = RemoteEndpointCache.GetOrAdd(attributeEnumerationState.RemoteEndpointServiceName, ZipkinEndpoint.Create);
}
var annotations = PooledList<ZipkinAnnotation>.Create();
ListEnumerator<ActivityEvent, PooledList<ZipkinAnnotation>>.AllocationFreeForEach(activity.Events, ref annotations, ProcessActivityEventsRef);
return new ZipkinSpan(
EncodeTraceId(context.TraceId, useShortTraceIds),
parentId,
EncodeSpanId(context.SpanId),
ToActivityKind(activity),
activity.OperationName,
activity.StartTimeUtc.ToEpochMicroseconds(),
duration: (long)activity.Duration.ToEpochMicroseconds(),
localEndpoint,
remoteEndpoint,
annotations,
attributeEnumerationState.Tags,
null,
null);
}
internal static string EncodeSpanId(ActivitySpanId spanId)
{
return spanId.ToHexString();
}
internal static long ToEpochMicroseconds(this DateTimeOffset dateTimeOffset)
{
return dateTimeOffset.Ticks / TicksPerMicrosecond;
}
internal static long ToEpochMicroseconds(this TimeSpan timeSpan)
{
return timeSpan.Ticks / TicksPerMicrosecond;
}
internal static long ToEpochMicroseconds(this DateTime utcDateTime)
{
const long UnixEpochTicks = 621355968000000000L; // = DateTimeOffset.FromUnixTimeMilliseconds(0).Ticks
const long UnixEpochMicroseconds = UnixEpochTicks / TicksPerMicrosecond;
// Truncate sub-microsecond precision before offsetting by the Unix Epoch to avoid
// the last digit being off by one for dates that result in negative Unix times
long microseconds = utcDateTime.Ticks / TicksPerMicrosecond;
return microseconds - UnixEpochMicroseconds;
}
private static string EncodeTraceId(ActivityTraceId traceId, bool useShortTraceIds)
{
var id = traceId.ToHexString();
if (id.Length > 16 && useShortTraceIds)
{
id = id.Substring(id.Length - 16, 16);
}
return id;
}
private static string ToActivityKind(Activity activity)
{
switch (activity.Kind)
{
case ActivityKind.Server:
return "SERVER";
case ActivityKind.Producer:
return "PRODUCER";
case ActivityKind.Consumer:
return "CONSUMER";
case ActivityKind.Client:
return "CLIENT";
}
return null;
}
private static bool ProcessActivityEvents(ref PooledList<ZipkinAnnotation> annotations, ActivityEvent @event)
{
PooledList<ZipkinAnnotation>.Add(ref annotations, new ZipkinAnnotation(@event.Timestamp.ToEpochMicroseconds(), @event.Name));
return true;
}
private static bool ProcessTags(ref AttributeEnumerationState state, KeyValuePair<string, string> attribute)
{
string key = attribute.Key;
string strVal = attribute.Value;
if (strVal != null)
{
if (RemoteEndpointServiceNameKeyResolutionDictionary.TryGetValue(key, out int priority)
&& (state.RemoteEndpointServiceName == null || priority < state.RemoteEndpointServiceNamePriority))
{
state.RemoteEndpointServiceName = strVal;
state.RemoteEndpointServiceNamePriority = priority;
}
else if (key == Resource.ServiceNameKey)
{
state.ServiceName = strVal;
}
else if (key == Resource.ServiceNamespaceKey)
{
state.ServiceNamespace = strVal;
}
else
{
PooledList<KeyValuePair<string, string>>.Add(ref state.Tags, new KeyValuePair<string, string>(key, strVal));
}
}
else
{
PooledList<KeyValuePair<string, string>>.Add(ref state.Tags, new KeyValuePair<string, string>(key, strVal));
}
return true;
}
private struct AttributeEnumerationState
{
public PooledList<KeyValuePair<string, string>> Tags;
public string RemoteEndpointServiceName;
public int RemoteEndpointServiceNamePriority;
public string ServiceName;
public string ServiceNamespace;
}
}
}

View File

@ -112,7 +112,10 @@ namespace OpenTelemetry.Exporter.Zipkin.Implementation
writer.WriteString("id", this.Id);
writer.WriteString("kind", this.Kind);
if (this.Kind != null)
{
writer.WriteString("kind", this.Kind);
}
if (this.Timestamp.HasValue)
{

View File

@ -83,5 +83,33 @@ namespace OpenTelemetry.Trace.Configuration
processorConfigure.Invoke(b);
});
}
/// <summary>
/// Registers a Zipkin exporter that will receive <see cref="System.Diagnostics.Activity"/> instances.
/// </summary>
/// <param name="builder"><see cref="OpenTelemetryBuilder"/> builder to use.</param>
/// <param name="configure">Exporter configuration options.</param>
/// <returns>The instance of <see cref="OpenTelemetryBuilder"/> to chain the calls.</returns>
public static OpenTelemetryBuilder UseZipkinActivityExporter(this OpenTelemetryBuilder builder, Action<ZipkinTraceExporterOptions> configure)
{
if (builder == null)
{
throw new ArgumentNullException(nameof(builder));
}
if (configure == null)
{
throw new ArgumentNullException(nameof(configure));
}
return builder.AddProcessorPipeline(pipeline =>
{
var options = new ZipkinTraceExporterOptions();
configure(options);
var activityExporter = new ZipkinActivityExporter(options);
pipeline.SetExporter(activityExporter);
});
}
}
}

View File

@ -0,0 +1,220 @@
// <copyright file="ZipkinActivityExporter.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Net.Sockets;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using OpenTelemetry.Exporter.Zipkin.Implementation;
using OpenTelemetry.Trace.Export;
namespace OpenTelemetry.Exporter.Zipkin
{
/// <summary>
/// Zipkin exporter.
/// </summary>
public class ZipkinActivityExporter : ActivityExporter
{
private readonly ZipkinTraceExporterOptions options;
private readonly HttpClient httpClient;
/// <summary>
/// Initializes a new instance of the <see cref="ZipkinActivityExporter"/> class.
/// </summary>
/// <param name="options">Configuration options.</param>
/// <param name="client">Http client to use to upload telemetry.</param>
public ZipkinActivityExporter(ZipkinTraceExporterOptions options, HttpClient client = null)
{
this.options = options;
this.LocalEndpoint = this.GetLocalZipkinEndpoint();
this.httpClient = client ?? new HttpClient();
}
internal ZipkinEndpoint LocalEndpoint { get; }
/// <inheritdoc/>
public override async Task<ExportResult> ExportAsync(IEnumerable<Activity> batchActivity, CancellationToken cancellationToken)
{
try
{
await this.SendBatchActivityAsync(batchActivity).ConfigureAwait(false);
return ExportResult.Success;
}
catch (Exception)
{
// TODO distinguish retryable exceptions
return ExportResult.FailedNotRetryable;
}
}
/// <inheritdoc/>
public override Task ShutdownAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
private Task SendBatchActivityAsync(IEnumerable<Activity> batchActivity)
{
var requestUri = this.options.Endpoint;
var request = new HttpRequestMessage(HttpMethod.Post, requestUri)
{
Content = new JsonContent(this, batchActivity),
};
// avoid cancelling here: this is no return point: if we reached this point
// and cancellation is requested, it's better if we try to finish sending spans rather than drop it
return this.httpClient.SendAsync(request);
}
private ZipkinEndpoint GetLocalZipkinEndpoint()
{
var hostName = this.ResolveHostName();
string ipv4 = null;
string ipv6 = null;
if (!string.IsNullOrEmpty(hostName))
{
ipv4 = this.ResolveHostAddress(hostName, AddressFamily.InterNetwork);
ipv6 = this.ResolveHostAddress(hostName, AddressFamily.InterNetworkV6);
}
return new ZipkinEndpoint(
this.options.ServiceName,
ipv4,
ipv6,
null);
}
private string ResolveHostAddress(string hostName, AddressFamily family)
{
string result = null;
try
{
var results = Dns.GetHostAddresses(hostName);
if (results != null && results.Length > 0)
{
foreach (var addr in results)
{
if (addr.AddressFamily.Equals(family))
{
var sanitizedAddress = new IPAddress(addr.GetAddressBytes()); // Construct address sans ScopeID
result = sanitizedAddress.ToString();
break;
}
}
}
}
catch (Exception)
{
// Ignore
}
return result;
}
private string ResolveHostName()
{
string result = null;
try
{
result = Dns.GetHostName();
if (!string.IsNullOrEmpty(result))
{
var response = Dns.GetHostEntry(result);
if (response != null)
{
return response.HostName;
}
}
}
catch (Exception)
{
// Ignore
}
return result;
}
private class JsonContent : HttpContent
{
private static readonly MediaTypeHeaderValue JsonHeader = new MediaTypeHeaderValue("application/json")
{
CharSet = "utf-8",
};
private readonly ZipkinActivityExporter exporter;
private readonly IEnumerable<Activity> batchActivity;
private Utf8JsonWriter writer;
public JsonContent(ZipkinActivityExporter exporter, IEnumerable<Activity> batchActivity)
{
this.exporter = exporter;
this.batchActivity = batchActivity;
this.Headers.ContentType = JsonHeader;
}
protected override Task SerializeToStreamAsync(Stream stream, TransportContext context)
{
if (this.writer == null)
{
this.writer = new Utf8JsonWriter(stream);
}
else
{
this.writer.Reset(stream);
}
this.writer.WriteStartArray();
foreach (var activity in this.batchActivity)
{
var zipkinSpan = activity.ToZipkinSpan(this.exporter.LocalEndpoint, this.exporter.options.UseShortTraceIds);
zipkinSpan.Write(this.writer);
zipkinSpan.Return();
}
this.writer.WriteEndArray();
return this.writer.FlushAsync();
}
protected override bool TryComputeLength(out long length)
{
// We can't know the length of the content being pushed to the output stream.
length = -1;
return false;
}
}
}
}

View File

@ -0,0 +1,89 @@
// <copyright file="ZipkinActivityConversionTest.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
using System.Diagnostics;
using System.Linq;
using OpenTelemetry.Exporter.Zipkin.Implementation;
using Xunit;
namespace OpenTelemetry.Exporter.Zipkin.Tests.Implementation
{
public class ZipkinActivityConversionTest
{
private const string ZipkinSpanName = "Name";
private static readonly ZipkinEndpoint DefaultZipkinEndpoint = new ZipkinEndpoint("TestService");
[Fact]
public void ZipkinActivityConversion_ToZipkinSpan_AllPropertiesSet()
{
// Arrange
var activity = ZipkinActivityExporterTests.CreateTestActivity();
// Act & Assert
var zipkinSpan = activity.ToZipkinSpan(DefaultZipkinEndpoint);
Assert.Equal(ZipkinSpanName, zipkinSpan.Name);
Assert.Equal(activity.TraceId.ToHexString(), zipkinSpan.TraceId);
Assert.Equal(activity.SpanId.ToHexString(), zipkinSpan.Id);
Assert.Equal(activity.StartTimeUtc.ToEpochMicroseconds(), zipkinSpan.Timestamp);
Assert.Equal((long)(activity.Duration.TotalMilliseconds * 1000), zipkinSpan.Duration);
int counter = 0;
var tagsArray = zipkinSpan.Tags.Value.ToArray();
foreach (var tags in activity.Tags)
{
Assert.Equal(tagsArray[counter].Key, tags.Key);
Assert.Equal(tagsArray[counter++].Value, tags.Value);
}
foreach (var annotation in zipkinSpan.Annotations)
{
// Timestamp is same in both events
Assert.Equal(activity.Events.First().Timestamp.ToEpochMicroseconds(), annotation.Timestamp);
}
}
[Fact]
public void ZipkinActivityConversion_ToZipkinSpan_NoEvents()
{
// Arrange
var activity = ZipkinActivityExporterTests.CreateTestActivity(addEvents: false);
// Act & Assert
var zipkinSpan = activity.ToZipkinSpan(DefaultZipkinEndpoint);
Assert.Equal(ZipkinSpanName, zipkinSpan.Name);
Assert.Empty(zipkinSpan.Annotations.Value);
Assert.Equal(activity.TraceId.ToHexString(), zipkinSpan.TraceId);
Assert.Equal(activity.SpanId.ToHexString(), zipkinSpan.Id);
int counter = 0;
var tagsArray = zipkinSpan.Tags.Value.ToArray();
foreach (var tags in activity.Tags)
{
Assert.Equal(tagsArray[counter].Key, tags.Key);
Assert.Equal(tagsArray[counter++].Value, tags.Value);
}
Assert.Equal(activity.StartTimeUtc.ToEpochMicroseconds(), zipkinSpan.Timestamp);
Assert.Equal((long)activity.Duration.TotalMilliseconds * 1000, zipkinSpan.Duration);
}
}
}

View File

@ -0,0 +1,75 @@
// <copyright file="ZipkinActivityExporterRemoteEndpointTests.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
using System.Collections.Generic;
using OpenTelemetry.Exporter.Zipkin.Implementation;
using Xunit;
namespace OpenTelemetry.Exporter.Zipkin.Tests.Implementation
{
public class ZipkinActivityExporterRemoteEndpointTests
{
private static readonly ZipkinEndpoint DefaultZipkinEndpoint = new ZipkinEndpoint("TestService");
[Fact]
public void ZipkinSpanConverterTest_GenerateActivity_RemoteEndpointOmittedByDefault()
{
// Arrange
var activity = ZipkinActivityExporterTests.CreateTestActivity();
// Act & Assert
var zipkinSpan = ZipkinActivityConversionExtensions.ToZipkinSpan(activity, DefaultZipkinEndpoint);
Assert.Null(zipkinSpan.RemoteEndpoint);
}
[Fact]
public void ZipkinSpanConverterTest_GenerateActivity_RemoteEndpointResolution()
{
// Arrange
var activity = ZipkinActivityExporterTests.CreateTestActivity(
additionalAttributes: new Dictionary<string, object>
{
["net.peer.name"] = "RemoteServiceName",
});
// Act & Assert
var zipkinSpan = ZipkinActivityConversionExtensions.ToZipkinSpan(activity, DefaultZipkinEndpoint);
Assert.NotNull(zipkinSpan.RemoteEndpoint);
Assert.Equal("RemoteServiceName", zipkinSpan.RemoteEndpoint.ServiceName);
}
[Fact]
public void ZipkinSpanConverterTest_GenerateActivity_RemoteEndpointResolutionPriority()
{
// Arrange
var activity = ZipkinActivityExporterTests.CreateTestActivity(
additionalAttributes: new Dictionary<string, object>
{
["http.host"] = "DiscardedRemoteServiceName",
["net.peer.name"] = "RemoteServiceName",
["peer.hostname"] = "DiscardedRemoteServiceName",
});
// Act & Assert
var zipkinSpan = ZipkinActivityConversionExtensions.ToZipkinSpan(activity, DefaultZipkinEndpoint);
Assert.NotNull(zipkinSpan.RemoteEndpoint);
Assert.Equal("RemoteServiceName", zipkinSpan.RemoteEndpoint.ServiceName);
}
}
}

View File

@ -0,0 +1,219 @@
// <copyright file="ZipkinActivityExporterTests.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using OpenTelemetry.Exporter.Zipkin.Implementation;
using OpenTelemetry.Internal.Test;
using OpenTelemetry.Resources;
using Xunit;
namespace OpenTelemetry.Exporter.Zipkin.Tests
{
public class ZipkinActivityExporterTests : IDisposable
{
private const string TraceId = "e8ea7e9ac72de94e91fabc613f9686b2";
private static readonly ConcurrentDictionary<Guid, string> Responses = new ConcurrentDictionary<Guid, string>();
private readonly IDisposable testServer;
private readonly string testServerHost;
private readonly int testServerPort;
static ZipkinActivityExporterTests()
{
Activity.DefaultIdFormat = ActivityIdFormat.W3C;
Activity.ForceDefaultIdFormat = true;
var listener = new ActivityListener
{
ShouldListenTo = _ => true,
GetRequestedDataUsingParentId = (ref ActivityCreationOptions<string> options) => ActivityDataRequest.AllData,
GetRequestedDataUsingContext = (ref ActivityCreationOptions<ActivityContext> options) => ActivityDataRequest.AllData,
};
ActivitySource.AddActivityListener(listener);
}
public ZipkinActivityExporterTests()
{
this.testServer = TestHttpServer.RunServer(
ctx => ProcessServerRequest(ctx),
out this.testServerHost,
out this.testServerPort);
static void ProcessServerRequest(HttpListenerContext context)
{
context.Response.StatusCode = 200;
using StreamReader readStream = new StreamReader(context.Request.InputStream);
string requestContent = readStream.ReadToEnd();
Responses.TryAdd(
Guid.Parse(context.Request.QueryString["requestId"]),
requestContent);
context.Response.OutputStream.Close();
}
}
public void Dispose()
{
this.testServer.Dispose();
}
[Theory]
[InlineData(true)]
[InlineData(false)]
public async Task ZipkinActivityExporterIntegrationTest(bool useShortTraceIds)
{
var batchActivity = new List<Activity> { CreateTestActivity() };
Guid requestId = Guid.NewGuid();
ZipkinActivityExporter exporter = new ZipkinActivityExporter(
new ZipkinTraceExporterOptions
{
Endpoint = new Uri($"http://{this.testServerHost}:{this.testServerPort}/api/v2/spans?requestId={requestId}"),
UseShortTraceIds = useShortTraceIds,
});
await exporter.ExportAsync(batchActivity, CancellationToken.None).ConfigureAwait(false);
await exporter.ShutdownAsync(CancellationToken.None).ConfigureAwait(false);
var activity = batchActivity[0];
var context = activity.Context;
var timestamp = activity.StartTimeUtc.ToEpochMicroseconds();
var eventTimestamp = activity.Events.First().Timestamp.ToEpochMicroseconds();
StringBuilder ipInformation = new StringBuilder();
if (!string.IsNullOrEmpty(exporter.LocalEndpoint.Ipv4))
{
ipInformation.Append($@",""ipv4"":""{exporter.LocalEndpoint.Ipv4}""");
}
if (!string.IsNullOrEmpty(exporter.LocalEndpoint.Ipv6))
{
ipInformation.Append($@",""ipv6"":""{exporter.LocalEndpoint.Ipv6}""");
}
var traceId = useShortTraceIds ? TraceId.Substring(TraceId.Length - 16, 16) : TraceId;
Assert.Equal(
$@"[{{""traceId"":""{traceId}"",""name"":""Name"",""parentId"":""{ZipkinConversionExtensions.EncodeSpanId(activity.ParentSpanId)}"",""id"":""{ZipkinActivityConversionExtensions.EncodeSpanId(context.SpanId)}"",""kind"":""CLIENT"",""timestamp"":{timestamp},""duration"":60000000,""localEndpoint"":{{""serviceName"":""Open Telemetry Exporter""{ipInformation}}},""annotations"":[{{""timestamp"":{eventTimestamp},""value"":""Event1""}},{{""timestamp"":{eventTimestamp},""value"":""Event2""}}],""tags"":{{""stringKey"":""value"",""longKey"":""1"",""longKey2"":""1"",""doubleKey"":""1"",""doubleKey2"":""1"",""boolKey"":""True"",""library.name"":""CreateTestActivity""}}}}]",
Responses[requestId]);
}
internal static Activity CreateTestActivity(
bool setAttributes = true,
Dictionary<string, object> additionalAttributes = null,
bool addEvents = true,
bool addLinks = true,
Resource resource = null,
ActivityKind kind = ActivityKind.Client)
{
var startTimestamp = DateTime.UtcNow;
var endTimestamp = startTimestamp.AddSeconds(60);
var eventTimestamp = DateTime.UtcNow;
var traceId = ActivityTraceId.CreateFromString("e8ea7e9ac72de94e91fabc613f9686b2".AsSpan());
var parentSpanId = ActivitySpanId.CreateFromBytes(new byte[] { 12, 23, 34, 45, 56, 67, 78, 89 });
var attributes = new Dictionary<string, object>
{
{ "stringKey", "value" },
{ "longKey", 1L },
{ "longKey2", 1 },
{ "doubleKey", 1D },
{ "doubleKey2", 1F },
{ "boolKey", true },
};
if (additionalAttributes != null)
{
foreach (var attribute in additionalAttributes)
{
attributes.Add(attribute.Key, attribute.Value);
}
}
var events = new List<ActivityEvent>
{
new ActivityEvent(
"Event1",
eventTimestamp,
new Dictionary<string, object>
{
{ "key", "value" },
}),
new ActivityEvent(
"Event2",
eventTimestamp,
new Dictionary<string, object>
{
{ "key", "value" },
}),
};
var linkedSpanId = ActivitySpanId.CreateFromString("888915b6286b9c41".AsSpan());
var activitySource = new ActivitySource(nameof(CreateTestActivity));
var tags = setAttributes ?
attributes.Select(kvp => new KeyValuePair<string, string>(kvp.Key, kvp.Value.ToString()))
: null;
var links = addLinks ?
new[]
{
new ActivityLink(new ActivityContext(
traceId,
linkedSpanId,
ActivityTraceFlags.Recorded)),
}
: null;
var activity = activitySource.StartActivity(
"Name",
kind,
parentContext: new ActivityContext(traceId, parentSpanId, ActivityTraceFlags.Recorded),
tags,
links,
startTime: startTimestamp);
if (addEvents)
{
foreach (var evnt in events)
{
activity.AddEvent(evnt);
}
}
activity.SetEndTime(endTimestamp);
activity.Stop();
return activity;
}
}
}