Write directly to output stream in OtlpHttpTraceExportClient. (#2490)

This commit is contained in:
Mikel Blanchard 2021-10-15 18:37:00 -07:00 committed by GitHub
parent f8f201863a
commit d119447e07
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 153 additions and 13 deletions

View File

@ -16,8 +16,14 @@
using System;
using System.IO;
using System.Net;
using System.Net.Http;
using System.Net.Http.Headers;
using System.Runtime.CompilerServices;
#if NET5_0_OR_GREATER
using System.Threading;
#endif
using System.Threading.Tasks;
using Google.Protobuf;
using OtlpCollector = Opentelemetry.Proto.Collector.Trace.V1;
@ -43,18 +49,48 @@ namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClie
request.Headers.Add(header.Key, header.Value);
}
var content = Array.Empty<byte>();
using (var stream = new MemoryStream())
{
exportRequest.WriteTo(stream);
content = stream.ToArray();
}
var binaryContent = new ByteArrayContent(content);
binaryContent.Headers.ContentType = new MediaTypeHeaderValue(MediaContentType);
request.Content = binaryContent;
request.Content = new ExportRequestContent(exportRequest);
return request;
}
internal sealed class ExportRequestContent : HttpContent
{
private static readonly MediaTypeHeaderValue ProtobufMediaTypeHeader = new MediaTypeHeaderValue(MediaContentType);
private readonly OtlpCollector.ExportTraceServiceRequest exportRequest;
public ExportRequestContent(OtlpCollector.ExportTraceServiceRequest exportRequest)
{
this.exportRequest = exportRequest;
this.Headers.ContentType = ProtobufMediaTypeHeader;
}
#if NET5_0_OR_GREATER
protected override void SerializeToStream(Stream stream, TransportContext context, CancellationToken cancellationToken)
{
this.SerializeToStreamInternal(stream);
}
#endif
protected override Task SerializeToStreamAsync(Stream stream, TransportContext context)
{
this.SerializeToStreamInternal(stream);
return Task.CompletedTask;
}
protected override bool TryComputeLength(out long length)
{
// We can't know the length of the content being pushed to the output stream.
length = -1;
return false;
}
[MethodImpl(MethodImplOptions.AggressiveInlining)]
private void SerializeToStreamInternal(Stream stream)
{
this.exportRequest.WriteTo(stream);
}
}
}
}

View File

@ -1,4 +1,4 @@
// <copyright file="OtlpExporterBenchmarks.cs" company="OpenTelemetry Authors">
// <copyright file="OtlpGrpcExporterBenchmarks.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
@ -29,7 +29,7 @@ using OtlpCollector = Opentelemetry.Proto.Collector.Trace.V1;
namespace Benchmarks.Exporter
{
[MemoryDiagnoser]
public class OtlpExporterBenchmarks
public class OtlpGrpcExporterBenchmarks
{
private OtlpTraceExporter exporter;
private Activity activity;

View File

@ -0,0 +1,104 @@
// <copyright file="OtlpHttpExporterBenchmarks.cs" company="OpenTelemetry Authors">
// Copyright The OpenTelemetry Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// </copyright>
using System;
using System.Diagnostics;
using System.IO;
using BenchmarkDotNet.Attributes;
using Benchmarks.Helper;
using OpenTelemetry;
using OpenTelemetry.Exporter;
using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient;
using OpenTelemetry.Internal;
using OpenTelemetry.Tests;
namespace Benchmarks.Exporter
{
[MemoryDiagnoser]
public class OtlpHttpExporterBenchmarks
{
private readonly byte[] buffer = new byte[1024 * 1024];
private IDisposable server;
private string serverHost;
private int serverPort;
private OtlpTraceExporter exporter;
private Activity activity;
private CircularBuffer<Activity> activityBatch;
[Params(1, 10, 100)]
public int NumberOfBatches { get; set; }
[Params(10000)]
public int NumberOfSpans { get; set; }
[GlobalSetup]
public void GlobalSetup()
{
this.server = TestHttpServer.RunServer(
(ctx) =>
{
using (Stream receiveStream = ctx.Request.InputStream)
{
while (true)
{
if (receiveStream.Read(this.buffer, 0, this.buffer.Length) == 0)
{
break;
}
}
}
ctx.Response.StatusCode = 200;
ctx.Response.OutputStream.Close();
},
out this.serverHost,
out this.serverPort);
var options = new OtlpExporterOptions
{
Endpoint = new Uri($"http://{this.serverHost}:{this.serverPort}"),
};
this.exporter = new OtlpTraceExporter(
options,
new OtlpHttpTraceExportClient(options));
this.activity = ActivityHelper.CreateTestActivity();
this.activityBatch = new CircularBuffer<Activity>(this.NumberOfSpans);
}
[GlobalCleanup]
public void GlobalCleanup()
{
this.exporter.Shutdown();
this.exporter.Dispose();
this.server.Dispose();
}
[Benchmark]
public void OtlpExporter_Batching()
{
for (int i = 0; i < this.NumberOfBatches; i++)
{
for (int c = 0; c < this.NumberOfSpans; c++)
{
this.activityBatch.Add(this.activity);
}
this.exporter.Export(new Batch<Activity>(this.activityBatch, this.NumberOfSpans));
}
}
}
}

View File

@ -181,7 +181,7 @@ namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests.Implementation.Expo
Assert.Contains(httpRequest.Headers, h => h.Key == header2.Name && h.Value.First() == header2.Value);
Assert.NotNull(httpRequest.Content);
Assert.IsType<ByteArrayContent>(httpRequest.Content);
Assert.IsType<OtlpHttpTraceExportClient.ExportRequestContent>(httpRequest.Content);
Assert.Contains(httpRequest.Content.Headers, h => h.Key == "Content-Type" && h.Value.First() == OtlpHttpTraceExportClient.MediaContentType);
var exportTraceRequest = OtlpCollector.ExportTraceServiceRequest.Parser.ParseFrom(httpRequestContent);