From d119447e07d2e41a908e7e2dad1cfcc6818948b5 Mon Sep 17 00:00:00 2001 From: Mikel Blanchard Date: Fri, 15 Oct 2021 18:37:00 -0700 Subject: [PATCH] Write directly to output stream in OtlpHttpTraceExportClient. (#2490) --- .../ExportClient/OtlpHttpTraceExportClient.cs | 56 ++++++++-- ...marks.cs => OtlpGrpcExporterBenchmarks.cs} | 4 +- .../Exporter/OtlpHttpExporterBenchmarks.cs | 104 ++++++++++++++++++ .../OtlpHttpTraceExportClientTests.cs | 2 +- 4 files changed, 153 insertions(+), 13 deletions(-) rename test/Benchmarks/Exporter/{OtlpExporterBenchmarks.cs => OtlpGrpcExporterBenchmarks.cs} (95%) create mode 100644 test/Benchmarks/Exporter/OtlpHttpExporterBenchmarks.cs diff --git a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpHttpTraceExportClient.cs b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpHttpTraceExportClient.cs index 9bb7e04fb..18f3957e9 100644 --- a/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpHttpTraceExportClient.cs +++ b/src/OpenTelemetry.Exporter.OpenTelemetryProtocol/Implementation/ExportClient/OtlpHttpTraceExportClient.cs @@ -16,8 +16,14 @@ using System; using System.IO; +using System.Net; using System.Net.Http; using System.Net.Http.Headers; +using System.Runtime.CompilerServices; +#if NET5_0_OR_GREATER +using System.Threading; +#endif +using System.Threading.Tasks; using Google.Protobuf; using OtlpCollector = Opentelemetry.Proto.Collector.Trace.V1; @@ -43,18 +49,48 @@ namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClie request.Headers.Add(header.Key, header.Value); } - var content = Array.Empty(); - using (var stream = new MemoryStream()) - { - exportRequest.WriteTo(stream); - content = stream.ToArray(); - } - - var binaryContent = new ByteArrayContent(content); - binaryContent.Headers.ContentType = new MediaTypeHeaderValue(MediaContentType); - request.Content = binaryContent; + request.Content = new ExportRequestContent(exportRequest); return request; } + + internal sealed class ExportRequestContent : HttpContent + { + private static readonly MediaTypeHeaderValue ProtobufMediaTypeHeader = new MediaTypeHeaderValue(MediaContentType); + + private readonly OtlpCollector.ExportTraceServiceRequest exportRequest; + + public ExportRequestContent(OtlpCollector.ExportTraceServiceRequest exportRequest) + { + this.exportRequest = exportRequest; + this.Headers.ContentType = ProtobufMediaTypeHeader; + } + +#if NET5_0_OR_GREATER + protected override void SerializeToStream(Stream stream, TransportContext context, CancellationToken cancellationToken) + { + this.SerializeToStreamInternal(stream); + } +#endif + + protected override Task SerializeToStreamAsync(Stream stream, TransportContext context) + { + this.SerializeToStreamInternal(stream); + return Task.CompletedTask; + } + + protected override bool TryComputeLength(out long length) + { + // We can't know the length of the content being pushed to the output stream. + length = -1; + return false; + } + + [MethodImpl(MethodImplOptions.AggressiveInlining)] + private void SerializeToStreamInternal(Stream stream) + { + this.exportRequest.WriteTo(stream); + } + } } } diff --git a/test/Benchmarks/Exporter/OtlpExporterBenchmarks.cs b/test/Benchmarks/Exporter/OtlpGrpcExporterBenchmarks.cs similarity index 95% rename from test/Benchmarks/Exporter/OtlpExporterBenchmarks.cs rename to test/Benchmarks/Exporter/OtlpGrpcExporterBenchmarks.cs index e70459c58..2fb767843 100644 --- a/test/Benchmarks/Exporter/OtlpExporterBenchmarks.cs +++ b/test/Benchmarks/Exporter/OtlpGrpcExporterBenchmarks.cs @@ -1,4 +1,4 @@ -// +// // Copyright The OpenTelemetry Authors // // Licensed under the Apache License, Version 2.0 (the "License"); @@ -29,7 +29,7 @@ using OtlpCollector = Opentelemetry.Proto.Collector.Trace.V1; namespace Benchmarks.Exporter { [MemoryDiagnoser] - public class OtlpExporterBenchmarks + public class OtlpGrpcExporterBenchmarks { private OtlpTraceExporter exporter; private Activity activity; diff --git a/test/Benchmarks/Exporter/OtlpHttpExporterBenchmarks.cs b/test/Benchmarks/Exporter/OtlpHttpExporterBenchmarks.cs new file mode 100644 index 000000000..de4477f5b --- /dev/null +++ b/test/Benchmarks/Exporter/OtlpHttpExporterBenchmarks.cs @@ -0,0 +1,104 @@ +// +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +using System; +using System.Diagnostics; +using System.IO; +using BenchmarkDotNet.Attributes; +using Benchmarks.Helper; +using OpenTelemetry; +using OpenTelemetry.Exporter; +using OpenTelemetry.Exporter.OpenTelemetryProtocol.Implementation.ExportClient; +using OpenTelemetry.Internal; +using OpenTelemetry.Tests; + +namespace Benchmarks.Exporter +{ + [MemoryDiagnoser] + public class OtlpHttpExporterBenchmarks + { + private readonly byte[] buffer = new byte[1024 * 1024]; + private IDisposable server; + private string serverHost; + private int serverPort; + private OtlpTraceExporter exporter; + private Activity activity; + private CircularBuffer activityBatch; + + [Params(1, 10, 100)] + public int NumberOfBatches { get; set; } + + [Params(10000)] + public int NumberOfSpans { get; set; } + + [GlobalSetup] + public void GlobalSetup() + { + this.server = TestHttpServer.RunServer( + (ctx) => + { + using (Stream receiveStream = ctx.Request.InputStream) + { + while (true) + { + if (receiveStream.Read(this.buffer, 0, this.buffer.Length) == 0) + { + break; + } + } + } + + ctx.Response.StatusCode = 200; + ctx.Response.OutputStream.Close(); + }, + out this.serverHost, + out this.serverPort); + + var options = new OtlpExporterOptions + { + Endpoint = new Uri($"http://{this.serverHost}:{this.serverPort}"), + }; + this.exporter = new OtlpTraceExporter( + options, + new OtlpHttpTraceExportClient(options)); + + this.activity = ActivityHelper.CreateTestActivity(); + this.activityBatch = new CircularBuffer(this.NumberOfSpans); + } + + [GlobalCleanup] + public void GlobalCleanup() + { + this.exporter.Shutdown(); + this.exporter.Dispose(); + this.server.Dispose(); + } + + [Benchmark] + public void OtlpExporter_Batching() + { + for (int i = 0; i < this.NumberOfBatches; i++) + { + for (int c = 0; c < this.NumberOfSpans; c++) + { + this.activityBatch.Add(this.activity); + } + + this.exporter.Export(new Batch(this.activityBatch, this.NumberOfSpans)); + } + } + } +} diff --git a/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/Implementation/ExportClient/OtlpHttpTraceExportClientTests.cs b/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/Implementation/ExportClient/OtlpHttpTraceExportClientTests.cs index 8809074b3..604bc9eee 100644 --- a/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/Implementation/ExportClient/OtlpHttpTraceExportClientTests.cs +++ b/test/OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests/Implementation/ExportClient/OtlpHttpTraceExportClientTests.cs @@ -181,7 +181,7 @@ namespace OpenTelemetry.Exporter.OpenTelemetryProtocol.Tests.Implementation.Expo Assert.Contains(httpRequest.Headers, h => h.Key == header2.Name && h.Value.First() == header2.Value); Assert.NotNull(httpRequest.Content); - Assert.IsType(httpRequest.Content); + Assert.IsType(httpRequest.Content); Assert.Contains(httpRequest.Content.Headers, h => h.Key == "Content-Type" && h.Value.First() == OtlpHttpTraceExportClient.MediaContentType); var exportTraceRequest = OtlpCollector.ExportTraceServiceRequest.Parser.ParseFrom(httpRequestContent);