Use HttpClient to send traces on netcoreapp target

Restores #938
This commit is contained in:
Kevin Gosse 2020-10-08 13:01:47 +02:00 committed by GitHub
parent c6dce9caf3
commit 51c1e52e79
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 221 additions and 5 deletions

View File

@ -32,7 +32,7 @@ namespace Datadog.Trace.Agent
_tracesEndpoint = new Uri(baseEndpoint, TracesPath);
_statsd = statsd;
_containerId = ContainerMetadata.GetContainerId();
_apiRequestFactory = apiRequestFactory ?? new ApiWebRequestFactory();
_apiRequestFactory = apiRequestFactory ?? CreateRequestFactory();
// report runtime details
try
@ -152,6 +152,15 @@ namespace Datadog.Trace.Agent
}
}
private static IApiRequestFactory CreateRequestFactory()
{
#if NETCOREAPP
return new HttpClientRequestFactory();
#else
return new ApiWebRequestFactory();
#endif
}
private static HashSet<ulong> GetUniqueTraceIds(Span[][] traces)
{
var uniqueTraceIds = new HashSet<ulong>();
@ -203,7 +212,7 @@ namespace Datadog.Trace.Agent
try
{
if (response.ContentLength > 0 && Tracer.Instance.Sampler != null)
if (response.ContentLength != 0 && Tracer.Instance.Sampler != null)
{
var responseContent = await response.ReadAsStringAsync().ConfigureAwait(false);

View File

@ -1,9 +1,6 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading.Tasks;
namespace Datadog.Trace.Agent

View File

@ -0,0 +1,39 @@
#if NETCOREAPP
using System;
using System.Net.Http;
using System.Threading.Tasks;
using Datadog.Trace.Agent.MessagePack;
namespace Datadog.Trace.Agent
{
internal class HttpClientRequest : IApiRequest
{
private readonly HttpClient _client;
private readonly HttpRequestMessage _request;
public HttpClientRequest(HttpClient client, Uri endpoint)
{
_client = client;
_request = new HttpRequestMessage(HttpMethod.Post, endpoint);
}
public void AddHeader(string name, string value)
{
_request.Headers.Add(name, value);
}
public async Task<IApiResponse> PostAsync(Span[][] traces, FormatterResolverWrapper formatterResolver)
{
// re-create HttpContent on every retry because some versions of HttpClient always dispose of it, so we can't reuse.
using (var content = new TracesMessagePackContent(traces, formatterResolver))
{
_request.Content = content;
var response = await _client.SendAsync(_request).ConfigureAwait(false);
return new HttpClientResponse(response);
}
}
}
}
#endif

View File

@ -0,0 +1,29 @@
#if NETCOREAPP
using System;
using System.Net.Http;
namespace Datadog.Trace.Agent
{
internal class HttpClientRequestFactory : IApiRequestFactory
{
private readonly HttpClient _client;
public HttpClientRequestFactory(HttpMessageHandler handler = null)
{
_client = handler == null ? new HttpClient() : new HttpClient(handler);
// Default headers
_client.DefaultRequestHeaders.Add(AgentHttpHeaderNames.Language, ".NET");
_client.DefaultRequestHeaders.Add(AgentHttpHeaderNames.TracerVersion, TracerConstants.AssemblyVersion);
// don't add automatic instrumentation to requests from this HttpClient
_client.DefaultRequestHeaders.Add(HttpHeaderNames.TracingEnabled, "false");
}
public IApiRequest Create(Uri endpoint)
{
return new HttpClientRequest(_client, endpoint);
}
}
}
#endif

View File

@ -0,0 +1,31 @@
#if NETCOREAPP
using System.Net.Http;
using System.Threading.Tasks;
namespace Datadog.Trace.Agent
{
internal class HttpClientResponse : IApiResponse
{
private readonly HttpResponseMessage _response;
public HttpClientResponse(HttpResponseMessage response)
{
_response = response;
}
public int StatusCode => (int)_response.StatusCode;
public long ContentLength => _response.Content.Headers.ContentLength ?? -1;
public void Dispose()
{
_response.Dispose();
}
public Task<string> ReadAsStringAsync()
{
return _response.Content.ReadAsStringAsync();
}
}
}
#endif

View File

@ -0,0 +1,47 @@
#if NETCOREAPP
using System.IO;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Threading.Tasks;
using Datadog.Trace.Vendors.MessagePack;
namespace Datadog.Trace.Agent.MessagePack
{
internal class TracesMessagePackContent : HttpContent
{
private readonly FormatterResolverWrapper _resolver;
/// <summary>
/// Initializes a new instance of the <see cref="TracesMessagePackContent"/> class.
/// </summary>
/// <param name="traces">The value to serialize into the content stream as MessagePack.</param>
/// <param name="resolver">The <see cref="IFormatterResolver"/> to use when serializing <paramref name="traces"/>.</param>
public TracesMessagePackContent(Span[][] traces, FormatterResolverWrapper resolver)
{
Traces = traces;
_resolver = resolver;
Headers.ContentType = new MediaTypeHeaderValue("application/msgpack");
}
public Span[][] Traces { get; }
/// <summary>Serialize the HTTP content to a stream as an asynchronous operation.</summary>
/// <param name="stream">The target stream.</param>
/// <param name="context">Information about the transport (channel binding token, for example). This parameter may be <see langword="null" />.</param>
/// <returns>The task object representing the asynchronous operation.</returns>
protected override Task SerializeToStreamAsync(Stream stream, TransportContext context)
{
return CachedSerializer.Instance.SerializeAsync(stream, Traces, _resolver);
}
protected override bool TryComputeLength(out long length)
{
// We don't want compute the length beforehand
length = -1;
return false;
}
}
}
#endif

View File

@ -0,0 +1,64 @@
#if NETCOREAPP3_1
using System;
using System.Linq;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Datadog.Trace.Agent;
using Datadog.Trace.Agent.MessagePack;
using Xunit;
namespace Datadog.Trace.Tests
{
public class HttpClientRequestTests
{
[Fact]
public async Task SetHeaders()
{
var handler = new CustomHandler();
var factory = new HttpClientRequestFactory(handler);
var request = factory.Create(new Uri("http://localhost/"));
request.AddHeader("Hello", "World");
await request.PostAsync(new Span[0][], new FormatterResolverWrapper(SpanFormatterResolver.Instance));
var message = handler.Message;
Assert.NotNull(message);
Assert.Equal(".NET", message.Headers.GetValues(AgentHttpHeaderNames.Language).First());
Assert.Equal(TracerConstants.AssemblyVersion, message.Headers.GetValues(AgentHttpHeaderNames.TracerVersion).First());
Assert.Equal("false", message.Headers.GetValues(HttpHeaderNames.TracingEnabled).First());
Assert.Equal("World", message.Headers.GetValues("Hello").First());
}
[Fact]
public async Task SerializeSpans()
{
var handler = new CustomHandler();
var factory = new HttpClientRequestFactory(handler);
var request = factory.Create(new Uri("http://localhost/"));
await request.PostAsync(new Span[0][], new FormatterResolverWrapper(SpanFormatterResolver.Instance));
var message = handler.Message;
Assert.IsAssignableFrom<TracesMessagePackContent>(message.Content);
}
private class CustomHandler : DelegatingHandler
{
public HttpRequestMessage Message { get; private set; }
protected override Task<HttpResponseMessage> SendAsync(HttpRequestMessage request, CancellationToken cancellationToken)
{
Message = request;
return Task.FromResult(new HttpResponseMessage());
}
}
}
}
#endif